...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/broadcasters.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal

     1  package internal
     2  
     3  import (
     4  	"sync"
     5  
     6  	"golang.org/x/exp/slices"
     7  )
     8  
     9  // This file defines the publish-subscribe model we use for various status/event types in the SDK.
    10  //
    11  // The standard pattern is that AddListener returns a new receive-only channel; RemoveListener unsubscribes
    12  // that channel, and closes the sending end of it; Broadcast sends a value to all of the subscribed channels
    13  // (if any); and Close unsubscribes and closes all existing channels.
    14  
    15  // Arbitrary buffer size to make it less likely that we'll block when broadcasting to channels. It is still
    16  // the consumer's responsibility to make sure they're reading the channel.
    17  const subscriberChannelBufferLength = 10
    18  
    19  // Broadcaster is our generalized implementation of broadcasters.
    20  type Broadcaster[V any] struct {
    21  	subscribers []channelPair[V]
    22  	lock        sync.Mutex
    23  }
    24  
    25  // We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value
    26  // has methods for sending and closing), and also the
    27  type channelPair[V any] struct {
    28  	sendCh    chan<- V
    29  	receiveCh <-chan V
    30  }
    31  
    32  // NewBroadcaster creates a Broadcaster that operates on the specified value type.
    33  func NewBroadcaster[V any]() *Broadcaster[V] {
    34  	return &Broadcaster[V]{}
    35  }
    36  
    37  // AddListener adds a subscriber and returns a channel for it to receive values.
    38  func (b *Broadcaster[V]) AddListener() <-chan V {
    39  	ch := make(chan V, subscriberChannelBufferLength)
    40  	var receiveCh <-chan V = ch
    41  	chPair := channelPair[V]{sendCh: ch, receiveCh: receiveCh}
    42  	b.lock.Lock()
    43  	defer b.lock.Unlock()
    44  	b.subscribers = append(b.subscribers, chPair)
    45  	return receiveCh
    46  }
    47  
    48  // RemoveListener removes a subscriber. The parameter is the same channel that was returned by
    49  // AddListener.
    50  func (b *Broadcaster[V]) RemoveListener(ch <-chan V) {
    51  	b.lock.Lock()
    52  	defer b.lock.Unlock()
    53  	ss := b.subscribers
    54  	for i, s := range ss {
    55  		// The following equality test is the reason why we have to store both the sendCh (chan X) and
    56  		// the receiveCh (<-chan X) for each subscriber; "s.sendCh == ch" would not be true because
    57  		// they're of two different types.
    58  		if s.receiveCh == ch {
    59  			copy(ss[i:], ss[i+1:])
    60  			ss[len(ss)-1] = channelPair[V]{}
    61  			b.subscribers = ss[:len(ss)-1]
    62  			close(s.sendCh)
    63  			break
    64  		}
    65  	}
    66  }
    67  
    68  // HasListeners returns true if there are any current subscribers.
    69  func (b *Broadcaster[V]) HasListeners() bool {
    70  	return len(b.subscribers) > 0
    71  }
    72  
    73  // Broadcast broadcasts a value to all current subscribers.
    74  func (b *Broadcaster[V]) Broadcast(value V) {
    75  	b.lock.Lock()
    76  	ss := slices.Clone(b.subscribers)
    77  	b.lock.Unlock()
    78  	if len(ss) > 0 {
    79  		for _, ch := range ss {
    80  			ch.sendCh <- value
    81  		}
    82  	}
    83  }
    84  
    85  // Close closes all current subscriber channels.
    86  func (b *Broadcaster[V]) Close() {
    87  	b.lock.Lock()
    88  	defer b.lock.Unlock()
    89  	for _, s := range b.subscribers {
    90  		close(s.sendCh)
    91  	}
    92  	b.subscribers = nil
    93  }
    94  

View as plain text