Go Concurrency Skill
This skill provides expert guidance on Go's concurrency primitives and patterns, covering goroutines, channels, synchronization, and best practices for building concurrent systems.
When to Use
Activate this skill when:
-
Implementing concurrent/parallel processing
-
Working with goroutines and channels
-
Using synchronization primitives (mutexes, wait groups, etc.)
-
Debugging race conditions
-
Optimizing concurrent performance
-
Implementing worker pools or pipelines
-
Handling context cancellation
Goroutine Fundamentals
Basic Goroutines
// Simple goroutine go func() { fmt.Println("Hello from goroutine") }()
// Goroutine with parameters go func(msg string) { fmt.Println(msg) }("Hello")
// Goroutine with closure message := "Hello" go func() { fmt.Println(message) // Captures message }()
Common Pitfalls
// ❌ BAD: Loop variable capture for i := 0; i < 5; i++ { go func() { fmt.Println(i) // All goroutines may print 5 }() }
// ✅ GOOD: Pass as parameter for i := 0; i < 5; i++ { go func(n int) { fmt.Println(n) // Each prints correct value }(i) }
// ✅ GOOD: Create local copy for i := 0; i < 5; i++ { i := i // Create new variable go func() { fmt.Println(i) }() }
Channel Patterns
Channel Types
// Unbuffered channel (synchronous) ch := make(chan int)
// Buffered channel (asynchronous up to buffer size) ch := make(chan int, 10)
// Send-only channel func send(ch chan<- int) { ch <- 42 }
// Receive-only channel func receive(ch <-chan int) { value := <-ch }
// Bidirectional channel ch := make(chan int)
Channel Operations
// Send ch <- value
// Receive value := <-ch
// Receive with ok check value, ok := <-ch if !ok { // Channel closed }
// Close channel close(ch)
// Range over channel for value := range ch { fmt.Println(value) }
Select Statement
// Wait for first available operation select { case msg1 := <-ch1: fmt.Println("Received from ch1:", msg1) case msg2 := <-ch2: fmt.Println("Received from ch2:", msg2) case ch3 <- value: fmt.Println("Sent to ch3") default: fmt.Println("No channels ready") }
// Timeout pattern select { case result := <-ch: return result, nil case <-time.After(5 * time.Second): return nil, errors.New("timeout") }
// Context cancellation select { case result := <-ch: return result, nil case <-ctx.Done(): return nil, ctx.Err() }
Synchronization Primitives
Mutex
type SafeCounter struct { mu sync.Mutex count int }
func (c *SafeCounter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.count++ }
func (c *SafeCounter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.count }
RWMutex
type Cache struct { mu sync.RWMutex items map[string]interface{} }
func (c *Cache) Get(key string) (interface{}, bool) { c.mu.RLock() // Multiple readers allowed defer c.mu.RUnlock() value, ok := c.items[key] return value, ok }
func (c *Cache) Set(key string, value interface{}) { c.mu.Lock() // Exclusive write access defer c.mu.Unlock() c.items[key] = value }
WaitGroup
func processItems(items []Item) { var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
go func(item Item) {
defer wg.Done()
process(item)
}(item)
}
wg.Wait() // Wait for all goroutines
}
Once
type Database struct { instance *sql.DB once sync.Once }
func (d *Database) GetConnection() *sql.DB { d.once.Do(func() { d.instance, _ = sql.Open("postgres", "connection-string") }) return d.instance }
Concurrency Patterns
Worker Pool
type WorkerPool struct { workerCount int jobs chan Job results chan Result wg sync.WaitGroup }
type Job struct { ID int Data interface{} }
type Result struct { JobID int Value interface{} Error error }
func NewWorkerPool(workerCount int) *WorkerPool { return &WorkerPool{ workerCount: workerCount, jobs: make(chan Job, 100), results: make(chan Result, 100), } }
func (p *WorkerPool) Start(ctx context.Context) { for i := 0; i < p.workerCount; i++ { p.wg.Add(1) go p.worker(ctx) } }
func (p *WorkerPool) worker(ctx context.Context) { defer p.wg.Done()
for {
select {
case job, ok := <-p.jobs:
if !ok {
return
}
result := processJob(job)
select {
case p.results <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}
func (p *WorkerPool) Submit(job Job) { p.jobs <- job }
func (p *WorkerPool) Results() <-chan Result { return p.results }
func (p *WorkerPool) Close() { close(p.jobs) p.wg.Wait() close(p.results) }
// Usage ctx := context.Background() pool := NewWorkerPool(10) pool.Start(ctx)
for i := 0; i < 100; i++ { pool.Submit(Job{ID: i, Data: fmt.Sprintf("job-%d", i)}) }
go func() { for result := range pool.Results() { if result.Error != nil { log.Printf("Job %d failed: %v", result.JobID, result.Error) } else { log.Printf("Job %d completed: %v", result.JobID, result.Value) } } }()
pool.Close()
Pipeline Pattern
// Generator stage func generator(ctx context.Context, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-ctx.Done(): return } } }() return out }
// Processing stage func square(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-ctx.Done(): return } } }() return out }
// Another processing stage func double(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * 2: case <-ctx.Done(): return } } }() return out }
// Usage - compose pipeline ctx := context.Background() numbers := generator(ctx, 1, 2, 3, 4, 5) squared := square(ctx, numbers) doubled := double(ctx, squared)
for result := range doubled { fmt.Println(result) }
Fan-Out/Fan-In
// Fan-out: distribute work to multiple goroutines func fanOut(ctx context.Context, input <-chan int, workers int) []<-chan int { channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = worker(ctx, input)
}
return channels
}
func worker(ctx context.Context, input <-chan int) <-chan int { output := make(chan int) go func() { defer close(output) for n := range input { select { case output <- expensiveOperation(n): case <-ctx.Done(): return } } }() return output }
// Fan-in: merge multiple channels into one func fanIn(ctx context.Context, channels ...<-chan int) <-chan int { var wg sync.WaitGroup output := make(chan int)
multiplex := func(ch <-chan int) {
defer wg.Done()
for n := range ch {
select {
case output <- n:
case <-ctx.Done():
return
}
}
}
wg.Add(len(channels))
for _, ch := range channels {
go multiplex(ch)
}
go func() {
wg.Wait()
close(output)
}()
return output
}
// Usage ctx := context.Background() input := generator(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Fan-out to 3 workers workers := fanOut(ctx, input, 3)
// Fan-in results results := fanIn(ctx, workers...)
for result := range results { fmt.Println(result) }
Semaphore Pattern
type Semaphore struct { sem chan struct{} }
func NewSemaphore(maxConcurrency int) *Semaphore { return &Semaphore{ sem: make(chan struct{}, maxConcurrency), } }
func (s *Semaphore) Acquire() { s.sem <- struct{}{} }
func (s *Semaphore) Release() { <-s.sem }
// Usage sem := NewSemaphore(5) // Max 5 concurrent operations
for _, item := range items { sem.Acquire() go func(item Item) { defer sem.Release() process(item) }(item) }
Rate Limiting
// Token bucket rate limiter type RateLimiter struct { ticker *time.Ticker tokens chan struct{} }
func NewRateLimiter(rate time.Duration, burst int) *RateLimiter { rl := &RateLimiter{ ticker: time.NewTicker(rate), tokens: make(chan struct{}, burst), }
// Fill bucket initially
for i := 0; i < burst; i++ {
rl.tokens <- struct{}{}
}
// Refill tokens
go func() {
for range rl.ticker.C {
select {
case rl.tokens <- struct{}{}:
default:
}
}
}()
return rl
}
func (rl *RateLimiter) Wait(ctx context.Context) error { select { case <-rl.tokens: return nil case <-ctx.Done(): return ctx.Err() } }
func (rl *RateLimiter) Stop() { rl.ticker.Stop() }
// Usage limiter := NewRateLimiter(time.Second/10, 5) // 10 requests per second, burst of 5 defer limiter.Stop()
for _, request := range requests { if err := limiter.Wait(ctx); err != nil { log.Printf("Rate limit error: %v", err) continue } processRequest(request) }
Error Handling in Concurrent Code
errgroup Package
import "golang.org/x/sync/errgroup"
func fetchURLs(ctx context.Context, urls []string) error { g, ctx := errgroup.WithContext(ctx)
for _, url := range urls {
url := url // Capture for goroutine
g.Go(func() error {
return fetchURL(ctx, url)
})
}
// Wait for all goroutines, return first error
return g.Wait()
}
// With limited concurrency func fetchURLsLimited(ctx context.Context, urls []string) error { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // Max 10 concurrent
for _, url := range urls {
url := url
g.Go(func() error {
return fetchURL(ctx, url)
})
}
return g.Wait()
}
Best Practices
-
Always close channels from sender side
-
Use context for cancellation and timeouts
-
Avoid goroutine leaks - ensure they can exit
-
Use buffered channels to avoid blocking
-
Prefer sync.RWMutex for read-heavy workloads
-
Don't use defer in hot loops
-
Test with race detector: go test -race
-
Use errgroup for error propagation
-
Limit concurrent operations with worker pools
-
Profile before optimizing
Race Condition Detection
Run tests with race detector
go test -race ./...
Run program with race detector
go run -race main.go
Build with race detector
go build -race
Common Patterns to Avoid
// ❌ BAD: Unbounded goroutine creation for _, item := range millionItems { go process(item) // May create millions of goroutines }
// ✅ GOOD: Use worker pool pool := NewWorkerPool(100) for _, item := range millionItems { pool.Submit(item) }
// ❌ BAD: Goroutine leak func leak() <-chan int { ch := make(chan int) go func() { ch <- expensiveComputation() // If receiver never reads, goroutine leaks }() return ch }
// ✅ GOOD: Use context for cancellation func noLeak(ctx context.Context) <-chan int { ch := make(chan int) go func() { defer close(ch) result := expensiveComputation() select { case ch <- result: case <-ctx.Done(): } }() return ch }
Resources
Additional examples and patterns are available in:
-
assets/examples/
-
Complete concurrency examples
-
assets/patterns/
-
Common concurrency patterns
-
references/
-
Links to Go concurrency resources and papers