· 9 min read

Go’s concurrency and synchronization patterns

with more advanced examples and techniques.

with more advanced examples and techniques.

14. Worker Pool Pattern

A worker pool limits the number of concurrent goroutines and processes tasks in parallel.

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, job)
		time.Sleep(time.Second) // Simulate work
		results <- job * 2
		fmt.Printf("Worker %d finished job %d\n", id, job)
	}
}

func main() {
	const numJobs = 10
	const numWorkers = 3

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// Start workers
	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			worker(workerID, jobs, results)
		}(i)
	}

	// Send jobs
	for i := 1; i <= numJobs; i++ {
		jobs <- i
	}
	close(jobs)

	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Print results
	for result := range results {
		fmt.Println("Result:", result)
	}
}

15. Fan-Out, Fan-In Pattern

Fan-out distributes work across multiple goroutines, and fan-in collects results from multiple channels.

package main

import (
	"fmt"
	"sync"
	"time"
)

func producer(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
			time.Sleep(500 * time.Millisecond) // Simulate work
		}
		close(out)
	}()
	return out
}

func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Fan-in
	for _, ch := range channels {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				out <- n
			}
		}(ch)
	}

	// Close out channel when all goroutines are done
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	in := producer(1, 2, 3, 4, 5)

	// Fan-out
	ch1 := square(in)
	ch2 := square(in)
	ch3 := square(in)

	// Fan-in and print results
	for result := range merge(ch1, ch2, ch3) {
		fmt.Println("Result:", result)
	}
}

16. Rate Limiting

Use a ticker to limit the rate of operations.

package main

import (
	"fmt"
	"time"
)

func main() {
	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	limiter := time.Tick(200 * time.Millisecond) // Limit to 5 operations per second

	for req := range requests {
		<-limiter // Wait for the next tick
		fmt.Println("Request", req, "processed at", time.Now())
	}
}

17. Dynamic Rate Limiting

Use a buffered channel to allow bursts of operations.

package main

import (
	"fmt"
	"time"
)

func main() {
	burstyLimiter := make(chan time.Time, 3)

	// Fill the channel to allow bursts
	for i := 0; i < 3; i++ {
		burstyLimiter <- time.Now()
	}

	// Refill the channel every 200ms
	go func() {
		for t := range time.Tick(200 * time.Millisecond) {
			burstyLimiter <- t
		}
	}()

	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)

	for req := range requests {
		<-burstyLimiter // Wait for the next token
		fmt.Println("Request", req, "processed at", time.Now())
	}
}

18. Pipeline Pattern

Break down a task into stages and process them concurrently.

package main

import (
	"fmt"
	"time"
)

func stage1(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * 2
			time.Sleep(500 * time.Millisecond) // Simulate work
		}
		close(out)
	}()
	return out
}

func stage2(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n + 1
			time.Sleep(500 * time.Millisecond) // Simulate work
		}
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()

	// Pipeline: stage1 -> stage2
	result := stage2(stage1(in))

	for res := range result {
		fmt.Println("Result:", res)
	}
}

19. Timeout with Select

Use select to implement timeouts for operations.

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)

	go func() {
		time.Sleep(2 * time.Second)
		ch <- "result"
	}()

	select {
	case res := <-ch:
		fmt.Println(res)
	case <-time.After(1 * time.Second):
		fmt.Println("Timeout!")
	}
}

20. Cancellation with Context

Use context to cancel long-running operations.

package main

import (
	"context"
	"fmt"
	"time"
)

func operation(ctx context.Context, duration time.Duration) {
	select {
	case <-time.After(duration):
		fmt.Println("Operation completed")
	case <-ctx.Done():
		fmt.Println("Operation canceled:", ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go operation(ctx, 2*time.Second)

	time.Sleep(2 * time.Second) // Wait to see the result
}

Summary of Advanced Patterns:

  • Worker Pool: Limit concurrent goroutines.
  • Fan-Out, Fan-In: Distribute and collect work.
  • Rate Limiting: Control the rate of operations.
  • Pipeline: Break tasks into stages.
  • Timeout with Select: Handle timeouts gracefully.
  • Cancellation with Context: Cancel long-running operations.

These patterns will help you build scalable, efficient, and robust concurrent applications in Go.


21. Bounded Parallelism

Limit the number of concurrent tasks while processing a large dataset.

package main

import (
	"fmt"
	"sync"
	"time"
)

func processTask(taskID int) {
	fmt.Printf("Processing task %d\n", taskID)
	time.Sleep(500 * time.Millisecond) // Simulate work
	fmt.Printf("Task %d completed\n", taskID)
}

func main() {
	const numTasks = 10
	const maxConcurrency = 3

	var wg sync.WaitGroup
	sem := make(chan struct{}, maxConcurrency) // Semaphore for bounded parallelism

	for i := 1; i <= numTasks; i++ {
		wg.Add(1)
		go func(taskID int) {
			defer wg.Done()
			sem <- struct{}{} // Acquire semaphore
			processTask(taskID)
			<-sem // Release semaphore
		}(i)
	}

	wg.Wait()
	fmt.Println("All tasks completed")
}

22. Debouncing

Execute a function only after a certain amount of inactivity.

package main

import (
	"fmt"
	"sync"
	"time"
)

func debounce(fn func(), delay time.Duration) func() {
	var mu sync.Mutex
	var timer *time.Timer

	return func() {
		mu.Lock()
		defer mu.Unlock()

		if timer != nil {
			timer.Stop()
		}

		timer = time.AfterFunc(delay, fn)
	}
}

func main() {
	debounced := debounce(func() {
		fmt.Println("Executed after 1 second of inactivity")
	}, 1*time.Second)

	for i := 0; i < 5; i++ {
		debounced()
		time.Sleep(500 * time.Millisecond)
	}

	time.Sleep(2 * time.Second) // Wait to see the debounced execution
}

23. Throttling

Limit the rate at which a function can be executed.

package main

import (
	"fmt"
	"time"
)

func throttle(fn func(), delay time.Duration) func() {
	var lastExec time.Time

	return func() {
		if time.Since(lastExec) < delay {
			return
		}
		lastExec = time.Now()
		fn()
	}
}

func main() {
	throttled := throttle(func() {
		fmt.Println("Executed at most once per second")
	}, 1*time.Second)

	for i := 0; i < 5; i++ {
		throttled()
		time.Sleep(500 * time.Millisecond)
	}
}

24. Load Balancing

Distribute tasks across multiple workers dynamically.

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("Worker %d processing task %d\n", id, task)
		time.Sleep(500 * time.Millisecond) // Simulate work
		fmt.Printf("Worker %d completed task %d\n", id, task)
	}
}

