broadcaster.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package broadcaster
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. type Broadcaster[T any] interface {
  10. Publish(ctx context.Context, data T) error
  11. Subscribe() chan T
  12. }
  13. type broadcaster[T any] struct {
  14. mu sync.RWMutex
  15. channels []*chanMeta[T]
  16. bufSize int
  17. }
  18. type chanMeta[T any] struct {
  19. ch chan T
  20. closed atomic.Bool
  21. }
  22. func New[T any](bufSize int) Broadcaster[T] {
  23. return &broadcaster[T]{
  24. mu: sync.RWMutex{},
  25. channels: make([]*chanMeta[T], 0),
  26. bufSize: bufSize,
  27. }
  28. }
  29. // Publish implements Broadcaster.
  30. func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
  31. b.mu.RLock()
  32. defer b.mu.RUnlock()
  33. for _, meta := range b.channels {
  34. if meta.closed.Load() {
  35. continue
  36. }
  37. func() {
  38. defer func() {
  39. if err := recover(); err != nil {
  40. meta.closed.Store(true)
  41. fmt.Println(err)
  42. }
  43. }()
  44. select {
  45. case meta.ch <- data:
  46. case <-time.After(100 * time.Millisecond):
  47. meta.closed.Store(true)
  48. close(meta.ch)
  49. }
  50. }()
  51. }
  52. return nil
  53. }
  54. // Subscribe implements Broadcaster.
  55. func (b *broadcaster[T]) Subscribe() chan T {
  56. b.mu.Lock()
  57. defer b.mu.Unlock()
  58. ch := make(chan T, b.bufSize)
  59. b.channels = append(b.channels, &chanMeta[T]{
  60. ch: ch,
  61. closed: atomic.Bool{},
  62. })
  63. return ch
  64. }