Go Concurrency Patterns
Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.
When to Use
- Building concurrent Go applications
- Implementing worker pools and pipelines
- Managing goroutine lifecycles and cancellation
- Debugging race conditions
- Implementing graceful shutdown
Concurrency Primitives
| Primitive | Purpose | When to Use |
|---|---|---|
goroutine | Lightweight concurrent execution | Any concurrent work |
channel | Communication between goroutines | Passing data, signaling |
select | Multiplex channel operations | Waiting on multiple channels |
sync.Mutex | Mutual exclusion | Protecting shared state |
sync.WaitGroup | Wait for goroutines to complete | Coordinating goroutine completion |
context.Context | Cancellation and deadlines | Request-scoped lifecycle management |
errgroup.Group | Concurrent tasks with errors | Parallel work that can fail |
Go Concurrency Mantra: Don't communicate by sharing memory; share memory by communicating.
Quick Start
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
results := make(chan string, 10)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case results <- fmt.Sprintf("Worker %d done", id):
}
}(i)
}
go func() { wg.Wait(); close(results) }()
for result := range results {
fmt.Println(result)
}
}
Pattern 1: Worker Pool
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Err error
}
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
select {
case <-ctx.Done():
return
default:
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Processed: %s", job.Data),
}
}
}
}()
}
go func() { wg.Wait(); close(results) }()
return results
}
// Usage
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
jobs := make(chan Job, 100)
go func() {
for i := 0; i < 50; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
}
close(jobs)
}()
for result := range WorkerPool(ctx, 5, jobs) {
fmt.Printf("Result: %+v\n", result)
}
}
Pattern 2: Fan-Out / Fan-In Pipeline
// Stage 1: Generate values
func generate(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}()
return out
}
// Stage 2: Transform (run multiple instances for fan-out)
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done(): return
case out <- n * n:
}
}
}()
return out
}
// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case <-ctx.Done(): return
case out <- n:
}
}
}(ch)
}
go func() { wg.Wait(); close(out) }()
return out
}
// Usage: fan out to 3 squarers, fan in results
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c1 := square(ctx, in)
c2 := square(ctx, in)
c3 := square(ctx, in)
for result := range merge(ctx, c1, c2, c3) {
fmt.Println(result)
}
}
Pattern 3: errgroup with Cancellation
import "golang.org/x/sync/errgroup"
func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return fmt.Errorf("creating request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
defer resp.Body.Close()
results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err // First error cancels all others via ctx
}
return results, nil
}
// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // Max concurrent goroutines
results := make([]string, len(urls))
for i, url := range urls {
i, url := i, url
g.Go(func() error {
result, err := fetchURL(ctx, url)
if err != nil { return err }
results[i] = result
return nil
})
}
return results, g.Wait()
}
Pattern 4: Bounded Concurrency (Semaphore)
import "golang.org/x/sync/semaphore"
type RateLimitedWorker struct {
sem *semaphore.Weighted
}
func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
return &RateLimitedWorker{sem: semaphore.NewWeighted(maxConcurrent)}
}
func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
var (
wg sync.WaitGroup
mu sync.Mutex
errors []error
)
for _, task := range tasks {
if err := w.sem.Acquire(ctx, 1); err != nil {
return []error{err}
}
wg.Add(1)
go func(t func() error) {
defer wg.Done()
defer w.sem.Release(1)
if err := t(); err != nil {
mu.Lock()
errors = append(errors, err)
mu.Unlock()
}
}(task)
}
wg.Wait()
return errors
}
// Simpler alternative: channel-based semaphore
type Semaphore chan struct{}
func NewSemaphore(n int) Semaphore { return make(chan struct{}, n) }
func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }
Pattern 5: Graceful Shutdown
func main() {
ctx, cancel := context.WithCancel(context.Background())
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
server := NewServer()
server.Start(ctx)
sig := <-sigCh
fmt.Printf("Received signal: %v\n", sig)
cancel() // Cancel context to stop all workers
server.Shutdown(5 * time.Second)
}
type Server struct {
wg sync.WaitGroup
}
func (s *Server) Start(ctx context.Context) {
for i := 0; i < 5; i++ {
s.wg.Add(1)
go s.worker(ctx, i)
}
}
func (s *Server) worker(ctx context.Context, id int) {
defer s.wg.Done()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d cleaning up...\n", id)
return
case <-ticker.C:
fmt.Printf("Worker %d working...\n", id)
}
}
}
func (s *Server) Shutdown(timeout time.Duration) {
done := make(chan struct{})
go func() { s.wg.Wait(); close(done) }()
select {
case <-done:
fmt.Println("Clean shutdown completed")
case <-time.After(timeout):
fmt.Println("Shutdown timed out, forcing exit")
}
}
Pattern 6: Concurrent Map
// sync.Map: optimized for read-heavy workloads with stable keys
type Cache struct {
m sync.Map
}
func (c *Cache) Get(key string) (any, bool) { return c.m.Load(key) }
func (c *Cache) Set(key string, value any) { c.m.Store(key, value) }
func (c *Cache) GetOrSet(key string, val any) (any, bool) {
return c.m.LoadOrStore(key, val)
}
// ShardedMap: better for write-heavy workloads
type ShardedMap struct {
shards []*shard
numShards int
}
type shard struct {
sync.RWMutex
data map[string]any
}
func NewShardedMap(n int) *ShardedMap {
m := &ShardedMap{shards: make([]*shard, n), numShards: n}
for i := range m.shards {
m.shards[i] = &shard{data: make(map[string]any)}
}
return m
}
func (m *ShardedMap) getShard(key string) *shard {
h := 0
for _, c := range key {
h = 31*h + int(c)
}
return m.shards[h%m.numShards]
}
func (m *ShardedMap) Get(key string) (any, bool) {
s := m.getShard(key)
s.RLock()
defer s.RUnlock()
v, ok := s.data[key]
return v, ok
}
func (m *ShardedMap) Set(key string, value any) {
s := m.getShard(key)
s.Lock()
defer s.Unlock()
s.data[key] = value
}
When to use which:
sync.Map— Few keys, many reads, keys added once and rarely deletedShardedMap— Many keys, frequent writes, need predictable performance
Select Patterns
// Timeout
select {
case v := <-ch:
fmt.Println("Received:", v)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
// Non-blocking send/receive
select {
case ch <- 42:
fmt.Println("Sent")
default:
fmt.Println("Channel full, skipping")
}
// Priority select: check high-priority first
for {
select {
case msg := <-highPriority:
handle(msg)
default:
select {
case msg := <-highPriority:
handle(msg)
case msg := <-lowPriority:
handle(msg)
}
}
}
Race Detection
go test -race ./... # Tests with race detector
go build -race . # Build with race detector
go run -race main.go # Run with race detector
Best Practices
Do:
- Use
context.Contextfor cancellation and deadlines on every goroutine - Close channels from the sender side only
- Use
errgroupfor concurrent operations that return errors - Buffer channels when count is known upfront
- Prefer channels over mutexes for coordination
- Always run tests with
-race
Don't:
- Leak goroutines — every goroutine must have an exit path
- Close a channel from the receiver — causes panic
- Use
time.Sleepfor synchronization — use proper primitives - Ignore
ctx.Done()in long-running goroutines - Share memory without synchronization — use channels or mutexes
NEVER Do
- NEVER close a channel from the receiver — Only the sender should close; receivers panic on closed channels
- NEVER send on a closed channel — Causes panic; design so sender controls close
- NEVER use unbounded goroutine spawning — Use worker pools or semaphores for bounded concurrency
- NEVER ignore the
-raceflag in testing — Data races are silent bugs that corrupt state - NEVER pass pointers to loop variables into goroutines — Capture the value or use index closure pattern
- NEVER use
time.Sleepas synchronization — Use channels, WaitGroups, or context