func main() {
	const numWorkers = 3
	const numTasks = 10

	tasks := make(chan int, numTasks)
	var wg sync.WaitGroup

	// Start workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, tasks, &wg)
	}

	// Send tasks
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	wg.Wait()
	fmt.Println("All tasks completed")
}

25. Graceful Shutdown

Shut down a server or application gracefully.

package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	server := &http.Server{Addr: ":8080"}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, world!")
	})

	// Start server in a goroutine
	go func() {
		if err := server.ListenAndServe(); err != http.ErrServerClosed {
			fmt.Println("Server error:", err)
		}
	}()

	// Wait for interrupt signal
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	// Shutdown server gracefully
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		fmt.Println("Shutdown error:", err)
	}

	fmt.Println("Server stopped gracefully")
}

26. Dynamic Goroutine Pool

Adjust the number of workers dynamically based on workload.

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, tasks <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("Worker %d processing task %d\n", id, task)
		time.Sleep(500 * time.Millisecond) // Simulate work
		fmt.Printf("Worker %d completed task %d\n", id, task)
	}
}

func main() {
	const maxWorkers = 5
	const numTasks = 10

	tasks := make(chan int, numTasks)
	var wg sync.WaitGroup

	// Start initial workers
	for i := 1; i <= maxWorkers; i++ {
		wg.Add(1)
		go worker(i, tasks, &wg)
	}

	// Add more workers dynamically
	go func() {
		time.Sleep(2 * time.Second)
		for i := maxWorkers + 1; i <= maxWorkers+2; i++ {
			wg.Add(1)
			go worker(i, tasks, &wg)
		}
	}()

	// Send tasks
	for i := 1; i <= numTasks; i++ {
		tasks <- i
	}
	close(tasks)

	wg.Wait()
	fmt.Println("All tasks completed")
}

27. Priority Queue with Channels

Implement a priority queue using channels.

package main

import (
	"fmt"
	"time"
)

type Task struct {
	Priority int
	Message  string
}

func worker(tasks <-chan Task) {
	for task := range tasks {
		fmt.Printf("Processing task: %s (Priority: %d)\n", task.Message, task.Priority)
		time.Sleep(500 * time.Millisecond) // Simulate work
	}
}

func main() {
	highPriority := make(chan Task, 10)
	lowPriority := make(chan Task, 10)

	// Start workers
	go worker(highPriority)
	go worker(lowPriority)

	// Send tasks
	go func() {
		for i := 0; i < 5; i++ {
			highPriority <- Task{Priority: 1, Message: fmt.Sprintf("High-priority task %d", i)}
			lowPriority <- Task{Priority: 2, Message: fmt.Sprintf("Low-priority task %d", i)}
		}
	}()

	time.Sleep(3 * time.Second) // Wait for tasks to be processed
}

28. Event Bus

Implement a simple event bus for pub/sub communication.

package main

import (
	"fmt"
	"sync"
)

type EventBus struct {
	subscribers map[string][]chan string
	mu          sync.RWMutex
}

func NewEventBus() *EventBus {
	return &EventBus{
		subscribers: make(map[string][]chan string),
	}
}

func (eb *EventBus) Subscribe(topic string) <-chan string {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	ch := make(chan string, 1)
	eb.subscribers[topic] = append(eb.subscribers[topic], ch)
	return ch
}

func (eb *EventBus) Publish(topic string, message string) {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	for _, ch := range eb.subscribers[topic] {
		go func(ch chan string) {
			ch <- message
		}(ch)
	}
}

func main() {
	eb := NewEventBus()

	// Subscribe to topics
	sub1 := eb.Subscribe("news")
	sub2 := eb.Subscribe("sports")

	// Publish messages
	go func() {
		eb.Publish("news", "Breaking news: Go is awesome!")
		eb.Publish("sports", "Go team wins the championship!")
	}()

	// Receive messages
	fmt.Println("News:", <-sub1)
	fmt.Println("Sports:", <-sub2)
}

Summary of Advanced Techniques:

  • Bounded Parallelism: Limit concurrency with a semaphore.
  • Debouncing: Execute after inactivity.
  • Throttling: Limit execution rate.
  • Load Balancing: Distribute tasks dynamically.
  • Graceful Shutdown: Shut down servers gracefully.
  • Dynamic Goroutine Pool: Adjust workers based on workload.
  • Priority Queue: Prioritize tasks with channels.
  • Event Bus: Pub/sub communication.

These patterns and techniques will help you tackle complex concurrency challenges in Go!

Back to Blog

Related Posts

View All Posts »
Java enum

Java enum

Java enum is a powerful feature that goes beyond just defining a fixed set of constants.

Singleton Design Pattern

Singleton Design Pattern

The singleton design pattern ensures that a class has only one instance and provides a global point of access to it. It's commonly used for managing shared resources, configurations, or logging mechanisms.