The Problem with Sequential Processing

Recently, I faced a common performance challenge: processing a substantial batch of independent tasks. Whether it’s fetching data from multiple URLs, performing image transformations, or running independent computations, a standard for loop executing these tasks one at a time was far too slow: total latency grew linearly with the number of tasks. I needed to harness Go’s concurrency capabilities to accelerate the work, but without overwhelming system resources. The goal was clear: distribute the workload among multiple goroutines (workers) and then efficiently collect all their individual results into a unified stream for subsequent processing.

Initial Attempts and Critical Pitfalls

My first naive approach was to simply spawn a new goroutine for every single item I needed to process, like go process(item). While seemingly straightforward, this quickly led to significant system instability. I observed massive CPU and memory spikes, and in network-intensive scenarios, it exhausted available network connections. This experience highlighted the crucial need for a controlled worker pool to manage concurrency (the Fan-Out pattern).

After establishing a worker pool, I then encountered the classic Go concurrency deadlock when attempting to aggregate the results (the Fan-In pattern). My main function would freeze indefinitely while trying to range over the results channel. The core issue was channel closure: if I closed the results channel too early, active workers would panic when trying to send data. If I didn’t close it at all, the main goroutine would deadlock, waiting for data that would never arrive because no one would signal the end of the stream. Properly signaling the completion of all workers and the subsequent closure of the results channel was the critical missing piece.

The Robust Solution: Fan-Out and Fan-In with sync.WaitGroup

The reliable solution to this problem leverages three fundamental Go concurrency primitives: a jobs channel, a results channel, and a sync.WaitGroup. This combination allows for controlled worker distribution and safe result aggregation.

  1. Fan-Out (Work Distribution): I start a fixed number of worker goroutines. Each worker continuously reads tasks from the jobs channel. This approach prevents uncontrolled goroutine spawning and allows for resource management.
  2. Fan-In (Result Aggregation): As workers complete their tasks, they push their results onto the results channel. The main goroutine then reads from this channel to collect all processed items.
  3. Safe Channel Closure: This is the most critical part for avoiding deadlocks. I use a sync.WaitGroup to track the completion of all worker goroutines. Once all workers have finished, and only then, I close the results channel. To achieve this safely, I launch a separate, anonymous goroutine whose sole responsibility is to wait for the WaitGroup counter to reach zero (wg.Wait()) and then execute close(results). This ensures the main goroutine can safely range over the results channel until it’s fully drained and closed, preventing deadlocks.

Here is the implementation illustrating this pattern:

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker processes jobs and sends them to the results channel (Fan-out)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done() // Signal that this worker is done when the function exits
	for j := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, j)
		// Simulate work by sleeping for a short duration
		time.Sleep(100 * time.Millisecond) 
		results <- j * 2 // Send the processed result
	}
}

func main() {
	const numJobs = 10
	const numWorkers = 3

	// Create channels for jobs and results
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	
	var wg sync.WaitGroup // WaitGroup to track worker completion

	// 1. Fan-out: Start the worker goroutines
	// The counter is incremented here, before each goroutine launches,
	// so wg.Wait() can never observe a premature zero count
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1) // Increment the WaitGroup counter for each worker
		go worker(w, jobs, results, &wg)
	}

	// 2. Send jobs to the workers
	// These jobs will be picked up by any available worker
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Close the jobs channel to signal workers that no more jobs are coming

	// 3. The critical goroutine to close results safely
	// This anonymous goroutine waits for all workers to finish
	// before closing the results channel.
	go func() {
		wg.Wait()       // Block until all workers have called wg.Done()
		close(results) // Close the results channel, unblocking the main goroutine's range loop
	}()

	// 4. Fan-in: Collect the aggregated results
	// The main goroutine ranges over the results channel until it's closed
	for result := range results {
		fmt.Printf("Result collected: %d\n", result)
	}
	
	fmt.Println("All processing complete.")
}

In this example, numWorkers goroutines concurrently process numJobs tasks. The jobs channel is closed after all tasks are sent; each worker’s range loop then exits once the shared channel is drained. The wg.Wait() call ensures that the results channel is closed only after every worker has finished, gracefully terminating the result collection loop in main.

Summary

Implementing the Fan-Out and Fan-In concurrency patterns in Go with a sync.WaitGroup provides a robust, efficient, and deadlock-free way to distribute and aggregate work. By controlling the number of worker goroutines, I avoid resource exhaustion, and by carefully managing channel closure with sync.WaitGroup, I prevent common deadlock scenarios. This pattern is invaluable for optimizing batch processing tasks in concurrent Go applications.