1 package datasource
2
import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
	intf "github.com/launchdarkly/go-server-sdk/v6/interfaces"
	"github.com/launchdarkly/go-server-sdk/v6/internal"
	"github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
	st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
)
15
16
17
18
19 type DataSourceUpdateSinkImpl struct {
20 store subsystems.DataStore
21 dataStoreStatusProvider intf.DataStoreStatusProvider
22 dataSourceStatusBroadcaster *internal.Broadcaster[intf.DataSourceStatus]
23 flagChangeEventBroadcaster *internal.Broadcaster[intf.FlagChangeEvent]
24 dependencyTracker *dependencyTracker
25 outageTracker *outageTracker
26 loggers ldlog.Loggers
27 currentStatus intf.DataSourceStatus
28 lastStoreUpdateFailed bool
29 lock sync.Mutex
30 }
31
32
33 func NewDataSourceUpdateSinkImpl(
34 store subsystems.DataStore,
35 dataStoreStatusProvider intf.DataStoreStatusProvider,
36 dataSourceStatusBroadcaster *internal.Broadcaster[intf.DataSourceStatus],
37 flagChangeEventBroadcaster *internal.Broadcaster[intf.FlagChangeEvent],
38 logDataSourceOutageAsErrorAfter time.Duration,
39 loggers ldlog.Loggers,
40 ) *DataSourceUpdateSinkImpl {
41 return &DataSourceUpdateSinkImpl{
42 store: store,
43 dataStoreStatusProvider: dataStoreStatusProvider,
44 dataSourceStatusBroadcaster: dataSourceStatusBroadcaster,
45 flagChangeEventBroadcaster: flagChangeEventBroadcaster,
46 dependencyTracker: newDependencyTracker(),
47 outageTracker: newOutageTracker(logDataSourceOutageAsErrorAfter, loggers),
48 loggers: loggers,
49 currentStatus: intf.DataSourceStatus{
50 State: intf.DataSourceStateInitializing,
51 StateSince: time.Now(),
52 },
53 }
54 }
55
56
57 func (d *DataSourceUpdateSinkImpl) Init(allData []st.Collection) bool {
58 var oldData map[st.DataKind]map[string]st.ItemDescriptor
59
60 if d.flagChangeEventBroadcaster.HasListeners() {
61
62 oldData = make(map[st.DataKind]map[string]st.ItemDescriptor)
63 for _, kind := range datakinds.AllDataKinds() {
64 if items, err := d.store.GetAll(kind); err == nil {
65 m := make(map[string]st.ItemDescriptor)
66 for _, item := range items {
67 m[item.Key] = item.Item
68 }
69 oldData[kind] = m
70 }
71 }
72 }
73
74 err := d.store.Init(sortCollectionsForDataStoreInit(allData))
75 updated := d.maybeUpdateError(err)
76
77 if updated {
78
79
80 d.updateDependencyTrackerFromFullDataSet(allData)
81
82
83
84 if oldData != nil {
85 d.sendChangeEvents(d.computeChangedItemsForFullDataSet(oldData, fullDataSetToMap(allData)))
86 }
87 }
88
89 return updated
90 }
91
92
93 func (d *DataSourceUpdateSinkImpl) Upsert(
94 kind st.DataKind,
95 key string,
96 item st.ItemDescriptor,
97 ) bool {
98 updated, err := d.store.Upsert(kind, key, item)
99 didNotGetError := d.maybeUpdateError(err)
100
101 if updated {
102 d.dependencyTracker.updateDependenciesFrom(kind, key, item)
103 if d.flagChangeEventBroadcaster.HasListeners() {
104 affectedItems := make(kindAndKeySet)
105 d.dependencyTracker.addAffectedItems(affectedItems, kindAndKey{kind, key})
106 d.sendChangeEvents(affectedItems)
107 }
108 }
109
110 return didNotGetError
111 }
112
113 func (d *DataSourceUpdateSinkImpl) maybeUpdateError(err error) bool {
114 if err == nil {
115 d.lock.Lock()
116 defer d.lock.Unlock()
117 d.lastStoreUpdateFailed = false
118 return true
119 }
120
121 d.UpdateStatus(
122 intf.DataSourceStateInterrupted,
123 intf.DataSourceErrorInfo{
124 Kind: intf.DataSourceErrorKindStoreError,
125 Message: err.Error(),
126 Time: time.Now(),
127 },
128 )
129
130 shouldLog := false
131 d.lock.Lock()
132 shouldLog = !d.lastStoreUpdateFailed
133 d.lastStoreUpdateFailed = true
134 d.lock.Unlock()
135 if shouldLog {
136 d.loggers.Warnf("Unexpected data store error when trying to store an update received from the data source: %s", err)
137 }
138
139 return false
140 }
141
142
143 func (d *DataSourceUpdateSinkImpl) UpdateStatus(
144 newState intf.DataSourceState,
145 newError intf.DataSourceErrorInfo,
146 ) {
147 if newState == "" {
148 return
149 }
150 if statusToBroadcast, changed := d.maybeUpdateStatus(newState, newError); changed {
151 d.dataSourceStatusBroadcaster.Broadcast(statusToBroadcast)
152 }
153 }
154
155 func (d *DataSourceUpdateSinkImpl) maybeUpdateStatus(
156 newState intf.DataSourceState,
157 newError intf.DataSourceErrorInfo,
158 ) (intf.DataSourceStatus, bool) {
159 d.lock.Lock()
160 defer d.lock.Unlock()
161
162 oldStatus := d.currentStatus
163
164 if newState == intf.DataSourceStateInterrupted && oldStatus.State == intf.DataSourceStateInitializing {
165 newState = intf.DataSourceStateInitializing
166 }
167
168 if newState == oldStatus.State && newError.Kind == "" {
169 return intf.DataSourceStatus{}, false
170 }
171
172 stateSince := oldStatus.StateSince
173 if newState != oldStatus.State {
174 stateSince = time.Now()
175 }
176 lastError := oldStatus.LastError
177 if newError.Kind != "" {
178 lastError = newError
179 }
180 d.currentStatus = intf.DataSourceStatus{
181 State: newState,
182 StateSince: stateSince,
183 LastError: lastError,
184 }
185
186 d.outageTracker.trackDataSourceState(newState, newError)
187
188 return d.currentStatus, true
189 }
190
191
192 func (d *DataSourceUpdateSinkImpl) GetDataStoreStatusProvider() intf.DataStoreStatusProvider {
193 return d.dataStoreStatusProvider
194 }
195
196
197 func (d *DataSourceUpdateSinkImpl) GetLastStatus() intf.DataSourceStatus {
198 d.lock.Lock()
199 defer d.lock.Unlock()
200 return d.currentStatus
201 }
202
203 func (d *DataSourceUpdateSinkImpl) waitFor(desiredState intf.DataSourceState, timeout time.Duration) bool {
204 d.lock.Lock()
205 if d.currentStatus.State == desiredState {
206 d.lock.Unlock()
207 return true
208 }
209 if d.currentStatus.State == intf.DataSourceStateOff {
210 d.lock.Unlock()
211 return false
212 }
213
214 statusCh := d.dataSourceStatusBroadcaster.AddListener()
215 defer d.dataSourceStatusBroadcaster.RemoveListener(statusCh)
216 d.lock.Unlock()
217
218 var deadline <-chan time.Time
219 if timeout > 0 {
220 deadline = time.After(timeout)
221 }
222
223 for {
224 select {
225 case newStatus := <-statusCh:
226 if newStatus.State == desiredState {
227 return true
228 }
229 if newStatus.State == intf.DataSourceStateOff {
230 return false
231 }
232 case <-deadline:
233 return false
234 }
235 }
236 }
237
238 func (d *DataSourceUpdateSinkImpl) sendChangeEvents(affectedItems kindAndKeySet) {
239 for item := range affectedItems {
240 if item.kind == datakinds.Features {
241 d.flagChangeEventBroadcaster.Broadcast(intf.FlagChangeEvent{Key: item.key})
242 }
243 }
244 }
245
246 func (d *DataSourceUpdateSinkImpl) updateDependencyTrackerFromFullDataSet(allData []st.Collection) {
247 d.dependencyTracker.reset()
248 for _, coll := range allData {
249 for _, item := range coll.Items {
250 d.dependencyTracker.updateDependenciesFrom(coll.Kind, item.Key, item.Item)
251 }
252 }
253 }
254
255 func fullDataSetToMap(allData []st.Collection) map[st.DataKind]map[string]st.ItemDescriptor {
256 ret := make(map[st.DataKind]map[string]st.ItemDescriptor, len(allData))
257 for _, coll := range allData {
258 m := make(map[string]st.ItemDescriptor, len(coll.Items))
259 for _, item := range coll.Items {
260 m[item.Key] = item.Item
261 }
262 ret[coll.Kind] = m
263 }
264 return ret
265 }
266
267 func (d *DataSourceUpdateSinkImpl) computeChangedItemsForFullDataSet(
268 oldDataMap map[st.DataKind]map[string]st.ItemDescriptor,
269 newDataMap map[st.DataKind]map[string]st.ItemDescriptor,
270 ) kindAndKeySet {
271 affectedItems := make(kindAndKeySet)
272 for _, kind := range datakinds.AllDataKinds() {
273 oldItems := oldDataMap[kind]
274 newItems := newDataMap[kind]
275 allKeys := make([]string, 0, len(oldItems)+len(newItems))
276 for key := range oldItems {
277 allKeys = append(allKeys, key)
278 }
279 for key := range newItems {
280 if _, found := oldItems[key]; !found {
281 allKeys = append(allKeys, key)
282 }
283 }
284 for _, key := range allKeys {
285 oldItem, haveOld := oldItems[key]
286 newItem, haveNew := newItems[key]
287 if haveOld || haveNew {
288 if !haveOld || !haveNew || oldItem.Version < newItem.Version {
289 d.dependencyTracker.addAffectedItems(affectedItems, kindAndKey{kind, key})
290 }
291 }
292 }
293 }
294 return affectedItems
295 }
296
297 type outageTracker struct {
298 outageLoggingTimeout time.Duration
299 loggers ldlog.Loggers
300 inOutage bool
301 errorCounts map[intf.DataSourceErrorInfo]int
302 timeoutCloser chan struct{}
303 lock sync.Mutex
304 }
305
306 func newOutageTracker(outageLoggingTimeout time.Duration, loggers ldlog.Loggers) *outageTracker {
307 return &outageTracker{
308 outageLoggingTimeout: outageLoggingTimeout,
309 loggers: loggers,
310 }
311 }
312
313 func (o *outageTracker) trackDataSourceState(newState intf.DataSourceState, newError intf.DataSourceErrorInfo) {
314 if o.outageLoggingTimeout == 0 {
315 return
316 }
317
318 o.lock.Lock()
319 defer o.lock.Unlock()
320
321 if newState == intf.DataSourceStateInterrupted || newError.Kind != "" ||
322 (newState == intf.DataSourceStateInitializing && o.inOutage) {
323
324
325 if o.inOutage {
326
327 o.recordError(newError)
328 } else {
329
330 o.inOutage = true
331 o.errorCounts = make(map[intf.DataSourceErrorInfo]int)
332 o.recordError(newError)
333 o.timeoutCloser = make(chan struct{})
334 go o.awaitTimeout(o.timeoutCloser)
335 }
336 } else {
337 if o.timeoutCloser != nil {
338 close(o.timeoutCloser)
339 o.timeoutCloser = nil
340 }
341 o.inOutage = false
342 }
343 }
344
345 func (o *outageTracker) recordError(newError intf.DataSourceErrorInfo) {
346
347
348 basicErrorInfo := intf.DataSourceErrorInfo{Kind: newError.Kind, StatusCode: newError.StatusCode}
349 o.errorCounts[basicErrorInfo]++
350 }
351
352 func (o *outageTracker) awaitTimeout(closer chan struct{}) {
353 select {
354 case <-closer:
355 return
356 case <-time.After(o.outageLoggingTimeout):
357 break
358 }
359
360 o.lock.Lock()
361 if !o.inOutage {
362
363 o.lock.Unlock()
364 return
365 }
366 errorsDesc := o.describeErrors()
367 o.timeoutCloser = nil
368 o.lock.Unlock()
369
370 o.loggers.Errorf(
371 "LaunchDarkly data source outage - updates have been unavailable for at least %s with the following errors: %s",
372 o.outageLoggingTimeout,
373 errorsDesc,
374 )
375 }
376
377 func (o *outageTracker) describeErrors() string {
378 ret := ""
379 for err, count := range o.errorCounts {
380 if ret != "" {
381 ret += ", "
382 }
383 times := "times"
384 if count == 1 {
385 times = "time"
386 }
387 ret += fmt.Sprintf("%s (%d %s)", err, count, times)
388 }
389 return ret
390 }
391
View as plain text