12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- package broadcaster
- import (
- "context"
- "sync"
- "sync/atomic"
- )
- type Broadcaster[T any] interface {
- Publish(ctx context.Context, data T) error
- Subscribe() chan T
- }
- type broadcaster[T any] struct {
- mu sync.RWMutex
- channels []*chanMeta[T]
- bufSize int
- }
- type chanMeta[T any] struct {
- ch chan T
- closed atomic.Bool
- }
- func NewBroadcaster[T any](bufSize int) Broadcaster[T] {
- return &broadcaster[T]{
- mu: sync.RWMutex{},
- channels: make([]*chanMeta[T], 0),
- bufSize: bufSize,
- }
- }
- // Publish implements Broadcaster.
- func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
- b.mu.RLock()
- defer b.mu.RUnlock()
- for _, meta := range b.channels {
- if meta.closed.Load() {
- continue
- }
- select {
- case meta.ch <- data:
- default:
- meta.closed.Store(true)
- close(meta.ch)
- }
- }
- return nil
- }
- // Subscribe implements Broadcaster.
- func (b *broadcaster[T]) Subscribe() chan T {
- b.mu.Lock()
- defer b.mu.Unlock()
- ch := make(chan T, b.bufSize)
- b.channels = append(b.channels, &chanMeta[T]{
- ch: ch,
- closed: atomic.Bool{},
- })
- return ch
- }
|