...
1 package internal
2
3 import (
4 "sync"
5
6 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
7
8 "github.com/launchdarkly/go-sdk-common/v3/ldcontext"
9 "github.com/launchdarkly/go-sdk-common/v3/ldvalue"
10 )
11
12
13
14
15
16
17
18
19
20 type flagTrackerImpl struct {
21 broadcaster *Broadcaster[interfaces.FlagChangeEvent]
22 evaluateFn func(string, ldcontext.Context, ldvalue.Value) ldvalue.Value
23 valueChangeSubscriptions map[<-chan interfaces.FlagValueChangeEvent]<-chan interfaces.FlagChangeEvent
24 lock sync.Mutex
25 }
26
27
28 func NewFlagTrackerImpl(
29 broadcaster *Broadcaster[interfaces.FlagChangeEvent],
30 evaluateFn func(flagKey string, context ldcontext.Context, defaultValue ldvalue.Value) ldvalue.Value,
31 ) interfaces.FlagTracker {
32 return &flagTrackerImpl{
33 broadcaster: broadcaster,
34 evaluateFn: evaluateFn,
35 valueChangeSubscriptions: make(map[<-chan interfaces.FlagValueChangeEvent]<-chan interfaces.FlagChangeEvent),
36 }
37 }
38
39
40 func (f *flagTrackerImpl) AddFlagChangeListener() <-chan interfaces.FlagChangeEvent {
41 return f.broadcaster.AddListener()
42 }
43
44
45 func (f *flagTrackerImpl) RemoveFlagChangeListener(listener <-chan interfaces.FlagChangeEvent) {
46 f.broadcaster.RemoveListener(listener)
47 }
48
49
50 func (f *flagTrackerImpl) AddFlagValueChangeListener(
51 flagKey string,
52 context ldcontext.Context,
53 defaultValue ldvalue.Value,
54 ) <-chan interfaces.FlagValueChangeEvent {
55 valueCh := make(chan interfaces.FlagValueChangeEvent, subscriberChannelBufferLength)
56 flagCh := f.broadcaster.AddListener()
57 go runValueChangeListener(flagCh, valueCh, f.evaluateFn, flagKey, context, defaultValue)
58
59 f.lock.Lock()
60 f.valueChangeSubscriptions[valueCh] = flagCh
61 f.lock.Unlock()
62
63 return valueCh
64 }
65
66
67 func (f *flagTrackerImpl) RemoveFlagValueChangeListener(listener <-chan interfaces.FlagValueChangeEvent) {
68 f.lock.Lock()
69 flagCh, ok := f.valueChangeSubscriptions[listener]
70 delete(f.valueChangeSubscriptions, listener)
71 f.lock.Unlock()
72
73 if ok {
74 f.broadcaster.RemoveListener(flagCh)
75 }
76 }
77
78 func runValueChangeListener(
79 flagCh <-chan interfaces.FlagChangeEvent,
80 valueCh chan<- interfaces.FlagValueChangeEvent,
81 evaluateFn func(flagKey string, context ldcontext.Context, defaultValue ldvalue.Value) ldvalue.Value,
82 flagKey string,
83 context ldcontext.Context,
84 defaultValue ldvalue.Value,
85 ) {
86 currentValue := evaluateFn(flagKey, context, defaultValue)
87 for {
88 flagChange, ok := <-flagCh
89 if !ok {
90
91 close(valueCh)
92 return
93 }
94 if flagChange.Key != flagKey {
95 continue
96 }
97 newValue := evaluateFn(flagKey, context, defaultValue)
98 if newValue.Equal(currentValue) {
99 continue
100 }
101 event := interfaces.FlagValueChangeEvent{Key: flagKey, OldValue: currentValue, NewValue: newValue}
102 currentValue = newValue
103 valueCh <- event
104 }
105 }
106
View as plain text