...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/streaming_data_source_test.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

     1  package datasource
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net/http"
     7  	"net/http/httptest"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
    12  
    13  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    14  	"github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
    15  	"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
    16  	ldevents "github.com/launchdarkly/go-sdk-events/v2"
    17  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    18  	"github.com/launchdarkly/go-server-sdk/v6/internal"
    19  	"github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
    20  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
    21  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    22  	"github.com/launchdarkly/go-server-sdk/v6/testhelpers/ldservices"
    23  
    24  	"github.com/launchdarkly/eventsource"
    25  	th "github.com/launchdarkly/go-test-helpers/v3"
    26  	"github.com/launchdarkly/go-test-helpers/v3/httphelpers"
    27  
    28  	"github.com/stretchr/testify/assert"
    29  )
    30  
    31  const (
    32  	briefDelay                     = time.Millisecond * 50
    33  	streamProcessorTestHeaderName  = "my-header"
    34  	streamProcessorTestHeaderValue = "my-value"
    35  )
    36  
// streamingTestParams bundles the fixtures that runStreamingTestWithConfiguration hands to each
// test body. The struct is populated positionally, so the field order must not change.
type streamingTestParams struct {
	// events is the send side of a buffered channel created by the test harness.
	events   chan<- eventsource.Event
	// updates is the mock data source update sink that the stream processor writes to.
	updates  *mocks.MockDataSourceUpdates
	// stream controls the first SSE stream served to the processor (send events, end the stream).
	stream   httphelpers.SSEStreamControl
	// requests receives a record of every HTTP request made to the test stream server.
	requests <-chan httphelpers.HTTPRequestInfo
	// mockLog captures the log output produced during the test.
	mockLog  *ldlogtest.MockLog
}
    44  
    45  func runStreamingTest(
    46  	t *testing.T,
    47  	initialData *ldservices.ServerSDKData,
    48  	test func(streamingTestParams),
    49  ) {
    50  	runStreamingTestWithConfiguration(t, initialData, nil, test)
    51  }
    52  
    53  func runStreamingTestWithConfiguration(
    54  	t *testing.T,
    55  	initialData *ldservices.ServerSDKData,
    56  	configureUpdates func(*mocks.MockDataSourceUpdates),
    57  	test func(streamingTestParams),
    58  ) {
    59  	events := make(chan eventsource.Event, 1000)
    60  	streamHandler, stream := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
    61  
    62  	// We provide a second stream handler so that if the first stream gets explicitly closed by a test,
    63  	// we'll be able to able to reconnect (a closed stream handler can't be reused)
    64  	extraStreamHandler, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
    65  
    66  	handler, requestsCh := httphelpers.RecordingHandler(
    67  		httphelpers.SequentialHandler(streamHandler, extraStreamHandler),
    68  	)
    69  
    70  	headers := make(http.Header)
    71  	headers.Set(streamProcessorTestHeaderName, streamProcessorTestHeaderValue)
    72  	mockLog := ldlogtest.NewMockLog()
    73  	mockLog.Loggers.SetMinLevel(ldlog.Debug)
    74  	defer mockLog.DumpIfTestFailed(t)
    75  	context := sharedtest.NewTestContext("", &subsystems.HTTPConfiguration{DefaultHeaders: headers},
    76  		&subsystems.LoggingConfiguration{Loggers: mockLog.Loggers})
    77  
    78  	httphelpers.WithServer(handler, func(streamServer *httptest.Server) {
    79  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
    80  			if configureUpdates != nil {
    81  				configureUpdates(dataSourceUpdates)
    82  			}
    83  
    84  			sp := NewStreamProcessor(
    85  				context,
    86  				dataSourceUpdates,
    87  				streamServer.URL,
    88  				briefDelay,
    89  			)
    90  			defer sp.Close()
    91  
    92  			closeWhenReady := make(chan struct{})
    93  
    94  			sp.Start(closeWhenReady)
    95  
    96  			if !th.AssertChannelClosed(t, closeWhenReady, time.Second, "timed out waiting for data source to start") {
    97  				return
    98  			}
    99  
   100  			params := streamingTestParams{events, dataSourceUpdates, stream, requestsCh, mockLog}
   101  			test(params)
   102  		})
   103  	})
   104  }
   105  
   106  func TestStreamProcessor(t *testing.T) {
   107  	t.Parallel()
   108  	initialData := ldservices.NewServerSDKData().
   109  		Flags(ldservices.FlagOrSegment("my-flag", 2)).
   110  		Segments(ldservices.FlagOrSegment("my-segment", 2))
   111  	timeout := 3 * time.Second
   112  
   113  	t.Run("configured headers are passed in request", func(t *testing.T) {
   114  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   115  			r := <-p.requests
   116  			assert.Equal(t, streamProcessorTestHeaderValue, r.Request.Header.Get(streamProcessorTestHeaderName))
   117  		})
   118  	})
   119  
   120  	t.Run("initial put", func(t *testing.T) {
   121  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   122  			p.updates.DataStore.WaitForInit(t, initialData, timeout)
   123  		})
   124  	})
   125  
   126  	t.Run("patch flag", func(t *testing.T) {
   127  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   128  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   129  				Data: `{"path": "/flags/my-flag", "data": {"key": "my-flag", "version": 3}}`})
   130  
   131  			p.updates.DataStore.WaitForUpsert(t, datakinds.Features, "my-flag", 3, timeout)
   132  		})
   133  	})
   134  
   135  	t.Run("delete flag", func(t *testing.T) {
   136  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   137  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
   138  				Data: `{"path": "/flags/my-flag", "version": 4}`})
   139  
   140  			p.updates.DataStore.WaitForDelete(t, datakinds.Features, "my-flag", 4, timeout)
   141  		})
   142  	})
   143  
   144  	t.Run("patch segment", func(t *testing.T) {
   145  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   146  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   147  				Data: `{"path": "/segments/my-segment", "data": {"key": "my-segment", "version": 7}}`})
   148  
   149  			p.updates.DataStore.WaitForUpsert(t, datakinds.Segments, "my-segment", 7, timeout)
   150  		})
   151  	})
   152  
   153  	t.Run("delete segment", func(t *testing.T) {
   154  		runStreamingTest(t, initialData, func(p streamingTestParams) {
   155  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
   156  				Data: `{"path": "/segments/my-segment", "version": 8}`})
   157  
   158  			p.updates.DataStore.WaitForDelete(t, datakinds.Segments, "my-segment", 8, timeout)
   159  		})
   160  	})
   161  }
   162  
   163  func TestStreamProcessorRecoverableErrorsCauseStreamRestart(t *testing.T) {
   164  	t.Parallel()
   165  
   166  	expectRestart := func(t *testing.T, p streamingTestParams) {
   167  		<-p.requests // ignore initial HTTP request
   168  		th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
   169  		p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)       // the initial connection
   170  		p.updates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted) // the error
   171  		p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid)       // the restarted connection
   172  	}
   173  
   174  	for _, status := range []int{400, 500} {
   175  		t.Run(fmt.Sprintf("HTTP status %d", status), func(t *testing.T) {
   176  			testStreamProcessorRecoverableHTTPError(t, status)
   177  		})
   178  	}
   179  
   180  	t.Run("dropped connection", func(t *testing.T) {
   181  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   182  			p.stream.EndAll()
   183  			<-time.After(300 * time.Millisecond)
   184  			expectRestart(t, p)
   185  			p.mockLog.AssertMessageMatch(t, true, ldlog.Warn, ".*Error in stream connection")
   186  		})
   187  	})
   188  
   189  	t.Run("put with malformed JSON", func(t *testing.T) {
   190  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   191  			p.stream.Send(httphelpers.SSEEvent{Event: putEvent, Data: `{"path": "/", "data": }"`})
   192  			expectRestart(t, p)
   193  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
   194  		})
   195  	})
   196  
   197  	t.Run("put with well-formed JSON but malformed data model item", func(t *testing.T) {
   198  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   199  			p.stream.Send(httphelpers.SSEEvent{Event: putEvent,
   200  				Data: `{"path": "/", "data": {"flags": {"flagkey": {"key": [], "version": true}}, "segments": {}}}`})
   201  			expectRestart(t, p)
   202  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
   203  		})
   204  	})
   205  
   206  	t.Run("patch with omitted path", func(t *testing.T) {
   207  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   208  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   209  				Data: `{"data": {"key": "flagkey"}}`})
   210  			expectRestart(t, p)
   211  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*a required property \"path\" was missing.*will restart")
   212  		})
   213  	})
   214  
   215  	t.Run("patch with malformed JSON", func(t *testing.T) {
   216  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   217  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent, Data: `{"path":"/flags/flagkey"`})
   218  			expectRestart(t, p)
   219  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
   220  		})
   221  	})
   222  
   223  	t.Run("patch with well-formed JSON but malformed data model item", func(t *testing.T) {
   224  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   225  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   226  				Data: `{"path":"/flags/flagkey", "data": {"key": [], "version": true}}`})
   227  			expectRestart(t, p)
   228  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
   229  		})
   230  	})
   231  
   232  	t.Run("delete with omitted path", func(t *testing.T) {
   233  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   234  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent, Data: `{"version": 8}`})
   235  			expectRestart(t, p)
   236  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*a required property \"path\" was missing.*will restart")
   237  		})
   238  	})
   239  
   240  	t.Run("patch with malformed JSON", func(t *testing.T) {
   241  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   242  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent, Data: `{"path":"/flags/flagkey"`})
   243  			expectRestart(t, p)
   244  			p.mockLog.AssertMessageMatch(t, true, ldlog.Error, ".*malformed JSON data.*will restart")
   245  		})
   246  	})
   247  }
   248  
   249  func TestStreamProcessorUnrecoverableErrorsCauseStreamShutdown(t *testing.T) {
   250  	for _, status := range []int{401, 403} {
   251  		t.Run(fmt.Sprintf("HTTP status %d", status), func(t *testing.T) {
   252  			testStreamProcessorUnrecoverableHTTPError(t, status)
   253  		})
   254  	}
   255  }
   256  
   257  func TestStreamProcessorUnrecognizedDataIsIgnored(t *testing.T) {
   258  	t.Parallel()
   259  
   260  	expectNoRestart := func(t *testing.T, p streamingTestParams) {
   261  		<-p.requests // ignore initial HTTP request
   262  
   263  		th.AssertNoMoreValues(t, p.requests, time.Millisecond*100, "stream restarted unexpectedly")
   264  
   265  		assert.Len(t, p.mockLog.GetOutput(ldlog.Error), 0)
   266  
   267  		p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid) // the initial connection
   268  		th.AssertNoMoreValues(t, p.updates.Statuses, time.Millisecond*100, "unexpected data source status change")
   269  	}
   270  
   271  	t.Run("patch with unrecognized path", func(t *testing.T) {
   272  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   273  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   274  				Data: `{"path": "/wrong", "data": {"key": "flagkey"}}`})
   275  			expectNoRestart(t, p)
   276  		})
   277  	})
   278  
   279  	t.Run("delete with unrecognized path", func(t *testing.T) {
   280  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   281  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
   282  				Data: `{"path": "/wrong", "version": 8}`})
   283  			expectNoRestart(t, p)
   284  		})
   285  	})
   286  
   287  	t.Run("unknown message type", func(t *testing.T) {
   288  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   289  			p.stream.Send(httphelpers.SSEEvent{Event: "weird-event", Data: `x`})
   290  			expectNoRestart(t, p)
   291  		})
   292  	})
   293  }
   294  
   295  func TestStreamProcessorStoreUpdateFailureWithStatusTracking(t *testing.T) {
   296  	// Normally, a data store can only fail if it is a persistent store that uses the standard
   297  	// PersistentDataStore framework, in which case store status tracking is available and the
   298  	// stream will only restart after a store failure if the store tells it to.
   299  
   300  	fakeError := errors.New("sorry")
   301  
   302  	expectStoreFailureAndRecovery := func(t *testing.T, p streamingTestParams) {
   303  		<-p.requests // ignore initial HTTP request
   304  
   305  		th.AssertNoMoreValues(t, p.requests, time.Millisecond*100, "stream restarted unexpectedly")
   306  
   307  		p.updates.RequireStatusOf(t, interfaces.DataSourceStateValid) // the initial connection
   308  		p.mockLog.AssertMessageMatch(t, true, ldlog.Error,
   309  			".*Failed to store.*will try again once data store is working")
   310  
   311  		p.updates.DataStore.SetFakeError(nil)
   312  		p.updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: true, NeedsRefresh: true})
   313  
   314  		th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
   315  
   316  		p.mockLog.AssertMessageMatch(t, true, ldlog.Warn, "Restarting stream.*after data store outage")
   317  	}
   318  
   319  	t.Run("Init fails on put", func(t *testing.T) {
   320  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   321  			p.updates.DataStore.SetFakeError(fakeError)
   322  
   323  			p.stream.Send(ldservices.NewServerSDKData().ToPutEvent())
   324  
   325  			expectStoreFailureAndRecovery(t, p)
   326  		})
   327  	})
   328  
   329  	t.Run("Upsert fails on patch", func(t *testing.T) {
   330  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   331  			p.updates.DataStore.SetFakeError(fakeError)
   332  
   333  			p.stream.Send(httphelpers.SSEEvent{Event: patchEvent,
   334  				Data: `{"path": "/flags/my-flag", "data": {"key": "my-flag", "version": 3}}`})
   335  
   336  			expectStoreFailureAndRecovery(t, p)
   337  		})
   338  	})
   339  
   340  	t.Run("Upsert fails on delete", func(t *testing.T) {
   341  		runStreamingTest(t, ldservices.NewServerSDKData(), func(p streamingTestParams) {
   342  			p.updates.DataStore.SetFakeError(fakeError)
   343  
   344  			p.stream.Send(httphelpers.SSEEvent{Event: deleteEvent,
   345  				Data: `{"path": "/flags/my-flag", "version": 4}`})
   346  
   347  			expectStoreFailureAndRecovery(t, p)
   348  		})
   349  	})
   350  }
   351  
   352  func TestStreamProcessorStoreUpdateFailureWithoutStatusTracking(t *testing.T) {
   353  	// In the unusual case where a store update fails but the store does not support status tracking
   354  	// (like if it's some custom implementation), the store should restart immediately after the failure.
   355  	// We're only testing this case with a single kind of event because it doesn't really matter which
   356  	// kind of operation failed in this case.
   357  
   358  	fakeError := errors.New("sorry")
   359  
   360  	initialData := ldservices.NewServerSDKData()
   361  	noStatusMonitoring := func(u *mocks.MockDataSourceUpdates) {
   362  		u.DataStore.SetStatusMonitoringEnabled(false)
   363  	}
   364  
   365  	runStreamingTestWithConfiguration(t, initialData, noStatusMonitoring, func(p streamingTestParams) {
   366  		<-p.requests // ignore initial HTTP request
   367  
   368  		p.updates.DataStore.SetFakeError(fakeError)
   369  
   370  		p.stream.Send(initialData.ToPutEvent())
   371  
   372  		th.RequireValue(t, p.requests, time.Millisecond*300, "expected stream restart, did not see one")
   373  
   374  		p.mockLog.AssertMessageMatch(t, true, ldlog.Error, "Failed to store.*will restart stream")
   375  	})
   376  }
   377  
   378  func testStreamProcessorUnrecoverableHTTPError(t *testing.T, statusCode int) {
   379  	mockLog := ldlogtest.NewMockLog()
   380  	defer mockLog.DumpIfTestFailed(t)
   381  	httphelpers.WithServer(httphelpers.HandlerWithStatus(statusCode), func(ts *httptest.Server) {
   382  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   383  			id := ldevents.NewDiagnosticID(testSDKKey)
   384  			diagnosticsManager := ldevents.NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), nil)
   385  			context := &internal.ClientContextImpl{
   386  				BasicClientContext: subsystems.BasicClientContext{
   387  					SDKKey:  testSDKKey,
   388  					Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
   389  				},
   390  				DiagnosticsManager: diagnosticsManager,
   391  			}
   392  
   393  			sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, time.Second)
   394  			defer sp.Close()
   395  
   396  			closeWhenReady := make(chan struct{})
   397  
   398  			sp.Start(closeWhenReady)
   399  
   400  			th.AssertChannelClosed(t, closeWhenReady, time.Second*3, "Initialization shouldn't block after this error")
   401  
   402  			event := diagnosticsManager.CreateStatsEventAndReset(0, 0, 0)
   403  			assert.Equal(t, 1, event.GetByKey("streamInits").Count())
   404  			assert.Equal(t, ldvalue.Bool(true), event.GetByKey("streamInits").GetByIndex(0).GetByKey("failed"))
   405  
   406  			status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateOff)
   407  			assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, status.LastError.Kind)
   408  			assert.Equal(t, statusCode, status.LastError.StatusCode)
   409  		})
   410  	})
   411  }
   412  
   413  func testStreamProcessorRecoverableHTTPError(t *testing.T, statusCode int) {
   414  	initialData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
   415  	streamHandler, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
   416  	sequentialHandler := httphelpers.SequentialHandler(
   417  		httphelpers.HandlerWithStatus(statusCode), // fails the first time
   418  		streamHandler, // then gets a valid stream
   419  	)
   420  	mockLog := ldlogtest.NewMockLog()
   421  	defer mockLog.DumpIfTestFailed(t)
   422  	httphelpers.WithServer(sequentialHandler, func(ts *httptest.Server) {
   423  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   424  			id := ldevents.NewDiagnosticID(testSDKKey)
   425  			diagnosticsManager := ldevents.NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), nil)
   426  			context := &internal.ClientContextImpl{
   427  				BasicClientContext: subsystems.BasicClientContext{
   428  					SDKKey:  testSDKKey,
   429  					Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
   430  				},
   431  				DiagnosticsManager: diagnosticsManager,
   432  			}
   433  
   434  			sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
   435  			defer sp.Close()
   436  
   437  			closeWhenReady := make(chan struct{})
   438  			sp.Start(closeWhenReady)
   439  
   440  			th.AssertChannelClosed(t, closeWhenReady, time.Second*3, "Should have successfully retried before now")
   441  
   442  			event := diagnosticsManager.CreateStatsEventAndReset(0, 0, 0)
   443  			assert.Equal(t, 2, event.GetByKey("streamInits").Count())
   444  			assert.Equal(t, ldvalue.Bool(true), event.GetByKey("streamInits").GetByIndex(0).GetByKey("failed"))
   445  			assert.Equal(t, ldvalue.Bool(false), event.GetByKey("streamInits").GetByIndex(1).GetByKey("failed"))
   446  
   447  			// should have gotten two status updates: first for the error, then the success - note that we're checking
   448  			// here for Interrupted because that's how the StreamProcessor reports the error, even though in the public
   449  			// API it would show up as Initializing because it was still initializing
   450  			status1 := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted)
   451  			assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, status1.LastError.Kind)
   452  			assert.Equal(t, statusCode, status1.LastError.StatusCode)
   453  			_ = dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateValid)
   454  		})
   455  	})
   456  }
   457  
   458  func TestStreamProcessorUsesHTTPClientFactory(t *testing.T) {
   459  	handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(401)) // we don't care about getting valid stream data
   460  
   461  	httphelpers.WithServer(handler, func(ts *httptest.Server) {
   462  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   463  			httpClientFactory := urlAppendingHTTPClientFactory("/transformed")
   464  			httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
   465  			context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
   466  
   467  			sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
   468  			defer sp.Close()
   469  			closeWhenReady := make(chan struct{})
   470  			sp.Start(closeWhenReady)
   471  
   472  			r := <-requestsCh
   473  
   474  			assert.Equal(t, "/all/transformed", r.Request.URL.Path)
   475  		})
   476  	})
   477  }
   478  
   479  func TestStreamProcessorDoesNotUseConfiguredTimeoutAsReadTimeout(t *testing.T) {
   480  	streamHandler, _ := ldservices.ServerSideStreamingServiceHandler(ldservices.NewServerSDKData().ToPutEvent())
   481  	handler, requestsCh := httphelpers.RecordingHandler(streamHandler)
   482  
   483  	httphelpers.WithServer(handler, func(ts *httptest.Server) {
   484  		withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
   485  			httpClientFactory := func() *http.Client {
   486  				c := *http.DefaultClient
   487  				c.Timeout = 200 * time.Millisecond
   488  				return &c
   489  			}
   490  			httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
   491  			context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
   492  
   493  			sp := NewStreamProcessor(context, dataSourceUpdates, ts.URL, briefDelay)
   494  			defer sp.Close()
   495  			closeWhenReady := make(chan struct{})
   496  			sp.Start(closeWhenReady)
   497  
   498  			<-time.After(500 * time.Millisecond)
   499  			assert.Equal(t, 1, len(requestsCh))
   500  		})
   501  	})
   502  }
   503  
   504  func TestStreamProcessorRestartsStreamIfStoreNeedsRefresh(t *testing.T) {
   505  	initialData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 1))
   506  	updatedData := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
   507  	streamHandler1, _ := ldservices.ServerSideStreamingServiceHandler(initialData.ToPutEvent())
   508  	streamHandler2, _ := ldservices.ServerSideStreamingServiceHandler(updatedData.ToPutEvent())
   509  	streamHandler := httphelpers.SequentialHandler(streamHandler1, streamHandler2)
   510  
   511  	httphelpers.WithServer(streamHandler, func(ts *httptest.Server) {
   512  		withMockDataSourceUpdates(func(updates *mocks.MockDataSourceUpdates) {
   513  			sp := NewStreamProcessor(basicClientContext(), updates, ts.URL, briefDelay)
   514  			defer sp.Close()
   515  
   516  			closeWhenReady := make(chan struct{})
   517  			sp.Start(closeWhenReady)
   518  
   519  			// Wait until the stream has received data and put it in the store
   520  			updates.DataStore.WaitForInit(t, initialData, 3*time.Second)
   521  
   522  			// Make the data store simulate an outage and recovery with NeedsRefresh: true
   523  			updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: false})
   524  			updates.UpdateStoreStatus(interfaces.DataStoreStatus{Available: true, NeedsRefresh: true})
   525  
   526  			// When the stream restarts, it'll call Init with the updated data from streamHandler1
   527  			updates.DataStore.WaitForInit(t, updatedData, 3*time.Second)
   528  		})
   529  	})
   530  }
   531  
   532  func TestMalformedStreamBaseURI(t *testing.T) {
   533  	mockLog := ldlogtest.NewMockLog()
   534  	defer mockLog.DumpIfTestFailed(t)
   535  	clientContext := &internal.ClientContextImpl{
   536  		BasicClientContext: subsystems.BasicClientContext{
   537  			SDKKey:  testSDKKey,
   538  			Logging: subsystems.LoggingConfiguration{Loggers: mockLog.Loggers},
   539  		},
   540  	}
   541  	withMockDataSourceUpdates(func(updates *mocks.MockDataSourceUpdates) {
   542  		sp := NewStreamProcessor(clientContext, updates, ":/", briefDelay)
   543  		defer sp.Close()
   544  
   545  		closeWhenReady := make(chan struct{})
   546  		sp.Start(closeWhenReady)
   547  
   548  		status := updates.RequireStatusOf(t, interfaces.DataSourceStateOff)
   549  		assert.Equal(t, interfaces.DataSourceErrorKindUnknown, status.LastError.Kind)
   550  		<-closeWhenReady
   551  
   552  		mockLog.AssertMessageMatch(t, true, ldlog.Error, "Unable to create a stream request")
   553  	})
   554  }
   555  

View as plain text