broadcaster.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package broadcaster
  2. import (
  3. "context"
  4. "sync"
  5. "sync/atomic"
  6. )
  7. type Broadcaster[T any] interface {
  8. Publish(ctx context.Context, data T) error
  9. Subscribe() chan T
  10. }
  11. type broadcaster[T any] struct {
  12. mu sync.RWMutex
  13. channels []*chanMeta[T]
  14. bufSize int
  15. }
  16. type chanMeta[T any] struct {
  17. ch chan T
  18. closed atomic.Bool
  19. }
  20. func NewBroadcaster[T any](bufSize int) Broadcaster[T] {
  21. return &broadcaster[T]{
  22. mu: sync.RWMutex{},
  23. channels: make([]*chanMeta[T], 0),
  24. bufSize: bufSize,
  25. }
  26. }
  27. // Publish implements Broadcaster.
  28. func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
  29. b.mu.RLock()
  30. defer b.mu.RUnlock()
  31. for _, meta := range b.channels {
  32. if meta.closed.Load() {
  33. continue
  34. }
  35. select {
  36. case meta.ch <- data:
  37. default:
  38. meta.closed.Store(true)
  39. close(meta.ch)
  40. }
  41. }
  42. return nil
  43. }
  44. // Subscribe implements Broadcaster.
  45. func (b *broadcaster[T]) Subscribe() chan T {
  46. b.mu.Lock()
  47. defer b.mu.Unlock()
  48. ch := make(chan T, b.bufSize)
  49. b.channels = append(b.channels, &chanMeta[T]{
  50. ch: ch,
  51. closed: atomic.Bool{},
  52. })
  53. return ch
  54. }