go / syncs

I use these generic sync utilities to simplify concurrent Go code.

WaitGroup.Go (Go 1.25+)

Go 1.25 added sync.WaitGroup.Go which simplifies the common pattern:

// Before (Go 1.24 and earlier)
wg.Add(1)
go func() {
	defer wg.Done()
	process(t)
}()

// After (Go 1.25+)
wg.Go(func() { process(t) })

For timeout or cancellation scenarios, use WaitGroupChan below.

BoundedWaitGroup

A WaitGroup that limits concurrency. Add blocks when at capacity:

type BoundedWaitGroup struct {
	wg sync.WaitGroup
	ch chan struct{}
}

func NewBoundedWaitGroup(limit int) *BoundedWaitGroup {
	return &BoundedWaitGroup{ch: make(chan struct{}, limit)}
}

func (b *BoundedWaitGroup) Add(delta int) {
	// Negative delta releases slots; positive delta acquires them,
	// blocking while the channel buffer is full.
	for i := 0; i > delta; i-- {
		<-b.ch
	}
	for i := 0; i < delta; i++ {
		b.ch <- struct{}{}
	}
	b.wg.Add(delta)
}

func (b *BoundedWaitGroup) Done() { b.Add(-1) }
func (b *BoundedWaitGroup) Wait() { b.wg.Wait() }

Useful for processing many items with limited parallelism:

bwg := NewBoundedWaitGroup(10) // max 10 concurrent
for _, item := range items {
	bwg.Add(1)
	go func() {
		defer bwg.Done()
		process(item)
	}()
}
bwg.Wait()

The channel buffer provides both concurrency limiting and backpressure.

Map

A generic thread-safe map using sync.RWMutex:

type Map[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

func (m *Map[K, V]) Load(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.m[key]
	return v, ok
}

func (m *Map[K, V]) Store(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	m.m[key] = value
}

func (m *Map[K, V]) Delete(key K) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.m, key)
}

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	if v, ok := m.m[key]; ok {
		return v, true
	}
	m.m[key] = value
	return value, false
}

Prefer this over sync.Map when entries change frequently and you want type safety without assertions.
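A minimal usage sketch (self-contained, repeating the methods above; Delete omitted). The zero value is ready to use because Store and LoadOrStore allocate the underlying map lazily:

```go
package main

import (
	"fmt"
	"sync"
)

type Map[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

func (m *Map[K, V]) Load(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.m[key]
	return v, ok
}

func (m *Map[K, V]) Store(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	m.m[key] = value
}

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	if v, ok := m.m[key]; ok {
		return v, true
	}
	m.m[key] = value
	return value, false
}

func main() {
	var counts Map[string, int] // zero value, no constructor needed
	counts.Store("a", 1)

	v, ok := counts.Load("a")
	fmt.Println(v, ok) // 1 true

	actual, loaded := counts.LoadOrStore("a", 99) // "a" exists: keeps 1
	fmt.Println(actual, loaded)                   // 1 true

	actual, loaded = counts.LoadOrStore("b", 2) // "b" is new: stores 2
	fmt.Println(actual, loaded)                 // 2 false
}
```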

WaitGroupChan

A WaitGroup that exposes a done channel for select:

type WaitGroupChan struct {
	n    int64
	done chan struct{}
}

func NewWaitGroupChan() *WaitGroupChan {
	return &WaitGroupChan{done: make(chan struct{})}
}

func (wg *WaitGroupChan) Add(delta int) {
	// Once the count reaches zero, done is closed and the
	// WaitGroupChan must not be reused: a later Add that brings
	// the count back to zero would close the channel again and panic.
	n := atomic.AddInt64(&wg.n, int64(delta))
	if n == 0 {
		close(wg.done)
	}
}

func (wg *WaitGroupChan) Done() { wg.Add(-1) }
func (wg *WaitGroupChan) Wait() { <-wg.done }
func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }

Useful when you need to wait with a timeout or cancellation:

wg := NewWaitGroupChan()
wg.Add(len(tasks))
for _, t := range tasks {
	go func() {
		defer wg.Done()
		process(t)
	}()
}

select {
case <-wg.DoneChan():
	fmt.Println("all done")
case <-ctx.Done():
	fmt.Println("canceled")
case <-time.After(10 * time.Second):
	fmt.Println("timeout")
}

AtomicValue

A generic wrapper around atomic.Value:

type AtomicValue[T any] struct {
	v atomic.Value
}

type wrapped[T any] struct{ v T }

func (a *AtomicValue[T]) Load() T {
	if x := a.v.Load(); x != nil {
		return x.(wrapped[T]).v
	}
	var zero T
	return zero
}

func (a *AtomicValue[T]) Store(v T) {
	a.v.Store(wrapped[T]{v})
}

atomic.Value panics if successive Stores use different concrete types. Wrapping every value in wrapped[T] means the stored concrete type is always the same, so an AtomicValue[T] can safely hold different implementations when T is an interface type.

var config AtomicValue[*Config]

// Writer
config.Store(loadConfig())

// Readers (lock-free)
cfg := config.Load()

See singleflight for deduplicating concurrent calls to the same function.
