package broadcaster import ( "context" "sync" "sync/atomic" ) type Broadcaster[T any] interface { Publish(ctx context.Context, data T) error Subscribe() chan T } type broadcaster[T any] struct { mu sync.RWMutex channels []*chanMeta[T] bufSize int } type chanMeta[T any] struct { ch chan T closed atomic.Bool } func NewBroadcaster[T any](bufSize int) Broadcaster[T] { return &broadcaster[T]{ mu: sync.RWMutex{}, channels: make([]*chanMeta[T], 0), bufSize: bufSize, } } // Publish implements Broadcaster. func (b *broadcaster[T]) Publish(ctx context.Context, data T) error { b.mu.RLock() defer b.mu.RUnlock() for _, meta := range b.channels { if meta.closed.Load() { continue } select { case meta.ch <- data: default: meta.closed.Store(true) close(meta.ch) } } return nil } // Subscribe implements Broadcaster. func (b *broadcaster[T]) Subscribe() chan T { b.mu.Lock() defer b.mu.Unlock() ch := make(chan T, b.bufSize) b.channels = append(b.channels, &chanMeta[T]{ ch: ch, closed: atomic.Bool{}, }) return ch }