// Package broadcaster provides a small generic fan-out primitive: every value
// passed to Publish is offered to each channel previously handed out by
// Subscribe.
package broadcaster

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sendTimeout is how long Publish waits for a single subscriber to accept a
// value before that subscriber is considered stalled, closed and dropped.
const sendTimeout = 100 * time.Millisecond

// Broadcaster fans values of type T out to all current subscribers.
type Broadcaster[T any] interface {
	// Publish offers data to every live subscriber. A subscriber that does
	// not accept the value within 100ms has its channel closed and is
	// dropped. Publish returns ctx.Err() if ctx is cancelled before all
	// subscribers have been served (delivery may then be partial), and nil
	// otherwise.
	Publish(ctx context.Context, data T) error
	// Subscribe registers and returns a new channel that receives every
	// value published after the call. The channel is closed by the
	// broadcaster if the subscriber falls too far behind.
	Subscribe() chan T
}

// broadcaster is the default Broadcaster implementation.
type broadcaster[T any] struct {
	// mu guards channels.
	mu sync.RWMutex
	// channels is only ever appended to in place or replaced wholesale,
	// never mutated below its current length. That lets Publish iterate
	// over a snapshot of the slice header without holding mu.
	channels []*chanMeta[T]
	// bufSize is the buffer capacity of each subscriber channel.
	bufSize int
}

// chanMeta is the per-subscriber bookkeeping.
type chanMeta[T any] struct {
	// mu serialises send/close on ch so that concurrent publishers cannot
	// close the channel twice or send on it after a peer has closed it.
	mu sync.Mutex
	ch chan T
	// closed is set once the subscriber has been dropped; it is atomic so
	// Publish can skip dead subscribers without taking mu.
	closed atomic.Bool
}

// New returns a Broadcaster whose subscriber channels have a buffer of
// bufSize elements. A negative bufSize is treated as 0 (unbuffered) rather
// than being allowed to panic later inside Subscribe.
func New[T any](bufSize int) Broadcaster[T] {
	if bufSize < 0 {
		bufSize = 0
	}
	return &broadcaster[T]{bufSize: bufSize}
}

// Publish implements Broadcaster.
func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
	if ctx == nil {
		// Tolerate a nil context, as the previous implementation did.
		ctx = context.Background()
	}

	// Snapshot the subscriber list so that slow subscribers do not keep the
	// lock held (and Subscribe blocked) for the duration of the sends.
	b.mu.RLock()
	subs := b.channels
	b.mu.RUnlock()

	dropped := false
	defer func() {
		// Forget dead subscribers so the list does not grow without bound.
		if dropped {
			b.prune()
		}
	}()

	for _, meta := range subs {
		if err := ctx.Err(); err != nil {
			return err
		}
		if !meta.closed.Load() {
			if err := meta.deliver(ctx, data); err != nil {
				return err
			}
		}
		if meta.closed.Load() {
			dropped = true
		}
	}
	return nil
}

// deliver hands data to a single subscriber. It waits at most sendTimeout;
// on timeout the subscriber's channel is closed and marked as dropped. It
// returns ctx.Err() if ctx is cancelled while waiting, in which case the
// subscriber is left untouched.
func (m *chanMeta[T]) deliver(ctx context.Context, data T) (err error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check under the lock: a concurrent publisher may have dropped it.
	if m.closed.Load() {
		return nil
	}

	// The returned channel is bidirectional, so a subscriber may close it
	// itself; a send or close on it then panics. Treat that as "subscriber
	// gone" instead of crashing the publisher.
	defer func() {
		if r := recover(); r != nil {
			m.closed.Store(true)
			fmt.Println(r)
		}
	}()

	// Fast path: no timer is needed when the subscriber can accept now.
	select {
	case m.ch <- data:
		return nil
	default:
	}

	timer := time.NewTimer(sendTimeout)
	defer timer.Stop()

	select {
	case m.ch <- data:
	case <-timer.C:
		m.closed.Store(true)
		close(m.ch)
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

// prune replaces the subscriber list with one containing only live
// subscribers. A fresh slice is built so that snapshots taken by in-flight
// Publish calls are not disturbed.
func (b *broadcaster[T]) prune() {
	b.mu.Lock()
	defer b.mu.Unlock()

	live := make([]*chanMeta[T], 0, len(b.channels))
	for _, meta := range b.channels {
		if !meta.closed.Load() {
			live = append(live, meta)
		}
	}
	b.channels = live
}

// Subscribe implements Broadcaster.
func (b *broadcaster[T]) Subscribe() chan T {
	b.mu.Lock()
	defer b.mu.Unlock()

	ch := make(chan T, b.bufSize)
	b.channels = append(b.channels, &chanMeta[T]{ch: ch})
	return ch
}