Go Concurrency
Master Go's concurrency model using goroutines, channels, and synchronization primitives for building concurrent applications.
Goroutines
Creating goroutines:
package main
import ( "fmt" "time" )
func sayHello() { fmt.Println("Hello from goroutine") }
func main() { // Launch goroutine go sayHello()
// Anonymous function goroutine
go func() {
fmt.Println("Hello from anonymous goroutine")
}()
// Give goroutines time to execute
time.Sleep(time.Second)
}
Goroutines with parameters:
func printNumber(n int) { fmt.Println(n) }
func main() { for i := 0; i < 10; i++ { go printNumber(i) } time.Sleep(time.Second) }
Channels
Basic channel operations:
func main() { // Create unbuffered channel ch := make(chan int)
// Send in goroutine (non-blocking)
go func() {
ch <- 42
}()
// Receive (blocks until value available)
value := <-ch
fmt.Println(value) // 42
}
Buffered channels:
func main() { // Buffered channel with capacity 2 ch := make(chan string, 2)
// Can send up to 2 values without blocking
ch <- "first"
ch <- "second"
fmt.Println(<-ch) // first
fmt.Println(<-ch) // second
}
Channel direction:
// Send-only channel func send(ch chan<- int) { ch <- 42 }
// Receive-only channel func receive(ch <-chan int) int { return <-ch }
func main() { ch := make(chan int)
go send(ch)
value := receive(ch)
fmt.Println(value)
}
Closing channels:
func main() { ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch) // Close channel
// Receive until channel is closed
for value := range ch {
fmt.Println(value)
}
// Check if channel is closed
value, ok := <-ch
fmt.Printf("Value: %d, Open: %v\n", value, ok) // Value: 0, Open: false
}
Select Statement
Multiplexing channels:
func main() { ch1 := make(chan string) ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
// Wait for both
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
Select with default:
func main() { ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println(val)
default:
fmt.Println("No value ready") // Executed
}
}
Select with timeout:
func main() { ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(time.Second):
fmt.Println("Timeout") // Executed after 1 second
}
}
Worker Pools
Implementing worker pool pattern:
func worker(id int, jobs <-chan int, results chan<- int) { for job := range jobs { fmt.Printf("Worker %d processing job %d\n", id, job) time.Sleep(time.Second) results <- job * 2 } }
func main() { jobs := make(chan int, 100) results := make(chan int, 100)
// Start 3 workers
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// Send 5 jobs
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Collect results
for a := 1; a <= 5; a++ {
<-results
}
}
sync.WaitGroup
Waiting for goroutines to complete:
import ( "fmt" "sync" "time" )
func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // Decrement counter when done
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() { var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment counter
go worker(i, &wg)
}
wg.Wait() // Wait for all to complete
fmt.Println("All workers done")
}
sync.Mutex
Protecting shared state:
import ( "fmt" "sync" )
type Counter struct { mu sync.Mutex value int }
func (c *Counter) Increment() { c.mu.Lock() c.value++ c.mu.Unlock() }
func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value }
func main() { var wg sync.WaitGroup counter := Counter{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println(counter.Value()) // 1000
}
sync.RWMutex
Read-write locks:
type Cache struct { mu sync.RWMutex items map[string]string }
func (c *Cache) Get(key string) (string, bool) { c.mu.RLock() // Read lock defer c.mu.RUnlock() val, ok := c.items[key] return val, ok }
func (c *Cache) Set(key, value string) { c.mu.Lock() // Write lock defer c.mu.Unlock() c.items[key] = value }
func main() { cache := Cache{items: make(map[string]string)}
// Multiple readers can access simultaneously
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cache.Get("key")
}()
}
wg.Wait()
}
sync.Once
Execute once initialization:
var ( instance *Database once sync.Once )
type Database struct { conn string }
func GetDatabase() *Database { once.Do(func() { fmt.Println("Initializing database") instance = &Database{conn: "connected"} }) return instance }
func main() { var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDatabase() // Only initializes once
fmt.Println(db.conn)
}()
}
wg.Wait()
}
Context Package
Using context for cancellation:
import ( "context" "fmt" "time" )
func worker(ctx context.Context, id int) { for { select { case <-ctx.Done(): fmt.Printf("Worker %d cancelled\n", id) return default: fmt.Printf("Worker %d working\n", id) time.Sleep(500 * time.Millisecond) } } }
func main() { ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // Cancel all workers
time.Sleep(time.Second)
}
Context with timeout:
func slowOperation(ctx context.Context) error { select { case <-time.After(3 * time.Second): return nil case <-ctx.Done(): return ctx.Err() } }
func main() { ctx, cancel := context.WithTimeout( context.Background(), 2*time.Second, ) defer cancel()
err := slowOperation(ctx)
if err != nil {
fmt.Println("Operation timed out:", err)
}
}
Context with values:
func processRequest(ctx context.Context) { userID := ctx.Value("userID") fmt.Println("Processing for user:", userID) }
func main() { ctx := context.WithValue( context.Background(), "userID", "user123", ) processRequest(ctx) }
Error Handling in Concurrent Code
Using errgroup:
import ( "context" "fmt" "golang.org/x/sync/errgroup" "time" )
func fetchUser(ctx context.Context, id int) error { time.Sleep(time.Second) if id == 3 { return fmt.Errorf("user %d not found", id) } fmt.Printf("Fetched user %d\n", id) return nil }
func main() { g, ctx := errgroup.WithContext(context.Background())
userIDs := []int{1, 2, 3, 4, 5}
for _, id := range userIDs {
id := id // Capture loop variable
g.Go(func() error {
return fetchUser(ctx, id)
})
}
// Wait for all goroutines
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}
Fan-Out Fan-In Pattern
Distributing work and collecting results:
func generator(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
func square(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
func merge(cs ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() { in := generator(1, 2, 3, 4, 5)
// Fan out
c1 := square(in)
c2 := square(in)
// Fan in
for n := range merge(c1, c2) {
fmt.Println(n)
}
}
When to Use This Skill
Use go-concurrency when you need to:
-
Execute multiple operations concurrently
-
Build concurrent servers or workers
-
Implement producer-consumer patterns
-
Process data streams concurrently
-
Handle multiple I/O operations simultaneously
-
Implement timeout and cancellation
-
Coordinate multiple goroutines
-
Build fan-out/fan-in pipelines
-
Share state safely between goroutines
-
Implement rate limiting or throttling
Best Practices
-
Use channels for communication, mutexes for state
-
Close channels from sender side only
-
Always use WaitGroup to wait for goroutines
-
Pass contexts for cancellation and deadlines
-
Use buffered channels judiciously
-
Protect shared state with mutexes
-
Avoid goroutine leaks with proper cleanup
-
Use select with default for non-blocking ops
-
Prefer sync.Once for initialization
-
Document goroutine ownership and lifecycle
Common Pitfalls
-
Goroutine leaks (forgetting to exit)
-
Race conditions from unprotected shared state
-
Deadlocks from improper channel usage
-
Sending on closed channels (panics)
-
Not checking channel close status
-
Overusing mutexes instead of channels
-
Creating too many goroutines
-
Forgetting to call WaitGroup.Done()
-
Passing loop variables to goroutines
-
Not handling context cancellation
Resources
-
Go Concurrency Patterns
-
Effective Go - Concurrency
-
Go by Example - Goroutines
-
errgroup Package
-
Context Package