Fan-In and Fan-Out Golang Concurrency Patterns
The Problem with Sequential Processing
Recently, I faced a common performance challenge: processing a substantial batch of independent tasks. Whether it’s fetching data from multiple URLs, performing image transformations, or executing independent computations, a standard for loop executing these tasks sequentially proved to be incredibly slow and inefficient. I needed to harness Go’s inherent concurrency capabilities to accelerate this process, but without overwhelming system resources. The goal was clear: distribute the workload among multiple goroutines (workers) and then efficiently collect all their individual results into a unified stream for subsequent processing.
Initial Attempts and Critical Pitfalls
My first naive approach was to simply spawn a new goroutine for every single item I needed to process, like go process(item). While seemingly straightforward, this quickly led to significant system instability. I observed massive CPU and memory spikes, and in network-intensive scenarios, it exhausted available network connections. This experience highlighted the crucial need for a controlled worker pool to manage concurrency (the Fan-Out pattern).
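Before reaching for a full worker pool, a common stopgap is a buffered channel used as a semaphore to cap the number of in-flight goroutines. A minimal sketch of that idea, not from the original code (the `processItems` helper, the limit of 2, and the doubling function are all illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// processItems runs fn over items with at most limit goroutines in flight,
// then returns the sum of all results.
func processItems(items []int, limit int, fn func(int) int) int {
	sem := make(chan struct{}, limit)        // buffered channel used as a semaphore
	results := make(chan int, len(items))    // buffered so sends never block
	var wg sync.WaitGroup

	for _, it := range items {
		wg.Add(1)
		sem <- struct{}{} // blocks once `limit` goroutines are already running
		go func(item int) {
			defer wg.Done()
			defer func() { <-sem }() // release the semaphore slot
			results <- fn(item)
		}(it)
	}
	wg.Wait()
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	total := processItems([]int{1, 2, 3, 4, 5}, 2, func(n int) int { return n * 2 })
	fmt.Println("total:", total) // 2+4+6+8+10 = 30
}
```

This caps concurrency without a dedicated pool, but each item still costs a goroutine; the worker-pool approach below reuses a fixed set of goroutines instead.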
After establishing a worker pool, I then encountered the classic Go concurrency deadlock when attempting to aggregate the results (the Fan-In pattern). My main function would freeze indefinitely while trying to range over the results channel. The core issue was channel closure: if I closed the results channel too early, active workers would panic when trying to send data. If I didn’t close it at all, the main goroutine would deadlock, waiting for data that would never arrive because no one would signal the end of the stream. Properly signaling the completion of all workers and the subsequent closure of the results channel was the critical missing piece.
The Robust Solution: Fan-Out and Fan-In with sync.WaitGroup
The reliable solution to this problem leverages three fundamental Go concurrency primitives: a jobs channel, a results channel, and a sync.WaitGroup. This combination allows for controlled worker distribution and safe result aggregation.
- Fan-Out (Work Distribution): I start a fixed number of worker goroutines. Each worker continuously reads tasks from the jobs channel. This approach prevents uncontrolled goroutine spawning and allows for resource management.
- Fan-In (Result Aggregation): As workers complete their tasks, they push their results onto the results channel. The main goroutine then reads from this channel to collect all processed items.
- Safe Channel Closure: This is the most critical part for avoiding deadlocks. I use a sync.WaitGroup to track the completion of all worker goroutines. Once all workers have finished, and only then, I close the results channel. To achieve this safely, I launch a separate, anonymous goroutine whose sole responsibility is to wait for the WaitGroup counter to reach zero (wg.Wait()) and then execute close(results). This ensures the main goroutine can safely range over the results channel until it's fully drained and closed, preventing deadlocks.
Here is the implementation illustrating this pattern:
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker processes jobs and sends them to the results channel (Fan-Out)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done() // Signal that this worker is done when the function exits
	for j := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, j)
		// Simulate work by sleeping for a short duration
		time.Sleep(100 * time.Millisecond)
		results <- j * 2 // Send the processed result
	}
}

func main() {
	const numJobs = 10
	const numWorkers = 3

	// Create channels for jobs and results
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup // WaitGroup to track worker completion

	// 1. Fan-Out: Start the worker goroutines,
	// registering each one with the WaitGroup before launch
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1) // Increment the WaitGroup counter for each worker
		go worker(w, jobs, results, &wg)
	}

	// 2. Send jobs to the workers
	// These jobs will be picked up by any available worker
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // Close the jobs channel to signal workers that no more jobs are coming

	// 3. The critical goroutine to close results safely
	// This anonymous goroutine waits for all workers to finish
	// before closing the results channel.
	go func() {
		wg.Wait()      // Block until all workers have called wg.Done()
		close(results) // Close the results channel, unblocking the main goroutine's range loop
	}()

	// 4. Fan-In: Collect the aggregated results
	// The main goroutine ranges over the results channel until it's closed
	for result := range results {
		fmt.Printf("Result collected: %d\n", result)
	}
	fmt.Println("All processing complete.")
}
In this example, numWorkers goroutines concurrently process numJobs tasks. The jobs channel is closed after all tasks are sent, signaling the workers to exit their range loops once the shared channel is drained. The wg.Wait() call ensures that the results channel is only closed after every worker has completed its execution, gracefully terminating the result collection loop in main.
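The same WaitGroup-then-close trick also covers the textbook fan-in variant, where each producer owns its own output channel and a merge function multiplexes them into one stream. A sketch of that variant under my own naming (the `merge` and `gen` helpers are illustrative, not part of the code above):

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in any number of input channels into a single output channel.
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v // forward every value to the shared channel
			}
		}(c)
	}
	// Close out only after every forwarding goroutine has finished,
	// mirroring the WaitGroup pattern from the worker-pool example.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// gen returns a closed, buffered channel pre-filled with nums.
func gen(nums ...int) <-chan int {
	ch := make(chan int, len(nums))
	for _, n := range nums {
		ch <- n
	}
	close(ch)
	return ch
}

func main() {
	sum := 0
	for v := range merge(gen(1, 2), gen(3, 4), gen(5)) {
		sum += v
	}
	fmt.Println("sum:", sum) // 1+2+3+4+5 = 15
}
```

Arrival order across channels is nondeterministic, which is why the example aggregates into a sum rather than relying on sequence.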
Summary
Implementing the Fan-Out and Fan-In concurrency patterns in Go with a sync.WaitGroup provides a robust, efficient, and deadlock-free way to distribute and aggregate work. By controlling the number of worker goroutines, I avoid resource exhaustion, and by carefully managing channel closure with sync.WaitGroup, I prevent common deadlock scenarios. This pattern is invaluable for optimizing batch processing tasks in concurrent Go applications.
golang concurrency fan-in fan-out worker pool channels sync.waitgroup
2026-04-22 10:19 (Last updated: 2026-04-22 10:19)