Forráskód Böngészése

simple boradcaster

iwanhae 11 hónapja
szülő
commit
41c96f506a
4 módosított fájl, 136 hozzáadás és 1 törlés
  1. 3 0
      go.mod
  2. 2 1
      go.sum
  3. 67 0
      pkg/broadcaster/boradcaster_test.go
  4. 64 0
      pkg/broadcaster/broadcaster.go

+ 3 - 0
go.mod

@@ -5,11 +5,14 @@ go 1.20
 require (
 	github.com/onsi/ginkgo/v2 v2.11.0
 	github.com/onsi/gomega v1.27.10
+	github.com/stretchr/testify v1.8.4
 	k8s.io/apimachinery v0.28.0
 	k8s.io/client-go v0.28.0
 	sigs.k8s.io/controller-runtime v0.16.0
 )
 
+require github.com/pmezard/go-difflib v1.0.0 // indirect
+
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect

+ 2 - 1
go.sum

@@ -125,7 +125,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
-github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

+ 67 - 0
pkg/broadcaster/boradcaster_test.go

@@ -0,0 +1,67 @@
+package broadcaster_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/iwanhae/nodb/pkg/broadcaster"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestNewBroadcaster(t *testing.T) {
+	b := broadcaster.NewBroadcaster[int](50)
+	if b == nil {
+		t.Error("NewBroadcaster should not return nil")
+	}
+
+	chA := b.Subscribe()
+	chB := b.Subscribe()
+
+	ctx := context.Background()
+	b.Publish(ctx, 1)
+	b.Publish(ctx, 2)
+	b.Publish(ctx, 3)
+
+	idx := 1
+loopA:
+	for {
+		select {
+		case val := <-chA:
+			assert.Equal(t, idx, val)
+			idx += 1
+		default:
+			break loopA
+		}
+	}
+
+	idx = 1
+loopB:
+	for {
+		select {
+		case val := <-chB:
+			assert.Equal(t, idx, val)
+			idx += 1
+		default:
+			break loopB
+		}
+	}
+}
+
+func TestNewBroadcaster_auto_close(t *testing.T) {
+	b := broadcaster.NewBroadcaster[int](50)
+	if b == nil {
+		t.Error("NewBroadcaster should not return nil")
+	}
+
+	ch := b.Subscribe()
+	ctx := context.Background()
+	for i := 0; i < 52; i++ {
+		b.Publish(ctx, i)
+	}
+	for i := 0; i < 50; i++ {
+		_, ok := <-ch
+		assert.True(t, ok)
+	}
+	_, ok := <-ch
+	assert.False(t, ok)
+}

+ 64 - 0
pkg/broadcaster/broadcaster.go

@@ -0,0 +1,64 @@
+package broadcaster
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+)
+
+type Broadcaster[T any] interface {
+	Publish(ctx context.Context, data T) error
+	Subscribe() chan T
+}
+
+type broadcaster[T any] struct {
+	mu       sync.RWMutex
+	channels []*chanMeta[T]
+	bufSize  int
+}
+
+type chanMeta[T any] struct {
+	ch     chan T
+	closed atomic.Bool
+}
+
+func NewBroadcaster[T any](bufSize int) Broadcaster[T] {
+	return &broadcaster[T]{
+		mu:       sync.RWMutex{},
+		channels: make([]*chanMeta[T], 0),
+		bufSize:  bufSize,
+	}
+}
+
+// Publish implements Broadcaster.
+func (b *broadcaster[T]) Publish(ctx context.Context, data T) error {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	for _, meta := range b.channels {
+		if meta.closed.Load() {
+			continue
+		}
+		select {
+		case meta.ch <- data:
+		default:
+			meta.closed.Store(true)
+			close(meta.ch)
+		}
+	}
+
+	return nil
+}
+
+// Subscribe implements Broadcaster.
+func (b *broadcaster[T]) Subscribe() chan T {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	ch := make(chan T, b.bufSize)
+	b.channels = append(b.channels, &chanMeta[T]{
+		ch:     ch,
+		closed: atomic.Bool{},
+	})
+	return ch
+}