1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package broadcaster
- import (
- "context"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
- type Broadcaster[T any] interface {
- Publish(ctx context.Context, data T) error
- Subscribe() chan T
- }
- type broadcaster[T any] struct {
- mu sync.RWMutex
- channels []*chanMeta[T]
- bufSize int
- }
- type chanMeta[T any] struct {
- ch chan T
- closed atomic.Bool
- }
- func New[T any](bufSize int) Broadcaster[T] {
- return &broadcaster[T]{
- mu: sync.RWMutex{},
- channels: make([]*chanMeta[T], 0),
- bufSize: bufSize,
- }
- }
- // Publish implements Broadcaster.
- func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
- b.mu.RLock()
- defer b.mu.RUnlock()
- for _, meta := range b.channels {
- if meta.closed.Load() {
- continue
- }
- func() {
- defer func() {
- if err := recover(); err != nil {
- meta.closed.Store(true)
- fmt.Println(err)
- }
- }()
- select {
- case meta.ch <- data:
- case <-time.After(100 * time.Millisecond):
- meta.closed.Store(true)
- close(meta.ch)
- }
- }()
- }
- return nil
- }
- // Subscribe implements Broadcaster.
- func (b *broadcaster[T]) Subscribe() chan T {
- b.mu.Lock()
- defer b.mu.Unlock()
- ch := make(chan T, b.bufSize)
- b.channels = append(b.channels, &chanMeta[T]{
- ch: ch,
- closed: atomic.Bool{},
- })
- return ch
- }
|