· 9 min read
Go’s concurrency and synchronization patterns
with more advanced examples and techniques.

14. Worker Pool Pattern
A worker pool limits the number of concurrent goroutines and processes tasks in parallel.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d started job %d\n", id, job)
time.Sleep(time.Second) // Simulate work
results <- job * 2
fmt.Printf("Worker %d finished job %d\n", id, job)
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// Start workers
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
worker(workerID, jobs, results)
}(i)
}
// Send jobs
for i := 1; i <= numJobs; i++ {
jobs <- i
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
// Print results
for result := range results {
fmt.Println("Result:", result)
}
}
15. Fan-Out, Fan-In Pattern
Fan-out distributes work across multiple goroutines, and fan-in collects results from multiple channels.
package main
import (
"fmt"
"sync"
"time"
)
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
time.Sleep(500 * time.Millisecond) // Simulate work
}
close(out)
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Fan-in
for _, ch := range channels {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(ch)
}
// Close out channel when all goroutines are done
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := producer(1, 2, 3, 4, 5)
// Fan-out
ch1 := square(in)
ch2 := square(in)
ch3 := square(in)
// Fan-in and print results
for result := range merge(ch1, ch2, ch3) {
fmt.Println("Result:", result)
}
}
16. Rate Limiting
Use a ticker to limit the rate of operations.
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond) // Limit to 5 operations per second
for req := range requests {
<-limiter // Wait for the next tick
fmt.Println("Request", req, "processed at", time.Now())
}
}
17. Dynamic Rate Limiting
Use a buffered channel to allow bursts of operations.
package main
import (
"fmt"
"time"
)
func main() {
burstyLimiter := make(chan time.Time, 3)
// Fill the channel to allow bursts
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
// Refill the channel every 200ms
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
for req := range requests {
<-burstyLimiter // Wait for the next token
fmt.Println("Request", req, "processed at", time.Now())
}
}
18. Pipeline Pattern
Break down a task into stages and process them concurrently.
package main
import (
"fmt"
"time"
)
func stage1(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * 2
time.Sleep(500 * time.Millisecond) // Simulate work
}
close(out)
}()
return out
}
func stage2(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n + 1
time.Sleep(500 * time.Millisecond) // Simulate work
}
close(out)
}()
return out
}
func main() {
in := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
in <- i
}
close(in)
}()
// Pipeline: stage1 -> stage2
result := stage2(stage1(in))
for res := range result {
fmt.Println("Result:", res)
}
}
19. Timeout with Select
Use select
to implement timeouts for operations.
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(1 * time.Second):
fmt.Println("Timeout!")
}
}
20. Cancellation with Context
Use context
to cancel long-running operations.
package main
import (
"context"
"fmt"
"time"
)
func operation(ctx context.Context, duration time.Duration) {
select {
case <-time.After(duration):
fmt.Println("Operation completed")
case <-ctx.Done():
fmt.Println("Operation canceled:", ctx.Err())
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go operation(ctx, 2*time.Second)
time.Sleep(2 * time.Second) // Wait to see the result
}
Summary of Advanced Patterns:
- Worker Pool: Limit concurrent goroutines.
- Fan-Out, Fan-In: Distribute and collect work.
- Rate Limiting: Control the rate of operations.
- Pipeline: Break tasks into stages.
- Timeout with Select: Handle timeouts gracefully.
- Cancellation with Context: Cancel long-running operations.
These patterns will help you build scalable, efficient, and robust concurrent applications in Go.
21. Bounded Parallelism
Limit the number of concurrent tasks while processing a large dataset.
package main
import (
"fmt"
"sync"
"time"
)
func processTask(taskID int) {
fmt.Printf("Processing task %d\n", taskID)
time.Sleep(500 * time.Millisecond) // Simulate work
fmt.Printf("Task %d completed\n", taskID)
}
func main() {
const numTasks = 10
const maxConcurrency = 3
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrency) // Semaphore for bounded parallelism
for i := 1; i <= numTasks; i++ {
wg.Add(1)
go func(taskID int) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
processTask(taskID)
<-sem // Release semaphore
}(i)
}
wg.Wait()
fmt.Println("All tasks completed")
}
22. Debouncing
Execute a function only after a certain amount of inactivity.
package main
import (
"fmt"
"sync"
"time"
)
func debounce(fn func(), delay time.Duration) func() {
var mu sync.Mutex
var timer *time.Timer
return func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(delay, fn)
}
}
func main() {
debounced := debounce(func() {
fmt.Println("Executed after 1 second of inactivity")
}, 1*time.Second)
for i := 0; i < 5; i++ {
debounced()
time.Sleep(500 * time.Millisecond)
}
time.Sleep(2 * time.Second) // Wait to see the debounced execution
}
23. Throttling
Limit the rate at which a function can be executed.
package main
import (
"fmt"
"time"
)
func throttle(fn func(), delay time.Duration) func() {
var lastExec time.Time
return func() {
if time.Since(lastExec) < delay {
return
}
lastExec = time.Now()
fn()
}
}
func main() {
throttled := throttle(func() {
fmt.Println("Executed at most once per second")
}, 1*time.Second)
for i := 0; i < 5; i++ {
throttled()
time.Sleep(500 * time.Millisecond)
}
}
24. Load Balancing
Distribute tasks across multiple workers dynamically.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing task %d\n", id, task)
time.Sleep(500 * time.Millisecond) // Simulate work
fmt.Printf("Worker %d completed task %d\n", id, task)
}
}
func main() {
const numWorkers = 3
const numTasks = 10
tasks := make(chan int, numTasks)
var wg sync.WaitGroup
// Start workers
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
// Send tasks
for i := 1; i <= numTasks; i++ {
tasks <- i
}
close(tasks)
wg.Wait()
fmt.Println("All tasks completed")
}
25. Graceful Shutdown
Shut down a server or application gracefully.
package main
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
server := &http.Server{Addr: ":8080"}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, world!")
})
// Start server in a goroutine
go func() {
if err := server.ListenAndServe(); err != http.ErrServerClosed {
fmt.Println("Server error:", err)
}
}()
// Wait for interrupt signal
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
// Shutdown server gracefully
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
fmt.Println("Shutdown error:", err)
}
fmt.Println("Server stopped gracefully")
}
26. Dynamic Goroutine Pool
Adjust the number of workers dynamically based on workload.
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d processing task %d\n", id, task)
time.Sleep(500 * time.Millisecond) // Simulate work
fmt.Printf("Worker %d completed task %d\n", id, task)
}
}
func main() {
const maxWorkers = 5
const numTasks = 10
tasks := make(chan int, numTasks)
var wg sync.WaitGroup
// Start initial workers
for i := 1; i <= maxWorkers; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
// Add more workers dynamically
go func() {
time.Sleep(2 * time.Second)
for i := maxWorkers + 1; i <= maxWorkers+2; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
}()
// Send tasks
for i := 1; i <= numTasks; i++ {
tasks <- i
}
close(tasks)
wg.Wait()
fmt.Println("All tasks completed")
}
27. Priority Queue with Channels
Implement a priority queue using channels.
package main
import (
"fmt"
"time"
)
type Task struct {
Priority int
Message string
}
func worker(tasks <-chan Task) {
for task := range tasks {
fmt.Printf("Processing task: %s (Priority: %d)\n", task.Message, task.Priority)
time.Sleep(500 * time.Millisecond) // Simulate work
}
}
func main() {
highPriority := make(chan Task, 10)
lowPriority := make(chan Task, 10)
// Start workers
go worker(highPriority)
go worker(lowPriority)
// Send tasks
go func() {
for i := 0; i < 5; i++ {
highPriority <- Task{Priority: 1, Message: fmt.Sprintf("High-priority task %d", i)}
lowPriority <- Task{Priority: 2, Message: fmt.Sprintf("Low-priority task %d", i)}
}
}()
time.Sleep(3 * time.Second) // Wait for tasks to be processed
}
28. Event Bus
Implement a simple event bus for pub/sub communication.
package main
import (
"fmt"
"sync"
)
type EventBus struct {
subscribers map[string][]chan string
mu sync.RWMutex
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]chan string),
}
}
func (eb *EventBus) Subscribe(topic string) <-chan string {
eb.mu.Lock()
defer eb.mu.Unlock()
ch := make(chan string, 1)
eb.subscribers[topic] = append(eb.subscribers[topic], ch)
return ch
}
func (eb *EventBus) Publish(topic string, message string) {
eb.mu.RLock()
defer eb.mu.RUnlock()
for _, ch := range eb.subscribers[topic] {
go func(ch chan string) {
ch <- message
}(ch)
}
}
func main() {
eb := NewEventBus()
// Subscribe to topics
sub1 := eb.Subscribe("news")
sub2 := eb.Subscribe("sports")
// Publish messages
go func() {
eb.Publish("news", "Breaking news: Go is awesome!")
eb.Publish("sports", "Go team wins the championship!")
}()
// Receive messages
fmt.Println("News:", <-sub1)
fmt.Println("Sports:", <-sub2)
}
Summary of Advanced Techniques:
- Bounded Parallelism: Limit concurrency with a semaphore.
- Debouncing: Execute after inactivity.
- Throttling: Limit execution rate.
- Load Balancing: Distribute tasks dynamically.
- Graceful Shutdown: Shut down servers gracefully.
- Dynamic Goroutine Pool: Adjust workers based on workload.
- Priority Queue: Prioritize tasks with channels.
- Event Bus: Pub/sub communication.
These patterns and techniques will help you tackle complex concurrency challenges in Go!