1 package datasource
2
3 import (
4 "errors"
5 "fmt"
6 "net/http/httptest"
7 "testing"
8 "time"
9
10 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
11
12 "github.com/launchdarkly/go-server-sdk-evaluation/v2/ldbuilders"
13 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
14 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
15 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
16 "github.com/launchdarkly/go-server-sdk/v6/testhelpers/ldservices"
17
18 th "github.com/launchdarkly/go-test-helpers/v3"
19 "github.com/launchdarkly/go-test-helpers/v3/httphelpers"
20
21 "github.com/stretchr/testify/assert"
22 )
23
24 func TestPollingProcessorClosingItShouldNotBlock(t *testing.T) {
25 r := newMockRequestor()
26 defer r.Close()
27 r.requestAllRespCh <- mockRequestAllResponse{}
28
29 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
30 p := newPollingProcessor(basicClientContext(), dataSourceUpdates, r, time.Minute)
31
32 p.Close()
33
34 closeWhenReady := make(chan struct{})
35 p.Start(closeWhenReady)
36
37 th.AssertChannelClosed(t, closeWhenReady, time.Second, "starting a closed processor shouldn't block")
38 })
39 }
40
41 func TestPollingProcessorInitialization(t *testing.T) {
42 flag := ldbuilders.NewFlagBuilder("flagkey").Version(1).Build()
43 segment := ldbuilders.NewSegmentBuilder("segmentkey").Version(1).Build()
44
45 r := newMockRequestor()
46 defer r.Close()
47 expectedData := sharedtest.NewDataSetBuilder().Flags(flag).Segments(segment)
48 resp := mockRequestAllResponse{data: expectedData.Build()}
49 r.requestAllRespCh <- resp
50
51 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
52 p := newPollingProcessor(basicClientContext(), dataSourceUpdates, r, time.Millisecond*10)
53 defer p.Close()
54
55 closeWhenReady := make(chan struct{})
56 p.Start(closeWhenReady)
57
58 if !th.AssertChannelClosed(t, closeWhenReady, time.Second, "Failed to initialize") {
59 return
60 }
61
62 assert.True(t, p.IsInitialized())
63
64 dataSourceUpdates.DataStore.WaitForInit(t, expectedData.ToServerSDKData(), 2*time.Second)
65
66 for i := 0; i < 2; i++ {
67 r.requestAllRespCh <- resp
68 if _, ok, closed := th.TryReceive(r.pollsCh, time.Second); !ok || closed {
69 assert.Fail(t, "Expected 2 polls", "but only got %d", i)
70 return
71 }
72 }
73 })
74 }
75
76 func TestPollingProcessorRecoverableErrors(t *testing.T) {
77 for _, statusCode := range []int{400, 408, 429, 500, 503} {
78 t.Run(fmt.Sprintf("HTTP %d", statusCode), func(t *testing.T) {
79 testPollingProcessorRecoverableError(
80 t,
81 httpStatusError{Code: statusCode},
82 func(errorInfo interfaces.DataSourceErrorInfo) {
83 assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, errorInfo.Kind)
84 assert.Equal(t, statusCode, errorInfo.StatusCode)
85 },
86 )
87 })
88 }
89
90 t.Run("network error", func(t *testing.T) {
91 testPollingProcessorRecoverableError(
92 t,
93 errors.New("arbitrary error"),
94 func(errorInfo interfaces.DataSourceErrorInfo) {
95 assert.Equal(t, interfaces.DataSourceErrorKindNetworkError, errorInfo.Kind)
96 assert.Equal(t, "arbitrary error", errorInfo.Message)
97 },
98 )
99 })
100
101 t.Run("malformed data", func(t *testing.T) {
102 testPollingProcessorRecoverableError(
103 t,
104 malformedJSONError{innerError: errors.New("sorry")},
105 func(errorInfo interfaces.DataSourceErrorInfo) {
106 assert.Equal(t, string(interfaces.DataSourceErrorKindInvalidData), string(errorInfo.Kind))
107 assert.Contains(t, errorInfo.Message, "sorry")
108 },
109 )
110 })
111 }
112
113 func testPollingProcessorRecoverableError(t *testing.T, err error, verifyError func(interfaces.DataSourceErrorInfo)) {
114 req := newMockRequestor()
115 defer req.Close()
116
117 req.requestAllRespCh <- mockRequestAllResponse{err: err}
118
119 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
120 p := newPollingProcessor(basicClientContext(), dataSourceUpdates, req, time.Millisecond*10)
121 defer p.Close()
122 closeWhenReady := make(chan struct{})
123 p.Start(closeWhenReady)
124
125
126 <-req.pollsCh
127
128 status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateInterrupted)
129 verifyError(status.LastError)
130
131 if !th.AssertChannelNotClosed(t, closeWhenReady, 0) {
132 t.FailNow()
133 }
134
135 req.requestAllRespCh <- mockRequestAllResponse{}
136
137
138 th.RequireValue(t, req.pollsCh, time.Second, "failed to retry")
139
140 waitForReadyWithTimeout(t, closeWhenReady, time.Second)
141 _ = dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateValid)
142 })
143 }
144
145 func TestPollingProcessorUnrecoverableErrors(t *testing.T) {
146 for _, statusCode := range []int{401, 403, 405} {
147 t.Run(fmt.Sprintf("HTTP %d", statusCode), func(t *testing.T) {
148 testPollingProcessorUnrecoverableError(
149 t,
150 httpStatusError{Code: statusCode},
151 func(errorInfo interfaces.DataSourceErrorInfo) {
152 assert.Equal(t, interfaces.DataSourceErrorKindErrorResponse, errorInfo.Kind)
153 assert.Equal(t, statusCode, errorInfo.StatusCode)
154 },
155 )
156 })
157 }
158 }
159
160 func testPollingProcessorUnrecoverableError(
161 t *testing.T,
162 err error,
163 verifyError func(interfaces.DataSourceErrorInfo),
164 ) {
165 req := newMockRequestor()
166 defer req.Close()
167
168 req.requestAllRespCh <- mockRequestAllResponse{err: err}
169 req.requestAllRespCh <- mockRequestAllResponse{}
170
171 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
172 p := newPollingProcessor(basicClientContext(), dataSourceUpdates, req, time.Millisecond*10)
173 defer p.Close()
174 closeWhenReady := make(chan struct{})
175 p.Start(closeWhenReady)
176
177
178 <-req.pollsCh
179
180 waitForReadyWithTimeout(t, closeWhenReady, time.Second)
181
182 status := dataSourceUpdates.RequireStatusOf(t, interfaces.DataSourceStateOff)
183 verifyError(status.LastError)
184 assert.Len(t, req.pollsCh, 0)
185 })
186 }
187
188 func TestPollingProcessorUsesHTTPClientFactory(t *testing.T) {
189 data := ldservices.NewServerSDKData().Flags(ldservices.FlagOrSegment("my-flag", 2))
190 pollHandler, requestsCh := httphelpers.RecordingHandler(ldservices.ServerSidePollingServiceHandler(data))
191 httphelpers.WithServer(pollHandler, func(ts *httptest.Server) {
192 withMockDataSourceUpdates(func(dataSourceUpdates *mocks.MockDataSourceUpdates) {
193 httpClientFactory := urlAppendingHTTPClientFactory("/transformed")
194 httpConfig := subsystems.HTTPConfiguration{CreateHTTPClient: httpClientFactory}
195 context := sharedtest.NewTestContext(testSDKKey, &httpConfig, nil)
196
197 p := NewPollingProcessor(context, dataSourceUpdates, ts.URL, time.Minute*30)
198 defer p.Close()
199 closeWhenReady := make(chan struct{})
200 p.Start(closeWhenReady)
201
202 r := <-requestsCh
203
204 assert.Equal(t, "/sdk/latest-all/transformed", r.Request.URL.Path)
205 })
206 })
207 }
208
View as plain text