Collecting results from goroutines

blog
Published

December 4, 2024

Let’s say that you are facing a problem where you have multiple jobs that need to be run, then you need to collect the results. Easy peasy, you write the code:

func job(x int) int {
    return x + 1
}

func main() {
    var results []int

    for i := 0; i < 10; i++ {
        results = append(results, job(i))
    }

    fmt.Println(results)
}

So far, so good. But wait, there’s an additional requirement for the jobs to run in parallel, because they are computationally intensive (your company pivots to AI, so you would be asking LLM to do the addition). You adapt your code:

func main() {
    var results []int

    for i := 0; i < 10; i++ {
+        go func() {
            results = append(results, job(i))
+        }()
    }

    fmt.Println(results)
}

Damn, it didn’t work! You are spawning the jobs, but you don’t wait long enough to collect the results, so you end up with empty results slice. To make sure that we have all the results, let’s wait a bit:

func main() {
    var results []int

    for i := 0; i < 10; i++ {
        go func() {
            results = append(results, job(i))
        }()
    }

+   time.Sleep(5 * time.Second)
    fmt.Println(results)
}

Now it takes longer, but it does not return the correct results! It collects only some of the results, at random. Maybe waiting for a fixed time is not enough, so let’s make sure we wait for all the tasks to finish. We can use sync.WaitGroup for that:

func main() {
+   var wg sync.WaitGroup
    var results []int

    for i := 0; i < 10; i++ {
+       wg.Add(1)
        go func() {
+           defer wg.Done()
            results = append(results, job(i))
        }()
    }

+   wg.Wait()
    fmt.Println(results)
}

Ok, the code is now more robust, but still—at random—it fails. Again, we are not necessarily getting all the results printed. The problems is that append is not thread-safe, so there is a race condition between different jobs writing to results.

func main() {
    var wg sync.WaitGroup
    var results []int
+   ch := make(chan int)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
-           results = append(results, job(i))
+           ch <- job(i)
        }()
    }

    wg.Wait()
+
+   for x := range ch {
+       results = append(results, x)
+   }
    fmt.Println(results)
}

This should work, right? But no, we end up with a fatal error informing us about the deadlock. What happened? The problem is that our goroutines are sending the results to the channel ch. The channel is blocking, so to send a new value, the previous value needs to be read from the channel. We are reading the values in the for loop that starts after wg.Wait(). So both processes wait for each other and we have a deadlock.

To fix this, we could run wg.Wait() in a separate gouroutine, so it does not block the loop below from running:

func main() {
    var wg sync.WaitGroup
    var results []int
    ch := make(chan int)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            ch <- job(i)
        }()
    }

+   go func() {
        wg.Wait()
+   }()

    for x := range ch {
        results = append(results, x)
    }
    fmt.Println(results)
}

But this also fails, with the all goroutines are asleep - deadlock! error. What does it mean? The problem is that wg.Wait() hapenes in a parallel process, so the rest of the program didn’t really need to care about it waiting for anything. This is also mentioned in the Go specification:

Program execution begins by initializing the program and then invoking the function main in package main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

The solution is to explicitly close the channel only after we are done:

func main() {
    var wg sync.WaitGroup
    var results []int
    ch := make(chan int)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            ch <- job(i)
        }()
    }

    go func() {
        wg.Wait()
+       close(ch)
    }()

    for x := range ch {
        results = append(results, x)
    }
    fmt.Println(results)
}

This way, the for loop would be reading the results from the channel as long as it is open. When all the jobs are done, we close the channel and the loop finishes. We ensure that all the tasks finished and that we read all of their results. Otherwise, we would end up with a deadlock.

Xzibit: “Yo dawg I heard you had too many parallel process so I added a parallel process to monitor it”

As next steps, there’s a great talk about parallelism in Go by Rob Pike (it’s quite similar to Erlang). Accidentally, just recently there was a post by Aarav Joshi on the “nursery” pattern in Go which describes exactly the pattern mentioned above.