1 package datasource
2
3 import (
4 "net/http"
5 "sync"
6 "time"
7
8 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
9 "github.com/launchdarkly/go-sdk-common/v3/ldtime"
10 ldevents "github.com/launchdarkly/go-sdk-events/v2"
11 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
12 "github.com/launchdarkly/go-server-sdk/v6/internal"
13 "github.com/launchdarkly/go-server-sdk/v6/internal/endpoints"
14 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
15 "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
16
17 es "github.com/launchdarkly/eventsource"
18
19 "golang.org/x/exp/maps"
20 )
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Names of the stream event types that consumeStream knows how to handle.
const (
	putEvent    = "put"
	patchEvent  = "patch"
	deleteEvent = "delete"
)

// Timing parameters handed to the eventsource client when subscribing.
const (
	// streamReadTimeout is passed as the stream's read timeout.
	streamReadTimeout = 5 * time.Minute
	// streamMaxRetryDelay is the upper bound used for reconnection backoff.
	streamMaxRetryDelay = 30 * time.Second
	// streamRetryResetInterval is passed as the retry reset interval.
	streamRetryResetInterval = time.Minute
	// streamJitterRatio is the jitter ratio applied to retry delays.
	streamJitterRatio = 0.5
	// defaultStreamRetryDelay replaces a non-positive configured initial reconnect delay.
	defaultStreamRetryDelay = time.Second
)

