1 package datasource
2
3 import (
4 "errors"
5 "fmt"
6 "net/http"
7 "net/http/httptest"
8 "testing"
9 "time"
10
11 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
12
13 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
14 "github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
15 "github.com/launchdarkly/go-sdk-common/v3/ldvalue"
16 ldevents "github.com/launchdarkly/go-sdk-events/v2"
17 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
18 "github.com/launchdarkly/go-server-sdk/v6/internal"
19 "github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
20 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
21 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
22 "github.com/launchdarkly/go-server-sdk/v6/testhelpers/ldservices"
23
24 "github.com/launchdarkly/eventsource"
25 th "github.com/launchdarkly/go-test-helpers/v3"
26 "github.com/launchdarkly/go-test-helpers/v3/httphelpers"
27
28 "github.com/stretchr/testify/assert"
29 )
30
31 const (
32 briefDelay = time.Millisecond * 50
33 streamProcessorTestHeaderName = "my-header"
34 streamProcessorTestHeaderValue = "my-value"
35 )
36
37 type streamingTestParams struct {
38 events chan<- eventsource.Event
39 updates *mocks.MockDataSourceUpdates
40 stream httphelpers.SSEStreamControl
41 requests <-chan httphelpers.HTTPRequestInfo
42 mockLog *ldlogtest.MockLog
43 }
44
45 func runStreamingTest(
46 t *testing.T,
47 initialData *ldservices.ServerSDKData,
48 test func(streamingTestParams),
49 ) {
50 runStreamingTestWithConfiguration(t, initialData, nil, test)
51 }
52
53 func runStreamingTestWithConfiguration(
54 t *testing.T,
55 initialData *ldservices.ServerSDKData,
56 configureUpdates func(*mocks.MockDataSourceUpdates),
57 test func(streamingTestParams),
58 ) {
59 events := make(chan eventsource.Event, 1000)
60 streamHandler, stream := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
61
62
63
64 extraStreamHandler, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
65
66 handler, requestsCh := httphelpers.RecordingHandler(
67 httphelpers.SequentialHandler(streamHandler, extraStreamHandler),
68 )
69
70 headers := make(http.Header)
71 headers.Set(streamProcessorTestHeaderName, streamProcessorTestHeaderValue)
72 mockLog := ldlogtest.NewMockLog()
73 mockLog.Loggers.SetMinLevel(ldlog.Debug)
74 defer mockLog.DumpIfTestFailed(t)
75 context := sharedtest.NewTestContext("", &subsystems.HTTPConfiguration{DefaultHeaders: headers},
76 &subsystems.LoggingConfiguration{Loggers: mockLog.Loggers})
77
78 httphelpers.WithServer(handler, func(streamServer *httptest.Server) {
79 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
80 if configureUpdates != nil {
81 configureUpdates(dataSourceUpdates)
82 }
83
84 sp := NewStreamProcessor(
85 context,
86 dataSourceUpdates,
87 streamServer.URL,
88 briefDelay,
89 )
90 defer sp.Close()
91
92 closeWhenReady := make(chan struct{})
93
94 sp.Start(closeWhenReady)
95
96 if !th.AssertChannelClosed(t, closeWhenReady, time.Second, "timed out waiting for data source to start") {
97 return
98 }
99
100 params := streamingTestParams{events, dataSourceUpdates, stream, requestsCh, mockLog}
101 test(params)
102 })
103 })
104 }
105
106 func TestStreamProcessor(t *testing.T) {
107 t.Parallel()
108 initialData := ldservices.NewServerSDKData().
109 Flags(ldservices.FlagOrSegment("my-flag", 2)).
110 Segments(ldservices.FlagOrSegment("my-segment", 2))
111 timeout := 3 * time.Second
112
113 t.Run("configured headers are passed in request", func(t *testing.T) {
114 runStreamingTest(t, initialData, func(p streamingTestParams) {
115 r := <-p.requests
116 assert.Equal(t, streamProcessorTestHeaderValue, r.Request.Header.Get(streamProcessorTestHeaderName))
117 })
118 })
119
120 t.Run("initial put", func(t *testing.T) {
121 runStreamingTest(t, initialData, func(p streamingTestParams) {
122 p.updates.DataStore.WaitForInit(t, initialData, timeout)
123 })
124 })
125
126 t.Run("patch flag", func(t *testing.T) {
127 runStreamingTest(t, initialData, func(p streamingTestParams) {
128 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
129 Data: `{"path": "/flags/my-flag", "data": {"key": "my-flag", "version": 3}}`})
130
131 p.updates.DataStore.WaitForUpsert(t, datakinds.Features, "my-flag", 3, timeout)
132 })
133 })
134
135 t.Run("delete flag", func(t *testing.T) {
136 runStreamingTest(t, initialData, func(p streamingTestParams) {
137 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
138 Data: `{"path": "/flags/my-flag", "version": 4}`})
139
140 p.updates.DataStore.WaitForDelete(t, datakinds.Features, "my-flag", 4, timeout)
141 })
142 })
143
144 t.Run("patch segment", func(t *testing.T) {
145 runStreamingTest(t, initialData, func(p streamingTestParams) {
146 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
147 Data: `{"path": "/segments/my-segment", "data": {"key": "my-segment", "version": 7}}`})
148
149 p.updates.DataStore.WaitForUpsert(t, datakinds.Segments, "my-segment", 7, timeout)
150 })
151 })
152
153 t.Run("delete segment", func(t *testing.T) {
154 runStreamingTest(t, initialData, func(p streamingTestParams) {
155 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
156 Data: `{"path": "/segments/my-segment", "version": 8}`})
157
158 p.updates.DataStore.WaitForDelete(t, datakinds.Segments, "my-segment", 8, timeout)
159 })
160 })
161 }
162
163 func TestStreamProcessorRecoverableErrorsCauseStreamRestart(t *testing.T) {
164 t.Parallel()
165
166 expectRestart := func(t *testing.T, p streamingTestParams) {
167 <-p.requests
168 th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
169 p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)
170 p.updates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted)
171 p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)
172 }
173
174 for _, status := range []int{400, 500} {
175 t.Run(fmt.Sprintf("HTTP status %d", status), func(t *testing.T) {
176 testStreamProcessorRecoverableHTTPError(t, status)
177 })
178 }
179
180 t.Run("dropped connection", func(t *testing.T) {
181 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
182 p.stream.EndAll()
183 <-time.After(300 * time.Millisecond)
184 expectRestart(t, p)
185 p.mockLog.AssertMessageMatch(t, true, ldlog.Warn, ".*Error in stream connection")
186 })
187 })
188
189 t.Run("put with malformed JSON", func(t *testing.T) {
190 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
191 p.stream.Send(httphelpers.SSEEvent{Event: putEvent, Data: `{"path": "/", "data": }"`})
192 expectRestart(t, p)
193 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
194 })
195 })
196
197 t.Run("put with well-formed JSON but malformed data model item", func(t *testing.T) {
198 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
199 p.stream.Send(httphelpers.SSEEvent{Event: putEvent,
200 Data: `{"path": "/", "data": {"flags": {"flagkey": {"key": [], "version": true}}, "segments": {}}}`})
201 expectRestart(t, p)
202 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
203 })
204 })
205
206 t.Run("patch with omitted path", func(t *testing.T) {
207 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
208 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
209 Data: `{"data": {"key": "flagkey"}}`})
210 expectRestart(t, p)
211 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*a required property \"path\" was missing.*will restart")
212 })
213 })
214
215 t.Run("patch with malformed JSON", func(t *testing.T) {
216 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
217 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent, Data: `{"path":"/flags/flagkey"`})
218 expectRestart(t, p)
219 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
220 })
221 })
222
223 t.Run("patch with well-formed JSON but malformed data model item", func(t *testing.T) {
224 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
225 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
226 Data: `{"path":"/flags/flagkey", "data": {"key": [], "version": true}}`})
227 expectRestart(t, p)
228 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
229 })
230 })
231
232 t.Run("delete with omitted path", func(t *testing.T) {
233 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
234 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent, Data: `{"version": 8}`})
235 expectRestart(t, p)
236 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*a required property \"path\" was missing.*will restart")
237 })
238 })
239
240 t.Run("patch with malformed JSON", func(t *testing.T) {
241 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
242 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent, Data: `{"path":"/flags/flagkey"`})
243 expectRestart(t, p)
244 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
245 })
246 })
247 }
248
249 func TestStreamProcessorUnrecoverableErrorsCauseStreamShutdown(t *testing.T) {
250 for _, status := range []int{401, 403} {
251 t.Run(fmt.Sprintf("HTTP status %d", status), func(t *testing.T) {
252 testStreamProcessorUnrecoverableHTTPError(t, status)
253 })
254 }
255 }
256
257 func TestStreamProcessorUnrecognizedDataIsIgnored(t *testing.T) {
258 t.Parallel()
259
260 expectNoRestart := func(t *testing.T, p streamingTestParams) {
261 <-p.requests
262
263 th.AssertNoMoreValues(t, p.requests, time.Millisecond*100, "stream restarted unexpectedly")
264
265 assert.Len(t, p.mockLog.GetOutput(ldlog.Error), 0)
266
267 p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)
268 th.AssertNoMoreValues(t, p.updates.Statuses, time.Millisecond*100, "unexpected data source status change")
269 }
270
271 t.Run("patch with unrecognized path", func(t *testing.T) {
272 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
273 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
274 Data: `{"path": "/wrong", "data": {"key": "flagkey"}}`})
275 expectNoRestart(t, p)
276 })
277 })
278
279 t.Run("delete with unrecognized path", func(t *testing.T) {
280 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
281 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
282 Data: `{"path": "/wrong", "version": 8}`})
283 expectNoRestart(t, p)
284 })
285 })
286
287 t.Run("unknown message type", func(t *testing.T) {
288 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
289 p.stream.Send(httphelpers.SSEEvent{Event: "weird-event", Data: `x`})
290 expectNoRestart(t, p)
291 })
292 })
293 }
294
295 func TestStreamProcessorStoreUpdateFailureWithStatusTracking(t *testing.T) {
296
297
298
299
300 fakeError := errors.New("sorry")
301
302 expectStoreFailureAndRecovery := func(t *testing.T, p streamingTestParams) {
303 <-p.requests
304
305 th.AssertNoMoreValues(t, p.requests, time.Millisecond*100, "stream restarted unexpectedly")
306
307 p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)
308 p.mockLog.AssertMessageMatch(t, true, ldlog.Error,
309 ".*Failed to store.*will try again once data store is working")
310
311 p.updates.DataStore.SetFakeError(nil)
312 p.updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: true, NeedsRefresh: true})
313
314 th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
315
316 p.mockLog.AssertMessageMatch(t, true, ldlog.Warn, "Restarting stream.*after data store outage")
317 }
318
319 t.Run("Init fails on put", func(t *testing.T) {
320 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
321 p.updates.DataStore.SetFakeError(fakeError)
322
323 p.stream.Send(ldservices.NewServerSDKData().ToPutEvent())
324
325 expectStoreFailureAndRecovery(t, p)
326 })
327 })
328
329 t.Run("Upsert fails on patch", func(t *testing.T) {
330 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
331 p.updates.DataStore.SetFakeError(fakeError)
332
333 p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
334 Data: `{"path": "/flags/my-flag", "data": {"key": "my-flag", "version": 3}}`})
335
336 expectStoreFailureAndRecovery(t, p)
337 })
338 })
339
340 t.Run("Upsert fails on delete", func(t *testing.T) {
341 runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
342 p.updates.DataStore.SetFakeError(fakeError)
343
344 p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
345 Data: `{"path": "/flags/my-flag", "version": 4}`})
346
347 expectStoreFailureAndRecovery(t, p)
348 })
349 })
350 }
351
352 func TestStreamProcessorStoreUpdateFailureWithoutStatusTracking(t *testing.T) {
353
354
355
356
357
358 fakeError := errors.New("sorry")
359
360 initialData := ldservices.NewServerSDKData()
361 noStatusMonitoring := func(u *mocks.MockDataSourceUpdates) {
362 u.DataStore.SetStatusMonitoringEnabled(false)
363 }
364
365 runStreamingTestWithConfiguration(t, initialData, noStatusMonitoring, func(p streamingTestParams) {
366 <-p.requests
367
368 p.updates.DataStore.SetFakeError(fakeError)
369
370 p.stream.Send(initialData.ToPutEvent())
371
372 th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
373
374 p.mockLog.AssertMessageMatch(t, true, ldlog.Error, "Failed to store.*will restart stream")
375 })
376 }
377
378 func testStreamProcessorUnrecoverableHTTPError(t *testing.T, statusCode int) {
379 mockLog := ldlogtest.NewMockLog()
380 defer mockLog.DumpIfTestFailed(t)
381 httphelpers.WithServer(httphelpers.HandlerWithStatus(statusCode), func(ts *httptest.Server) {
382 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
383 id := ldevents.NewDiagnosticID(testSDKKey)
384 diagnosticsManager := ldevents.NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), nil)
385 context := &internal.ClientContextImpl{
386 BasicClientContext: subsystems.BasicClientContext{
387 SDKKey: testSDKKey,
388 Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
389 },
390 DiagnosticsManager: diagnosticsManager,
391 }
392
393 sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, time.Second)
394 defer sp.Close()
395
396 closeWhenReady := make(chan struct{})
397
398 sp.Start(closeWhenReady)
399
400 th.AssertChannelClosed(t, closeWhenReady, time.Second*3, "Initialization shouldn't block after this error")
401
402 event := diagnosticsManager.CreateStatsEventAndReset(0, 0, 0)
403 assert.Equal(t, 1, event.GetByKey("streamInits").Count())
404 assert.Equal(t, ldvalue.Bool(true), event.GetByKey("streamInits").GetByIndex(0).GetByKey("failed"))
405
406 status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateOff)
407 assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, status.LastError.Kind)
408 assert.Equal(t, statusCode, status.LastError.StatusCode)
409 })
410 })
411 }
412
413 func testStreamProcessorRecoverableHTTPError(t *testing.T, statusCode int) {
414 initialData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
415 streamHandler, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
416 sequentialHandler := httphelpers.SequentialHandler(
417 httphelpers.HandlerWithStatus(statusCode),
418 streamHandler,
419 )
420 mockLog := ldlogtest.NewMockLog()
421 defer mockLog.DumpIfTestFailed(t)
422 httphelpers.WithServer(sequentialHandler, func(ts *httptest.Server) {
423 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
424 id := ldevents.NewDiagnosticID(testSDKKey)
425 diagnosticsManager := ldevents.NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), nil)
426 context := &internal.ClientContextImpl{
427 BasicClientContext: subsystems.BasicClientContext{
428 SDKKey: testSDKKey,
429 Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
430 },
431 DiagnosticsManager: diagnosticsManager,
432 }
433
434 sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
435 defer sp.Close()
436
437 closeWhenReady := make(chan struct{})
438 sp.Start(closeWhenReady)
439
440 th.AssertChannelClosed(t, closeWhenReady, time.Second*3, "Should have successfully retried before now")
441
442 event := diagnosticsManager.CreateStatsEventAndReset(0, 0, 0)
443 assert.Equal(t, 2, event.GetByKey("streamInits").Count())
444 assert.Equal(t, ldvalue.Bool(true), event.GetByKey("streamInits").GetByIndex(0).GetByKey("failed"))
445 assert.Equal(t, ldvalue.Bool(false), event.GetByKey("streamInits").GetByIndex(1).GetByKey("failed"))
446
447
448
449
450 status1 := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted)
451 assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, status1.LastError.Kind)
452 assert.Equal(t, statusCode, status1.LastError.StatusCode)
453 _ = dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateValid)
454 })
455 })
456 }
457
458 func TestStreamProcessorUsesHTTPClientFactory(t *testing.T) {
459 handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(401))
460
461 httphelpers.WithServer(handler, func(ts *httptest.Server) {
462 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
463 httpClientFactory := urlAppendingHTTPClientFactory("/transformed")
464 httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
465 context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
466
467 sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
468 defer sp.Close()
469 closeWhenReady := make(chan struct{})
470 sp.Start(closeWhenReady)
471
472 r := <-requestsCh
473
474 assert.Equal(t, "/all/transformed", r.Request.URL.Path)
475 })
476 })
477 }
478
479 func TestStreamProcessorDoesNotUseConfiguredTimeoutAsReadTimeout(t *testing.T) {
480 streamHandler, _ := ldservices.ServerSideStreamingServiceHandler(ldservices.NewServerSDKData().ToPutEvent())
481 handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
482
483 httphelpers.WithServer(handler, func(ts *httptest.Server) {
484 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
485 httpClientFactory := func() *http.Client {
486 c := *http.DefaultClient
487 c.Timeout = 200 * time.Millisecond
488 return &c
489 }
490 httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
491 context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
492
493 sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
494 defer sp.Close()
495 closeWhenReady := make(chan struct{})
496 sp.Start(closeWhenReady)
497
498 <-time.After(500 * time.Millisecond)
499 assert.Equal(t, 1, len(requestsCh))
500 })
501 })
502 }
503
504 func TestStreamProcessorRestartsStreamIfStoreNeedsRefresh(t *testing.T) {
505 initialData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 1))
506 updatedData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
507 streamHandler1, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
508 streamHandler2, _ := ldservices.ServerSideStreamingServiceHandler(updatedData.ToPutEvent())
509 streamHandler := httphelpers.SequentialHandler(streamHandler1, streamHandler2)
510
511 httphelpers.WithServer(streamHandler, func(ts *httptest.Server) {
512 withMockDataSourceUpdates(func(updates *mocks.MockDataSourceUpdates) {
513 sp := NewStreamProcessor(basicClientContext(), updates, ts.URL, briefDelay)
514 defer sp.Close()
515
516 closeWhenReady := make(chan struct{})
517 sp.Start(closeWhenReady)
518
519
520 updates.DataStore.WaitForInit(t, initialData, 3*time.Second)
521
522
523 updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: false})
524 updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: true, NeedsRefresh: true})
525
526
527 updates.DataStore.WaitForInit(t, updatedData, 3*time.Second)
528 })
529 })
530 }
531
532 func TestMalformedStreamBaseURI(t *testing.T) {
533 mockLog := ldlogtest.NewMockLog()
534 defer mockLog.DumpIfTestFailed(t)
535 clientContext := &internal.ClientContextImpl{
536 BasicClientContext: subsystems.BasicClientContext{
537 SDKKey: testSDKKey,
538 Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
539 },
540 }
541 withMockDataSourceUpdates(func(updates *mocks.MockDataSourceUpdates) {
542 sp := NewStreamProcessor(clientContext, updates, ":/", briefDelay)
543 defer sp.Close()
544
545 closeWhenReady := make(chan struct{})
546 sp.Start(closeWhenReady)
547
548 status := updates.RequireStatusOf(t, interfaces.DataSourceStateOff)
549 assert.Equal(t, interfaces.DataSourceErrorKindUnknown, status.LastError.Kind)
550 <-closeWhenReady
551
552 mockLog.AssertMessageMatch(t, true, ldlog.Error, "Unable to create a stream request")
553 })
554 }
555
View as plain text