1 package datasource
2
3 import (
4 "errors"
5 "fmt"
6 "strings"
7 "testing"
8 "time"
9
10 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
11
12 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
13 "github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
14 "github.com/launchdarkly/go-server-sdk-evaluation/v2/ldbuilders"
15
16 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
17 intf "github.com/launchdarkly/go-server-sdk/v6/interfaces"
18 "github.com/launchdarkly/go-server-sdk/v6/internal"
19 "github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
20 "github.com/launchdarkly/go-server-sdk/v6/internal/datastore"
21 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
22 st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
23
24 th "github.com/launchdarkly/go-test-helpers/v3"
25
26 "github.com/stretchr/testify/assert"
27 "github.com/stretchr/testify/require"
28 )
29
30 const testDataSourceOutageTimeout = 200 * time.Millisecond
31
32 type dataSourceUpdateSinkImplTestParams struct {
33 store *mocks.CapturingDataStore
34 dataStoreStatusProvider intf.DataStoreStatusProvider
35 dataSourceUpdates *DataSourceUpdateSinkImpl
36 flagChangeBroadcaster *internal.Broadcaster[interfaces.FlagChangeEvent]
37 mockLoggers *ldlogtest.MockLog
38 }
39
40 func dataSourceUpdateSinkImplTest(action func(dataSourceUpdateSinkImplTestParams)) {
41 p := dataSourceUpdateSinkImplTestParams{}
42 p.mockLoggers = ldlogtest.NewMockLog()
43 p.store = mocks.NewCapturingDataStore(datastore.NewInMemoryDataStore(p.mockLoggers.Loggers))
44 dataStoreUpdates := datastore.NewDataStoreUpdateSinkImpl(nil)
45 p.dataStoreStatusProvider = datastore.NewDataStoreStatusProviderImpl(p.store, dataStoreUpdates)
46 dataSourceStatusBroadcaster := internal.NewBroadcaster[interfaces.DataSourceStatus]()
47 defer dataSourceStatusBroadcaster.Close()
48 p.flagChangeBroadcaster = internal.NewBroadcaster[interfaces.FlagChangeEvent]()
49 defer p.flagChangeBroadcaster.Close()
50 p.dataSourceUpdates = NewDataSourceUpdateSinkImpl(
51 p.store,
52 p.dataStoreStatusProvider,
53 dataSourceStatusBroadcaster,
54 p.flagChangeBroadcaster,
55 testDataSourceOutageTimeout,
56 p.mockLoggers.Loggers,
57 )
58
59 action(p)
60 }
61
62 func TestDataSourceUpdateSinkImpl(t *testing.T) {
63 storeError := errors.New("sorry")
64 expectedStoreErrorMessage := "Unexpected data store error when trying to store an update received from the data source: sorry"
65
66 t.Run("Init", func(t *testing.T) {
67 t.Run("passes data to store", func(t *testing.T) {
68 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
69 inputData := sharedtest.NewDataSetBuilder().Flags(ldbuilders.NewFlagBuilder("a").Build())
70
71 result := p.dataSourceUpdates.Init(inputData.Build())
72 assert.True(t, result)
73
74 p.store.WaitForInit(t, inputData.ToServerSDKData(), time.Second)
75 })
76 })
77
78 t.Run("detects error from store", func(t *testing.T) {
79 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
80 p.store.SetFakeError(storeError)
81
82 result := p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build())
83 assert.False(t, result)
84 assert.Equal(t, intf.DataSourceErrorKindStoreError, p.dataSourceUpdates.GetLastStatus().LastError.Kind)
85
86 log1 := p.mockLoggers.GetOutput(ldlog.Warn)
87 assert.Equal(t, []string{expectedStoreErrorMessage}, log1)
88
89
90 assert.False(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
91 log2 := p.mockLoggers.GetOutput(ldlog.Warn)
92 assert.Equal(t, log1, log2)
93
94
95 p.store.SetFakeError(nil)
96 assert.True(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
97 p.store.SetFakeError(storeError)
98 assert.False(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
99 log3 := p.mockLoggers.GetOutput(ldlog.Warn)
100 assert.Equal(t, []string{expectedStoreErrorMessage, expectedStoreErrorMessage}, log3)
101 })
102 })
103
104 t.Run("sorts the data set", testDataSourceUpdatesImplSortsInitData)
105 })
106
107 t.Run("Upsert", func(t *testing.T) {
108 t.Run("passes data to store", func(t *testing.T) {
109 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
110 flag := ldbuilders.NewFlagBuilder("key").Version(1).Build()
111 itemDesc := st.ItemDescriptor{Version: 1, Item: &flag}
112 result := p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc)
113 assert.True(t, result)
114
115 p.store.WaitForUpsert(t, datakinds.Features, flag.Key, itemDesc.Version, time.Second)
116 })
117 })
118
119 t.Run("detects error from store", func(t *testing.T) {
120 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
121 p.store.SetFakeError(storeError)
122
123 flag := ldbuilders.NewFlagBuilder("key").Version(1).Build()
124 itemDesc := st.ItemDescriptor{Version: 1, Item: &flag}
125 result := p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc)
126 assert.False(t, result)
127 assert.Equal(t, intf.DataSourceErrorKindStoreError, p.dataSourceUpdates.GetLastStatus().LastError.Kind)
128
129 log1 := p.mockLoggers.GetOutput(ldlog.Warn)
130 assert.Equal(t, []string{expectedStoreErrorMessage}, log1)
131
132
133 assert.False(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
134 log2 := p.mockLoggers.GetOutput(ldlog.Warn)
135 assert.Equal(t, log1, log2)
136
137
138 p.store.SetFakeError(nil)
139 assert.True(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
140 p.store.SetFakeError(storeError)
141 assert.False(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
142 log3 := p.mockLoggers.GetOutput(ldlog.Warn)
143 assert.Equal(t, []string{expectedStoreErrorMessage, expectedStoreErrorMessage}, log3)
144 })
145 })
146 })
147
148 t.Run("UpdateStatus", func(t *testing.T) {
149
150
151 t.Run("does not update status if state is the same and errorInfo is empty", func(t *testing.T) {
152 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
153 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
154 status1 := p.dataSourceUpdates.currentStatus
155 <-time.After(time.Millisecond)
156
157 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
158 status2 := p.dataSourceUpdates.currentStatus
159 assert.Equal(t, status1, status2)
160 })
161 })
162
163 t.Run("does not update status if new state is empty", func(t *testing.T) {
164 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
165 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
166 status1 := p.dataSourceUpdates.currentStatus
167
168 p.dataSourceUpdates.UpdateStatus("", intf.DataSourceErrorInfo{})
169 status2 := p.dataSourceUpdates.currentStatus
170 assert.Equal(t, status1, status2)
171 })
172 })
173
174 t.Run("updates status if state is the same and errorInfo is not empty", func(t *testing.T) {
175 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
176 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
177 status1 := p.dataSourceUpdates.currentStatus
178 <-time.After(time.Millisecond)
179
180 errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
181 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, errorInfo)
182 status2 := p.dataSourceUpdates.currentStatus
183 assert.NotEqual(t, status1, status2)
184 assert.Equal(t, status1.State, status2.State)
185 assert.Equal(t, errorInfo, status2.LastError)
186 })
187 })
188
189 t.Run("updates status if state is not the same", func(t *testing.T) {
190 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
191 errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
192 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, errorInfo)
193
194 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, intf.DataSourceErrorInfo{})
195 status := p.dataSourceUpdates.currentStatus
196 assert.Equal(t, intf.DataSourceStateInterrupted, status.State)
197 assert.Equal(t, errorInfo, status.LastError)
198 })
199 })
200
201 t.Run("Initialized is used instead of Interrupted during startup", func(t *testing.T) {
202 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
203 errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
204 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo)
205 status1 := p.dataSourceUpdates.currentStatus
206 assert.Equal(t, intf.DataSourceStateInitializing, status1.State)
207 assert.Equal(t, errorInfo, status1.LastError)
208
209 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
210 status2 := p.dataSourceUpdates.currentStatus
211 assert.Equal(t, intf.DataSourceStateValid, status2.State)
212
213 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, intf.DataSourceErrorInfo{})
214 status3 := p.dataSourceUpdates.currentStatus
215 assert.Equal(t, intf.DataSourceStateInterrupted, status3.State)
216 assert.Equal(t, errorInfo, status3.LastError)
217 })
218 })
219
220 t.Run("can log outage at Error level after timeout", TestDataSourceOutageLoggingTimeout)
221 })
222
223 t.Run("GetDataStoreStatusProvider", func(t *testing.T) {
224 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
225 assert.Equal(t, p.dataStoreStatusProvider, p.dataSourceUpdates.GetDataStoreStatusProvider())
226 })
227 })
228 }
229
230 func testDataSourceUpdatesImplSortsInitData(t *testing.T) {
231
232
233 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
234 inputData := makeDependencyOrderingDataSourceTestData()
235
236 result := p.dataSourceUpdates.Init(inputData)
237 require.True(t, result)
238
239 receivedData := p.store.WaitForNextInit(t, time.Second)
240
241 verifySortedData(t, receivedData, inputData)
242 })
243 }
244
245 func TestDataSourceUpdatesImplFlagChangeEvents(t *testing.T) {
246 t.Run("sends events on init for newly added flags", func(t *testing.T) {
247 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
248 builder := sharedtest.NewDataSetBuilder().
249 Flags(ldbuilders.NewFlagBuilder("flag1").Version(1).Build()).
250 Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
251
252 p.dataSourceUpdates.Init(builder.Build())
253
254 ch := p.flagChangeBroadcaster.AddListener()
255
256 builder.Flags(ldbuilders.NewFlagBuilder("flag2").Version(1).Build()).
257 Segments(ldbuilders.NewSegmentBuilder("segment2").Version(1).Build())
258
259
260 p.dataSourceUpdates.Init(builder.Build())
261
262 sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
263 })
264 })
265
266 t.Run("sends event on update for newly added flag", func(t *testing.T) {
267 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
268 builder := sharedtest.NewDataSetBuilder().
269 Flags(ldbuilders.NewFlagBuilder("flag1").Version(1).Build()).
270 Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
271
272 p.dataSourceUpdates.Init(builder.Build())
273
274 ch := p.flagChangeBroadcaster.AddListener()
275
276 flag2 := ldbuilders.NewFlagBuilder("flag2").Version(1).Build()
277 p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
278
279 sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
280 })
281 })
282
283 t.Run("sends events on init for updated flags", func(t *testing.T) {
284 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
285 builder := sharedtest.NewDataSetBuilder().
286 Flags(
287 ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
288 ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
289 ).
290 Segments(
291 ldbuilders.NewSegmentBuilder("segment1").Version(1).Build(),
292 ldbuilders.NewSegmentBuilder("segment2").Version(1).Build(),
293 )
294
295 p.dataSourceUpdates.Init(builder.Build())
296
297 ch := p.flagChangeBroadcaster.AddListener()
298
299 builder.Flags(
300 ldbuilders.NewFlagBuilder("flag2").Version(2).Build(),
301 ).Segments(
302 ldbuilders.NewSegmentBuilder("segment2").Version(2).Build(),
303 )
304
305 p.dataSourceUpdates.Init(builder.Build())
306
307 sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
308 })
309 })
310
311 t.Run("sends event on update for updated flag", func(t *testing.T) {
312 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
313 builder := sharedtest.NewDataSetBuilder().
314 Flags(
315 ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
316 ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
317 ).
318 Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
319
320 p.dataSourceUpdates.Init(builder.Build())
321
322 ch := p.flagChangeBroadcaster.AddListener()
323
324 flag2 := ldbuilders.NewFlagBuilder("flag2").Version(2).Build()
325 p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
326
327 sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
328 })
329 })
330
331 t.Run("does not send event on update if item was not really updated", func(t *testing.T) {
332 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
333 builder := sharedtest.NewDataSetBuilder().
334 Flags(
335 ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
336 ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
337 )
338
339 p.dataSourceUpdates.Init(builder.Build())
340
341 ch := p.flagChangeBroadcaster.AddListener()
342
343 flag2 := ldbuilders.NewFlagBuilder("flag2").Version(1).Build()
344 p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
345
346 th.AssertNoMoreValues(t, ch, time.Millisecond*100)
347 })
348 })
349 }
350
351 func TestDataSourceOutageLoggingTimeout(t *testing.T) {
352 t.Run("does not log error if data source recovers before timeout", func(t *testing.T) {
353 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
354 errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
355 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo)
356 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
357
358 <-time.After(testDataSourceOutageTimeout)
359
360 assert.Len(t, p.mockLoggers.GetOutput(ldlog.Error), 0)
361 })
362 })
363
364 t.Run("logs error if data source does not recover before timeout", func(t *testing.T) {
365 dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
366
367 errorInfo1 := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown, Time: time.Now()}
368 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo1)
369 errorInfo2 := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindErrorResponse, StatusCode: 500, Time: time.Now()}
370 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo2)
371
372 <-time.After(testDataSourceOutageTimeout + (100 * time.Millisecond))
373
374 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
375
376 <-time.After(testDataSourceOutageTimeout)
377
378 require.Len(t, p.mockLoggers.GetOutput(ldlog.Error), 1)
379 message := p.mockLoggers.GetOutput(ldlog.Error)[0]
380 assert.True(t, strings.HasPrefix(
381 message,
382 fmt.Sprintf(
383 "LaunchDarkly data source outage - updates have been unavailable for at least %s with the following errors:",
384 testDataSourceOutageTimeout,
385 )))
386 assert.Contains(t, message, "UNKNOWN (1 time)")
387 assert.Contains(t, message, "ERROR_RESPONSE(500) (1 time)")
388 })
389 })
390 }
391
View as plain text