// Text fragments used when logging stream connection errors.
const (
	streamingErrorContext     = "in stream connection"
	streamingWillRetryMessage = "will retry"
)
58
59
60
61
62
63
// StreamProcessor is a data source that keeps a long-lived streaming connection open and
// applies the "put", "patch" and "delete" events it receives to a DataSourceUpdateSink.
//
// Create it with NewStreamProcessor, begin streaming with Start, and stop it with Close.
type StreamProcessor struct {
	// dataSourceUpdates receives the parsed data (Init/Upsert) and status changes (UpdateStatus).
	dataSourceUpdates subsystems.DataSourceUpdateSink
	// streamURI is the base URI; the streaming request path is appended to it in subscribe.
	streamURI string
	// initialReconnectDelay is the configured first retry delay; a non-positive value is
	// replaced by defaultStreamRetryDelay when subscribing.
	initialReconnectDelay time.Duration
	// client is the HTTP client used for the stream; its Timeout is forced to zero by the
	// constructor.
	client *http.Client
	// headers are cloned onto the stream request when non-nil.
	headers http.Header
	// diagnosticsManager is only set when the client context is an *internal.ClientContextImpl;
	// otherwise it stays nil and no stream-init diagnostics are recorded.
	diagnosticsManager *ldevents.DiagnosticsManager
	loggers            ldlog.Loggers
	// isInitialized becomes true the first time setInitializedAndNotifyClient reports success.
	isInitialized internal.AtomicBoolean
	// halt is closed by Close to make consumeStream shut the stream down and return.
	halt chan struct{}
	// storeStatusCh delivers data store status updates; it is nil unless the data store status
	// provider reports that status monitoring is enabled at Start.
	storeStatusCh <-chan interfaces.DataStoreStatus
	// connectionAttemptStartTime is when the current connection attempt began, or zero when no
	// attempt is being timed. Guarded by connectionAttemptLock.
	connectionAttemptStartTime ldtime.UnixMillisecondTime
	connectionAttemptLock      sync.Mutex
	// readyOnce ensures the caller's closeWhenReady channel is closed only once by
	// setInitializedAndNotifyClient.
	readyOnce sync.Once
	// closeOnce makes Close safe to call more than once.
	closeOnce sync.Once
}
80
81
82 func NewStreamProcessor(
83 context subsystems.ClientContext,
84 dataSourceUpdates subsystems.DataSourceUpdateSink,
85 streamURI string,
86 initialReconnectDelay time.Duration,
87 ) *StreamProcessor {
88 sp := &StreamProcessor{
89 dataSourceUpdates: dataSourceUpdates,
90 streamURI: streamURI,
91 initialReconnectDelay: initialReconnectDelay,
92 headers: context.GetHTTP().DefaultHeaders,
93 loggers: context.GetLogging().Loggers,
94 halt: make(chan struct{}),
95 }
96 if cci, ok := context.(*internal.ClientContextImpl); ok {
97 sp.diagnosticsManager = cci.DiagnosticsManager
98 }
99
100 sp.client = context.GetHTTP().CreateHTTPClient()
101
102
103
104
105 sp.client.Timeout = 0
106
107 return sp
108 }
109
110
111 func (sp *StreamProcessor) IsInitialized() bool {
112 return sp.isInitialized.Get()
113 }
114
115
116 func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) {
117 sp.loggers.Info("Starting LaunchDarkly streaming connection")
118 if sp.dataSourceUpdates.GetDataStoreStatusProvider().IsStatusMonitoringEnabled() {
119 sp.storeStatusCh = sp.dataSourceUpdates.GetDataStoreStatusProvider().AddStatusListener()
120 }
121 go sp.subscribe(closeWhenReady)
122 }
123
124 func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<- struct{}) {
125
126 defer func() {
127 for range stream.Events {
128 }
129 if stream.Errors != nil {
130 for range stream.Errors {
131 }
132 }
133 }()
134
135 for {
136 select {
137 case event, ok := <-stream.Events:
138 if !ok {
139
140
141
142
143
144 return
145 }
146 sp.logConnectionResult(true)
147
148 processedEvent := true
149 shouldRestart := false
150
151 gotMalformedEvent := func(event es.Event, err error) {
152 sp.loggers.Errorf(
153 "Received streaming \"%s\" event with malformed JSON data (%s); will restart stream",
154 event.Event(),
155 err,
156 )
157
158 errorInfo := interfaces.DataSourceErrorInfo{
159 Kind: interfaces.DataSourceErrorKindInvalidData,
160 Message: err.Error(),
161 Time: time.Now(),
162 }
163 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
164
165 shouldRestart = true
166 processedEvent = false
167 }
168
169 storeUpdateFailed := func(updateDesc string) {
170 if sp.storeStatusCh != nil {
171 sp.loggers.Errorf("Failed to store %s in data store; will try again once data store is working", updateDesc)
172
173 } else {
174 sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
175 shouldRestart = true
176 processedEvent = false
177 }
178 }
179
180 switch event.Event() {
181 case putEvent:
182 put, err := parsePutData([]byte(event.Data()))
183 if err != nil {
184 gotMalformedEvent(event, err)
185 break
186 }
187 if sp.dataSourceUpdates.Init(put.Data) {
188 sp.setInitializedAndNotifyClient(true, closeWhenReady)
189 } else {
190 storeUpdateFailed("initial streaming data")
191 }
192
193 case patchEvent:
194 patch, err := parsePatchData([]byte(event.Data()))
195 if err != nil {
196 gotMalformedEvent(event, err)
197 break
198 }
199 if patch.Kind == nil {
200 break
201 }
202 if !sp.dataSourceUpdates.Upsert(patch.Kind, patch.Key, patch.Data) {
203 storeUpdateFailed("streaming update of " + patch.Key)
204 }
205
206 case deleteEvent:
207 del, err := parseDeleteData([]byte(event.Data()))
208 if err != nil {
209 gotMalformedEvent(event, err)
210 break
211 }
212 if del.Kind == nil {
213 break
214 }
215 deletedItem := ldstoretypes.ItemDescriptor{Version: del.Version, Item: nil}
216 if !sp.dataSourceUpdates.Upsert(del.Kind, del.Key, deletedItem) {
217 storeUpdateFailed("streaming deletion of " + del.Key)
218 }
219
220 default:
221 sp.loggers.Infof("Unexpected event found in stream: %s", event.Event())
222 }
223
224 if processedEvent {
225 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
226 }
227 if shouldRestart {
228 stream.Restart()
229 }
230
231 case newStoreStatus := <-sp.storeStatusCh:
232 if sp.loggers.IsDebugEnabled() {
233 sp.loggers.Debugf("StreamProcessor received store status update: %+v", newStoreStatus)
234 }
235 if newStoreStatus.Available {
236
237 if newStoreStatus.NeedsRefresh {
238
239
240 sp.loggers.Warn("Restarting stream to refresh data after data store outage")
241 stream.Restart()
242 }
243
244
245
246 sp.setInitializedAndNotifyClient(true, closeWhenReady)
247 }
248
249 case <-sp.halt:
250 stream.Close()
251 return
252 }
253 }
254 }
255
256 func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
257 req, reqErr := http.NewRequest("GET", endpoints.AddPath(sp.streamURI, endpoints.StreamingRequestPath), nil)
258 if reqErr != nil {
259 sp.loggers.Errorf(
260 "Unable to create a stream request; this is not a network problem, most likely a bad base URI: %s",
261 reqErr,
262 )
263 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
264 Kind: interfaces.DataSourceErrorKindUnknown,
265 Message: reqErr.Error(),
266 Time: time.Now(),
267 })
268 sp.logConnectionResult(false)
269 close(closeWhenReady)
270 return
271 }
272 if sp.headers != nil {
273 req.Header = maps.Clone(sp.headers)
274 }
275 sp.loggers.Info("Connecting to LaunchDarkly stream")
276
277 sp.logConnectionStarted()
278
279 initialRetryDelay := sp.initialReconnectDelay
280 if initialRetryDelay <= 0 {
281 initialRetryDelay = defaultStreamRetryDelay
282 }
283
284 errorHandler := func(err error) es.StreamErrorHandlerResult {
285 sp.logConnectionResult(false)
286
287 if se, ok := err.(es.SubscriptionError); ok {
288 errorInfo := interfaces.DataSourceErrorInfo{
289 Kind: interfaces.DataSourceErrorKindErrorResponse,
290 StatusCode: se.Code,
291 Time: time.Now(),
292 }
293 recoverable := checkIfErrorIsRecoverableAndLog(
294 sp.loggers,
295 httpErrorDescription(se.Code),
296 streamingErrorContext,
297 se.Code,
298 streamingWillRetryMessage,
299 )
300 if recoverable {
301 sp.logConnectionStarted()
302 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
303 return es.StreamErrorHandlerResult{CloseNow: false}
304 }
305 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
306 return es.StreamErrorHandlerResult{CloseNow: true}
307 }
308
309 checkIfErrorIsRecoverableAndLog(
310 sp.loggers,
311 err.Error(),
312 streamingErrorContext,
313 0,
314 streamingWillRetryMessage,
315 )
316 errorInfo := interfaces.DataSourceErrorInfo{
317 Kind: interfaces.DataSourceErrorKindNetworkError,
318 Message: err.Error(),
319 Time: time.Now(),
320 }
321 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
322 sp.logConnectionStarted()
323 return es.StreamErrorHandlerResult{CloseNow: false}
324 }
325
326 stream, err := es.SubscribeWithRequestAndOptions(req,
327 es.StreamOptionHTTPClient(sp.client),
328 es.StreamOptionReadTimeout(streamReadTimeout),
329 es.StreamOptionInitialRetry(initialRetryDelay),
330 es.StreamOptionUseBackoff(streamMaxRetryDelay),
331 es.StreamOptionUseJitter(streamJitterRatio),
332 es.StreamOptionRetryResetInterval(streamRetryResetInterval),
333 es.StreamOptionErrorHandler(errorHandler),
334 es.StreamOptionCanRetryFirstConnection(-1),
335 es.StreamOptionLogger(sp.loggers.ForLevel(ldlog.Info)),
336 )
337
338 if err != nil {
339 sp.logConnectionResult(false)
340
341 close(closeWhenReady)
342 return
343 }
344
345 sp.consumeStream(stream, closeWhenReady)
346 }
347
348 func (sp *StreamProcessor) setInitializedAndNotifyClient(success bool, closeWhenReady chan<- struct{}) {
349 if success {
350 wasAlreadyInitialized := sp.isInitialized.GetAndSet(true)
351 if !wasAlreadyInitialized {
352 sp.loggers.Info("LaunchDarkly streaming is active")
353 }
354 }
355 sp.readyOnce.Do(func() {
356 close(closeWhenReady)
357 })
358 }
359
360 func (sp *StreamProcessor) logConnectionStarted() {
361 sp.connectionAttemptLock.Lock()
362 defer sp.connectionAttemptLock.Unlock()
363 sp.connectionAttemptStartTime = ldtime.UnixMillisNow()
364 }
365
366 func (sp *StreamProcessor) logConnectionResult(success bool) {
367 sp.connectionAttemptLock.Lock()
368 startTimeWas := sp.connectionAttemptStartTime
369 sp.connectionAttemptStartTime = 0
370 sp.connectionAttemptLock.Unlock()
371
372 if startTimeWas > 0 && sp.diagnosticsManager != nil {
373 timestamp := ldtime.UnixMillisNow()
374 sp.diagnosticsManager.RecordStreamInit(timestamp, !success, uint64(timestamp-startTimeWas))
375 }
376 }
377
378
379 func (sp *StreamProcessor) Close() error {
380 sp.closeOnce.Do(func() {
381 close(sp.halt)
382 if sp.storeStatusCh != nil {
383 sp.dataSourceUpdates.GetDataStoreStatusProvider().RemoveStatusListener(sp.storeStatusCh)
384 }
385 sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
386 })
387 return nil
388 }
389
390
391 func (sp *StreamProcessor) GetBaseURI() string {
392 return sp.streamURI
393 }
394
395
396 func (sp *StreamProcessor) GetInitialReconnectDelay() time.Duration {
397 return sp.initialReconnectDelay
398 }
399
View as plain text