1 package ldstoreimpl
2
3 import (
4 "sync"
5 "time"
6
7 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
8 "github.com/launchdarkly/go-sdk-common/v3/ldreason"
9 "github.com/launchdarkly/go-sdk-common/v3/ldtime"
10 ldeval "github.com/launchdarkly/go-server-sdk-evaluation/v2"
11 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
12 "github.com/launchdarkly/go-server-sdk/v6/internal/bigsegments"
13 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
14
15 "github.com/launchdarkly/ccache"
16
17 "golang.org/x/sync/singleflight"
18 )
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 type BigSegmentStoreWrapper struct {
35 store subsystems.BigSegmentStore
36 statusUpdateFn func(interfaces.BigSegmentStoreStatus)
37 staleTime time.Duration
38 contextCache *ccache.Cache
39 cacheTTL time.Duration
40 pollInterval time.Duration
41 haveStatus bool
42 lastStatus interfaces.BigSegmentStoreStatus
43 requests singleflight.Group
44 pollCloser chan struct{}
45 pollingActive bool
46 loggers ldlog.Loggers
47 lock sync.RWMutex
48 }
49
50
51
52
53
54
55
56
57
58
59 func NewBigSegmentStoreWrapperWithConfig(
60 config BigSegmentsConfigurationProperties,
61 statusUpdateFn func(interfaces.BigSegmentStoreStatus),
62 loggers ldlog.Loggers,
63 ) *BigSegmentStoreWrapper {
64 pollCloser := make(chan struct{})
65 w := &BigSegmentStoreWrapper{
66 store: config.Store,
67 statusUpdateFn: statusUpdateFn,
68 staleTime: config.StaleAfter,
69 contextCache: ccache.New(ccache.Configure().MaxSize(int64(config.ContextCacheSize))),
70 cacheTTL: config.ContextCacheTime,
71 pollInterval: config.StatusPollInterval,
72 pollCloser: pollCloser,
73 pollingActive: config.StartPolling,
74 loggers: loggers,
75 }
76
77 if config.StartPolling {
78 go w.runPollTask(config.StatusPollInterval, pollCloser)
79 }
80
81 return w
82 }
83
84
85 func (w *BigSegmentStoreWrapper) Close() {
86 w.lock.Lock()
87 if w.pollCloser != nil {
88 close(w.pollCloser)
89 w.pollCloser = nil
90 }
91 if w.contextCache != nil {
92 w.contextCache.Stop()
93 w.contextCache = nil
94 }
95 w.lock.Unlock()
96
97 _ = w.store.Close()
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 func (w *BigSegmentStoreWrapper) GetMembership(
115 contextKey string,
116 ) (ldeval.BigSegmentMembership, ldreason.BigSegmentsStatus) {
117 entry := w.safeCacheGet(contextKey)
118 var result ldeval.BigSegmentMembership
119 if entry == nil || entry.Expired() {
120
121
122 value, err, _ := w.requests.Do(contextKey, func() (interface{}, error) {
123 hash := bigsegments.HashForContextKey(contextKey)
124 w.loggers.Debugf("querying Big Segment state for context hash %q", hash)
125 return w.store.GetMembership(hash)
126 })
127 if err != nil {
128 w.loggers.Errorf("Big Segment store returned error: %s", err)
129 return nil, ldreason.BigSegmentsStoreError
130 }
131 if value == nil {
132 w.safeCacheSet(contextKey, nil, w.cacheTTL)
133 return nil, ldreason.BigSegmentsHealthy
134 }
135 if membership, ok := value.(subsystems.BigSegmentMembership); ok {
136 w.safeCacheSet(contextKey, membership, w.cacheTTL)
137 result = membership
138 } else {
139 w.loggers.Error("BigSegmentStoreWrapper got wrong value type from request - this should not be possible")
140 return nil, ldreason.BigSegmentsStoreError
141 }
142 } else if entry.Value() != nil {
143 if membership, ok := entry.Value().(subsystems.BigSegmentMembership); ok {
144 result = membership
145 } else {
146 w.loggers.Error("BigSegmentStoreWrapper got wrong value type from cache - this should not be possible")
147 return nil, ldreason.BigSegmentsStoreError
148 }
149 }
150
151 status := ldreason.BigSegmentsHealthy
152 if w.GetStatus().Stale {
153 status = ldreason.BigSegmentsStale
154 }
155
156 return result, status
157 }
158
159
160
161
162
163
164
165
166
167
168 func (w *BigSegmentStoreWrapper) GetStatus() interfaces.BigSegmentStoreStatus {
169 w.lock.RLock()
170 status := w.lastStatus
171 haveStatus := w.haveStatus
172 w.lock.RUnlock()
173
174 if haveStatus {
175 return status
176 }
177
178 return w.pollStoreAndUpdateStatus()
179 }
180
181
182
183
184
185 func (w *BigSegmentStoreWrapper) ClearCache() {
186 w.lock.Lock()
187 if w.contextCache != nil {
188 w.contextCache.Clear()
189 }
190 w.lock.Unlock()
191 w.loggers.Debug("invalidated cache")
192 }
193
194
195
196
197 func (w *BigSegmentStoreWrapper) SetPollingActive(active bool) {
198 w.lock.Lock()
199 defer w.lock.Unlock()
200 if w.pollingActive != active {
201 w.pollingActive = active
202 if active {
203 w.pollCloser = make(chan struct{})
204 go w.runPollTask(w.pollInterval, w.pollCloser)
205 } else if w.pollCloser != nil {
206 close(w.pollCloser)
207 w.pollCloser = nil
208 }
209 w.loggers.Debugf("setting status polling to %t", active)
210 }
211 }
212
213 func (w *BigSegmentStoreWrapper) pollStoreAndUpdateStatus() interfaces.BigSegmentStoreStatus {
214 var newStatus interfaces.BigSegmentStoreStatus
215 w.loggers.Debug("querying Big Segment store metadata")
216 metadata, err := w.store.GetMetadata()
217
218 w.lock.Lock()
219 if err == nil {
220 newStatus.Available = true
221 newStatus.Stale = w.isStale(metadata.LastUpToDate)
222 w.loggers.Debugf("Big Segment store was last updated at %d", metadata.LastUpToDate)
223 } else {
224 w.loggers.Errorf("Big Segment store status query returned error: %s", err)
225 newStatus.Available = false
226 }
227 oldStatus := w.lastStatus
228 w.lastStatus = newStatus
229 hadStatus := w.haveStatus
230 w.haveStatus = true
231 w.lock.Unlock()
232
233 if !hadStatus || (newStatus != oldStatus) {
234 w.loggers.Debugf(
235 "Big Segment store status has changed from %+v to %+v",
236 oldStatus,
237 newStatus,
238 )
239 if w.statusUpdateFn != nil {
240 w.statusUpdateFn(newStatus)
241 }
242 }
243
244 return newStatus
245 }
246
247 func (w *BigSegmentStoreWrapper) isStale(updateTime ldtime.UnixMillisecondTime) bool {
248 age := time.Duration(uint64(ldtime.UnixMillisNow())-uint64(updateTime)) * time.Millisecond
249 return age >= w.staleTime
250 }
251
252 func (w *BigSegmentStoreWrapper) runPollTask(pollInterval time.Duration, pollCloser <-chan struct{}) {
253 if pollInterval > w.staleTime {
254 pollInterval = w.staleTime
255 }
256 ticker := time.NewTicker(pollInterval)
257 for {
258 select {
259 case <-pollCloser:
260 ticker.Stop()
261 return
262 case <-ticker.C:
263 _ = w.pollStoreAndUpdateStatus()
264 }
265 }
266 }
267
268
269
270 func (w *BigSegmentStoreWrapper) safeCacheGet(key string) *ccache.Item {
271 var ret *ccache.Item
272 w.lock.RLock()
273 if w.contextCache != nil {
274 ret = w.contextCache.Get(key)
275 }
276 w.lock.RUnlock()
277 return ret
278 }
279
280 func (w *BigSegmentStoreWrapper) safeCacheSet(key string, value interface{}, ttl time.Duration) {
281 w.lock.RLock()
282 if w.contextCache != nil {
283 w.contextCache.Set(key, value, ttl)
284 }
285 w.lock.RUnlock()
286 }
287
View as plain text