...
1 package internal
2
3 import (
4 "sync"
5
6 "golang.org/x/exp/slices"
7 )
8
9
10
11
12
13
14
15
16
17 const subscriberChannelBufferLength = 10
18
19
20 type Broadcaster[V any] struct {
21 subscribers []channelPair[V]
22 lock sync.Mutex
23 }
24
25
26
27 type channelPair[V any] struct {
28 sendCh chan<- V
29 receiveCh <-chan V
30 }
31
32
33 func NewBroadcaster[V any]() *Broadcaster[V] {
34 return &Broadcaster[V]{}
35 }
36
37
38 func (b *Broadcaster[V]) AddListener() <-chan V {
39 ch := make(chan V, subscriberChannelBufferLength)
40 var receiveCh <-chan V = ch
41 chPair := channelPair[V]{sendCh: ch, receiveCh: receiveCh}
42 b.lock.Lock()
43 defer b.lock.Unlock()
44 b.subscribers = append(b.subscribers, chPair)
45 return receiveCh
46 }
47
48
49
50 func (b *Broadcaster[V]) RemoveListener(ch <-chan V) {
51 b.lock.Lock()
52 defer b.lock.Unlock()
53 ss := b.subscribers
54 for i, s := range ss {
55
56
57
58 if s.receiveCh == ch {
59 copy(ss[i:], ss[i+1:])
60 ss[len(ss)-1] = channelPair[V]{}
61 b.subscribers = ss[:len(ss)-1]
62 close(s.sendCh)
63 break
64 }
65 }
66 }
67
68
69 func (b *Broadcaster[V]) HasListeners() bool {
70 return len(b.subscribers) > 0
71 }
72
73
74 func (b *Broadcaster[V]) Broadcast(value V) {
75 b.lock.Lock()
76 ss := slices.Clone(b.subscribers)
77 b.lock.Unlock()
78 if len(ss) > 0 {
79 for _, ch := range ss {
80 ch.sendCh <- value
81 }
82 }
83 }
84
85
86 func (b *Broadcaster[V]) Close() {
87 b.lock.Lock()
88 defer b.lock.Unlock()
89 for _, s := range b.subscribers {
90 close(s.sendCh)
91 }
92 b.subscribers = nil
93 }
94
View as plain text