go / syncs
I use these generic sync utilities to simplify concurrent Go code.
WaitGroup.Go (Go 1.25+)
Go 1.25 added sync.WaitGroup.Go, which folds the common Add / go / Done pattern into a single call:
// Before (Go 1.24 and earlier)
wg.Add(1)
go func() {
	defer wg.Done()
	process(t)
}()
// After (Go 1.25+)
wg.Go(func() { process(t) })
For timeout or cancellation scenarios, use WaitGroupChan below.
BoundedWaitGroup
A WaitGroup that limits concurrency. Add blocks when at capacity:
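import "sync"
// BoundedWaitGroup is a WaitGroup whose Add blocks once limit tasks are in flight.
type BoundedWaitGroup struct {
	wg sync.WaitGroup
	ch chan struct{} // buffered; each element is one occupied slot
}
func NewBoundedWaitGroup(limit int) *BoundedWaitGroup {
	return &BoundedWaitGroup{ch: make(chan struct{}, limit)}
}
func (b *BoundedWaitGroup) Add(delta int) {
	// Negative delta: release one slot per finished task.
	for i := 0; i > delta; i-- {
		<-b.ch
	}
	// Positive delta: claim one slot per new task, blocking while the buffer is full.
	for i := 0; i < delta; i++ {
		b.ch <- struct{}{}
	}
	b.wg.Add(delta)
}
func (b *BoundedWaitGroup) Done() { b.Add(-1) }
func (b *BoundedWaitGroup) Wait() { b.wg.Wait() }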
Useful for processing many items with limited parallelism:
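bwg := NewBoundedWaitGroup(10) // max 10 concurrent
for _, item := range items {
	bwg.Add(1) // blocks while 10 tasks are already running
	go func() {
		defer bwg.Done()
		process(item) // safe to capture: item is per-iteration since Go 1.22
	}()
}
bwg.Wait()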
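The buffered channel acts as a semaphore. It caps the number of running goroutines, and because Add blocks inside the producing loop, it also applies backpressure: goroutines are started only as slots free up instead of all at once.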
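To see the limit hold in practice, here is a minimal sketch that runs short tasks through a BoundedWaitGroup and records the highest number running at once. The type is repeated so the program is self-contained; peakConcurrency and the 1 ms sleep are hypothetical names and values chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// BoundedWaitGroup is repeated from above so this file compiles on its own.
type BoundedWaitGroup struct {
	wg sync.WaitGroup
	ch chan struct{}
}

func NewBoundedWaitGroup(limit int) *BoundedWaitGroup {
	return &BoundedWaitGroup{ch: make(chan struct{}, limit)}
}

func (b *BoundedWaitGroup) Add(delta int) {
	for i := 0; i > delta; i-- {
		<-b.ch // release a slot
	}
	for i := 0; i < delta; i++ {
		b.ch <- struct{}{} // blocks while every slot is taken
	}
	b.wg.Add(delta)
}

func (b *BoundedWaitGroup) Done() { b.Add(-1) }
func (b *BoundedWaitGroup) Wait() { b.wg.Wait() }

// peakConcurrency (hypothetical helper) runs n short tasks with the given
// limit and returns the largest number of tasks observed running at once.
func peakConcurrency(limit, n int) int64 {
	var running, peak atomic.Int64
	bwg := NewBoundedWaitGroup(limit)
	for i := 0; i < n; i++ {
		bwg.Add(1) // the producer stalls here once limit tasks are in flight
		go func() {
			defer bwg.Done()
			cur := running.Add(1)
			// Raise peak to cur if cur is a new maximum.
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for real work
			running.Add(-1)
		}()
	}
	bwg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 20) <= 3) // prints "true"
}
```

A task counts as running only while it holds a slot, so the observed peak can never exceed the limit, however many items are queued.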
Map
A generic thread-safe map using sync.RWMutex:
import "sync"
// Map is a typed map guarded by an RWMutex. The zero value is ready to use.
type Map[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}
func (m *Map[K, V]) Load(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.m[key] // reading a nil map is safe
	return v, ok
}
func (m *Map[K, V]) Store(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	m.m[key] = value
}
func (m *Map[K, V]) Delete(key K) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.m, key) // deleting from a nil map is a no-op
}
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	if v, ok := m.m[key]; ok {
		return v, true
	}
	m.m[key] = value
	return value, false
}
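Prefer this over sync.Map when entries change frequently (sync.Map is tuned for keys that are written once and read many times, or for goroutines that work on disjoint sets of keys)
and you want type safety without assertions.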
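As a rough illustration of both points, the sketch below has several goroutines race to LoadOrStore the same key and then reads the value back as a plain int with no type assertion. Only the two methods it needs are repeated; raceToStore and the key "winner" are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Map is a trimmed copy of the type above (Load and LoadOrStore only).
type Map[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

func (m *Map[K, V]) Load(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.m[key]
	return v, ok
}

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.m == nil {
		m.m = make(map[K]V)
	}
	if v, ok := m.m[key]; ok {
		return v, true
	}
	m.m[key] = value
	return value, false
}

// raceToStore (hypothetical helper) starts n goroutines that all try to
// store their own id under one key. It returns how many of them actually
// stored, and whether every goroutine ended up seeing the same value.
func raceToStore(n int) (stores int, agreed bool) {
	var m Map[string, int] // zero value is usable
	var wg sync.WaitGroup
	var stored atomic.Int64
	seen := make([]int, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			actual, loaded := m.LoadOrStore("winner", id)
			seen[id] = actual // each goroutine writes its own slot
			if !loaded {
				stored.Add(1)
			}
		}(i)
	}
	wg.Wait()
	winner, _ := m.Load("winner") // winner is an int, no assertion needed
	agreed = true
	for _, v := range seen {
		if v != winner {
			agreed = false
		}
	}
	return int(stored.Load()), agreed
}

func main() {
	fmt.Println(raceToStore(8)) // prints "1 true"
}
```

Because LoadOrStore checks and writes under one lock, exactly one goroutine stores and the rest receive that goroutine's value.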
WaitGroupChan
A WaitGroup that exposes a done channel for select:
import "sync/atomic"
// WaitGroupChan counts outstanding tasks and closes done when the count reaches zero.
type WaitGroupChan struct {
	n    int64
	done chan struct{}
}
func NewWaitGroupChan() *WaitGroupChan {
	return &WaitGroupChan{done: make(chan struct{})}
}
func (wg *WaitGroupChan) Add(delta int) {
	n := atomic.AddInt64(&wg.n, int64(delta))
	if n == 0 {
		close(wg.done) // wakes Wait and every receiver on DoneChan
	}
}
func (wg *WaitGroupChan) Done() { wg.Add(-1) }
func (wg *WaitGroupChan) Wait() { <-wg.done }
func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
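Useful when you need to wait with a timeout or cancellation. Call Add once with the full task count before starting any work: the channel closes the first time the counter reaches zero, so the value cannot be reused, and reaching zero a second time would panic on the double close: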
wg := NewWaitGroupChan()
wg.Add(len(tasks)) // register all work up front
for _, t := range tasks {
	go func() {
		defer wg.Done()
		process(t) // safe to capture: t is per-iteration since Go 1.22
	}()
}
select {
case <-wg.DoneChan():
	fmt.Println("all done")
case <-ctx.Done():
	fmt.Println("canceled")
case <-time.After(10 * time.Second):
	fmt.Println("timeout")
}
AtomicValue
A generic wrapper around atomic.Value:
import "sync/atomic"
type AtomicValue[T any] struct {
	v atomic.Value
}
// wrapped gives every stored value the same concrete type.
type wrapped[T any] struct{ v T }
func (a *AtomicValue[T]) Load() T {
	if x := a.v.Load(); x != nil {
		return x.(wrapped[T]).v
	}
	// Nothing stored yet: return the zero value of T.
	var zero T
	return zero
}
func (a *AtomicValue[T]) Store(v T) {
	a.v.Store(wrapped[T]{v})
}
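Because every value is stored as a wrapped[T], atomic.Value always sees the same concrete type. That avoids its panic when T is an interface type and successive stores hold different concrete types,
and it also allows storing a nil interface value, which a bare atomic.Value rejects with a panic.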
var config AtomicValue[*Config]
// Writer
config.Store(loadConfig())
// Readers (lock-free)
cfg := config.Load()
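The difference is easy to reproduce. The following sketch stores two different concrete error types, first into a bare atomic.Value and then into AtomicValue[error]. The type is repeated so the program stands alone; customErr, rawStorePanics, and wrappedStore are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// AtomicValue and wrapped are repeated from above.
type AtomicValue[T any] struct {
	v atomic.Value
}

type wrapped[T any] struct{ v T }

func (a *AtomicValue[T]) Load() T {
	if x := a.v.Load(); x != nil {
		return x.(wrapped[T]).v
	}
	var zero T
	return zero
}

func (a *AtomicValue[T]) Store(v T) {
	a.v.Store(wrapped[T]{v})
}

// customErr is a hypothetical second concrete error type.
type customErr struct{}

func (customErr) Error() string { return "custom" }

// rawStorePanics reports whether a bare atomic.Value panics when two
// different concrete types are stored into it.
func rawStorePanics() (panicked bool) {
	defer func() { panicked = recover() != nil }()
	var v atomic.Value
	v.Store(errors.New("first")) // concrete type: *errors.errorString
	v.Store(customErr{})         // different concrete type: panics
	return false
}

// wrappedStore performs the same stores through AtomicValue[error], then
// stores nil, and returns the final value.
func wrappedStore() error {
	var a AtomicValue[error]
	a.Store(errors.New("first"))
	a.Store(customErr{}) // fine: the stored type is always wrapped[error]
	a.Store(nil)         // fine as well: the nil sits inside the wrapper
	return a.Load()
}

func main() {
	fmt.Println(rawStorePanics()) // true
	fmt.Println(wrappedStore())   // <nil>
}
```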
See singleflight for deduplicating concurrent calls to the same function.