...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/streaming_data_source.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

     1  package datasource
     2  
     3  import (
     4  	"net/http"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldtime"
    10  	ldevents "github.com/launchdarkly/go-sdk-events/v2"
    11  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    12  	"github.com/launchdarkly/go-server-sdk/v6/internal"
    13  	"github.com/launchdarkly/go-server-sdk/v6/internal/endpoints"
    14  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    15  	"github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
    16  
    17  	es "github.com/launchdarkly/eventsource"
    18  
    19  	"golang.org/x/exp/maps"
    20  )
    21  
    22  // Implementation of the streaming data source, not including the lower-level SSE implementation which is in
    23  // the eventsource package.
    24  //
    25  // Error handling works as follows:
    26  // 1. If any event is malformed, we must assume the stream is broken and we may have missed updates. Set the
    27  // data source state to INTERRUPTED, with an error kind of INVALID_DATA, and restart the stream.
    28  // 2. If we try to put updates into the data store and we get an error, we must assume something's wrong with the
    29  // data store. We don't have to log this error because it is logged by DataSourceUpdateSinkImpl, which will also set
    30  // our state to INTERRUPTED for us.
    31  // 2a. If the data store supports status notifications (which all persistent stores normally do), then we can
    32  // assume it has entered a failed state and will notify us once it is working again. If and when it recovers, then
    33  // it will tell us whether we need to restart the stream (to ensure that we haven't missed any updates), or
    34  // whether it has already persisted all of the stream updates we received during the outage.
    35  // 2b. If the data store doesn't support status notifications (which is normally only true of the in-memory store)
    36  // then we don't know the significance of the error, but we must assume that updates have been lost, so we'll
    37  // restart the stream.
    38  // 3. If we receive an unrecoverable error like HTTP 401, we close the stream and don't retry, and set the state
    39  // to OFF. Any other HTTP error or network error causes a retry with backoff, with a state of INTERRUPTED.
// 4. We close the closeWhenReady channel passed to Start() to tell the client initialization logic that
// initialization has either succeeded (we got an initial payload and successfully stored it) or permanently
// failed (we got a 401, etc.). Otherwise, the client initialization method may time out but we will still be
// retrying in the background, and if we succeed then the client can detect that we're initialized now by
// calling our IsInitialized method.
    44  
const (
	// putEvent, patchEvent, and deleteEvent are the SSE event names that consumeStream acts on; any
	// other event name is logged and otherwise ignored.
	//
	// streamReadTimeout, streamMaxRetryDelay, streamRetryResetInterval, and streamJitterRatio are passed
	// to the eventsource stream options in subscribe (read timeout, backoff ceiling, retry reset
	// interval, and jitter ratio respectively). defaultStreamRetryDelay is the initial retry delay that
	// subscribe falls back to when the configured initialReconnectDelay is not positive.
	putEvent                 = "put"
	patchEvent               = "patch"
	deleteEvent              = "delete"
	streamReadTimeout        = 5 * time.Minute // the LaunchDarkly stream should send a heartbeat comment every 3 minutes
	streamMaxRetryDelay      = 30 * time.Second
	streamRetryResetInterval = 60 * time.Second
	streamJitterRatio        = 0.5
	defaultStreamRetryDelay  = 1 * time.Second

	// streamingErrorContext and streamingWillRetryMessage are the message fragments that subscribe's
	// error handler passes to checkIfErrorIsRecoverableAndLog.
	streamingErrorContext     = "in stream connection"
	streamingWillRetryMessage = "will retry"
)
    58  
// StreamProcessor is the internal implementation of the streaming data source.
//
// This type is exported from internal so that the StreamingDataSourceBuilder tests can verify its
// configuration. All other code outside of this package should interact with it only via the
// DataSource interface.
//
// Field notes:
//   - dataSourceUpdates receives both the flag data (Init/Upsert) and the data source status updates.
//   - streamURI is the base URI; subscribe appends the streaming request path to it.
//   - initialReconnectDelay is the first retry delay; subscribe substitutes defaultStreamRetryDelay
//     if it is not positive.
//   - client is the HTTP client used for the stream; NewStreamProcessor sets its Timeout to zero.
//   - headers, if non-nil, are cloned onto the stream request.
//   - diagnosticsManager may be nil; when it is non-nil, logConnectionResult reports stream
//     connection attempts to it.
//   - isInitialized is set by setInitializedAndNotifyClient and read by IsInitialized.
//   - halt is closed by Close to make consumeStream shut the stream down and return.
//   - storeStatusCh is nil unless Start found that data store status monitoring is enabled.
//   - connectionAttemptStartTime is guarded by connectionAttemptLock; zero means that there is no
//     connection attempt whose result is still to be recorded.
//   - readyOnce guards closing of the closeWhenReady channel in setInitializedAndNotifyClient.
//   - closeOnce makes Close idempotent.
type StreamProcessor struct {
	dataSourceUpdates          subsystems.DataSourceUpdateSink
	streamURI                  string
	initialReconnectDelay      time.Duration
	client                     *http.Client
	headers                    http.Header
	diagnosticsManager         *ldevents.DiagnosticsManager
	loggers                    ldlog.Loggers
	isInitialized              internal.AtomicBoolean
	halt                       chan struct{}
	storeStatusCh              <-chan interfaces.DataStoreStatus
	connectionAttemptStartTime ldtime.UnixMillisecondTime
	connectionAttemptLock      sync.Mutex
	readyOnce                  sync.Once
	closeOnce                  sync.Once
}
    80  
    81  // NewStreamProcessor creates the internal implementation of the streaming data source.
    82  func NewStreamProcessor(
    83  	context subsystems.ClientContext,
    84  	dataSourceUpdates subsystems.DataSourceUpdateSink,
    85  	streamURI string,
    86  	initialReconnectDelay time.Duration,
    87  ) *StreamProcessor {
    88  	sp := &StreamProcessor{
    89  		dataSourceUpdates:     dataSourceUpdates,
    90  		streamURI:             streamURI,
    91  		initialReconnectDelay: initialReconnectDelay,
    92  		headers:               context.GetHTTP().DefaultHeaders,
    93  		loggers:               context.GetLogging().Loggers,
    94  		halt:                  make(chan struct{}),
    95  	}
    96  	if cci, ok := context.(*internal.ClientContextImpl); ok {
    97  		sp.diagnosticsManager = cci.DiagnosticsManager
    98  	}
    99  
   100  	sp.client = context.GetHTTP().CreateHTTPClient()
   101  	// Client.Timeout isn't just a connect timeout, it will break the connection if a full response
   102  	// isn't received within that time (which, with the stream, it never will be), so we must make
   103  	// sure it's zero and not the usual configured default. What we do want is a *connection* timeout,
   104  	// which is set by Config.newHTTPClient as a property of the Dialer.
   105  	sp.client.Timeout = 0
   106  
   107  	return sp
   108  }
   109  
// IsInitialized returns the current value of the isInitialized flag, which is set to true by
// setInitializedAndNotifyClient.
//
//nolint:revive // no doc comment for standard method
func (sp *StreamProcessor) IsInitialized() bool {
	return sp.isInitialized.Get()
}
   114  
   115  //nolint:revive // no doc comment for standard method
   116  func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) {
   117  	sp.loggers.Info("Starting LaunchDarkly streaming connection")
   118  	if sp.dataSourceUpdates.GetDataStoreStatusProvider().IsStatusMonitoringEnabled() {
   119  		sp.storeStatusCh = sp.dataSourceUpdates.GetDataStoreStatusProvider().AddStatusListener()
   120  	}
   121  	go sp.subscribe(closeWhenReady)
   122  }
   123  
   124  func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<- struct{}) {
   125  	// Consume remaining Events and Errors so we can garbage collect
   126  	defer func() {
   127  		for range stream.Events {
   128  		} // COVERAGE: no way to cause this condition in unit tests
   129  		if stream.Errors != nil {
   130  			for range stream.Errors { // COVERAGE: no way to cause this condition in unit tests
   131  			}
   132  		}
   133  	}()
   134  
   135  	for {
   136  		select {
   137  		case event, ok := <-stream.Events:
   138  			if !ok {
   139  				// COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that
   140  				// only happens when we have received from sp.halt, in which case we return immediately
   141  				// after calling stream.Close(), terminating the for loop-- so we should not actually reach
   142  				// this point. Still, in case the channel is somehow closed unexpectedly, we do want to
   143  				// terminate the loop.
   144  				return
   145  			}
   146  			sp.logConnectionResult(true)
   147  
   148  			processedEvent := true
   149  			shouldRestart := false
   150  
   151  			gotMalformedEvent := func(event es.Event, err error) {
   152  				sp.loggers.Errorf(
   153  					"Received streaming \"%s\" event with malformed JSON data (%s); will restart stream",
   154  					event.Event(),
   155  					err,
   156  				)
   157  
   158  				errorInfo := interfaces.DataSourceErrorInfo{
   159  					Kind:    interfaces.DataSourceErrorKindInvalidData,
   160  					Message: err.Error(),
   161  					Time:    time.Now(),
   162  				}
   163  				sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
   164  
   165  				shouldRestart = true // scenario 1 in error handling comments at top of file
   166  				processedEvent = false
   167  			}
   168  
   169  			storeUpdateFailed := func(updateDesc string) {
   170  				if sp.storeStatusCh != nil {
   171  					sp.loggers.Errorf("Failed to store %s in data store; will try again once data store is working", updateDesc)
   172  					// scenario 2a in error handling comments at top of file
   173  				} else {
   174  					sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
   175  					shouldRestart = true // scenario 2b
   176  					processedEvent = false
   177  				}
   178  			}
   179  
   180  			switch event.Event() {
   181  			case putEvent:
   182  				put, err := parsePutData([]byte(event.Data()))
   183  				if err != nil {
   184  					gotMalformedEvent(event, err)
   185  					break
   186  				}
   187  				if sp.dataSourceUpdates.Init(put.Data) {
   188  					sp.setInitializedAndNotifyClient(true, closeWhenReady)
   189  				} else {
   190  					storeUpdateFailed("initial streaming data")
   191  				}
   192  
   193  			case patchEvent:
   194  				patch, err := parsePatchData([]byte(event.Data()))
   195  				if err != nil {
   196  					gotMalformedEvent(event, err)
   197  					break
   198  				}
   199  				if patch.Kind == nil {
   200  					break // ignore unrecognized item type
   201  				}
   202  				if !sp.dataSourceUpdates.Upsert(patch.Kind, patch.Key, patch.Data) {
   203  					storeUpdateFailed("streaming update of " + patch.Key)
   204  				}
   205  
   206  			case deleteEvent:
   207  				del, err := parseDeleteData([]byte(event.Data()))
   208  				if err != nil {
   209  					gotMalformedEvent(event, err)
   210  					break
   211  				}
   212  				if del.Kind == nil {
   213  					break // ignore unrecognized item type
   214  				}
   215  				deletedItem := ldstoretypes.ItemDescriptor{Version: del.Version, Item: nil}
   216  				if !sp.dataSourceUpdates.Upsert(del.Kind, del.Key, deletedItem) {
   217  					storeUpdateFailed("streaming deletion of " + del.Key)
   218  				}
   219  
   220  			default:
   221  				sp.loggers.Infof("Unexpected event found in stream: %s", event.Event())
   222  			}
   223  
   224  			if processedEvent {
   225  				sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
   226  			}
   227  			if shouldRestart {
   228  				stream.Restart()
   229  			}
   230  
   231  		case newStoreStatus := <-sp.storeStatusCh:
   232  			if sp.loggers.IsDebugEnabled() {
   233  				sp.loggers.Debugf("StreamProcessor received store status update: %+v", newStoreStatus)
   234  			}
   235  			if newStoreStatus.Available {
   236  				// The store has just transitioned from unavailable to available (scenario 2a above)
   237  				if newStoreStatus.NeedsRefresh {
   238  					// The store is telling us that it can't guarantee that all of the latest data was cached.
   239  					// So we'll restart the stream to ensure a full refresh.
   240  					sp.loggers.Warn("Restarting stream to refresh data after data store outage")
   241  					stream.Restart()
   242  				}
   243  				// All of the updates were cached and have been written to the store, so we don't need to
   244  				// restart the stream. We just need to make sure the client knows we're initialized now
   245  				// (in case the initial "put" was not stored).
   246  				sp.setInitializedAndNotifyClient(true, closeWhenReady)
   247  			}
   248  
   249  		case <-sp.halt:
   250  			stream.Close()
   251  			return
   252  		}
   253  	}
   254  }
   255  
   256  func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
   257  	req, reqErr := http.NewRequest("GET", endpoints.AddPath(sp.streamURI, endpoints.StreamingRequestPath), nil)
   258  	if reqErr != nil {
   259  		sp.loggers.Errorf(
   260  			"Unable to create a stream request; this is not a network problem, most likely a bad base URI: %s",
   261  			reqErr,
   262  		)
   263  		sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
   264  			Kind:    interfaces.DataSourceErrorKindUnknown,
   265  			Message: reqErr.Error(),
   266  			Time:    time.Now(),
   267  		})
   268  		sp.logConnectionResult(false)
   269  		close(closeWhenReady)
   270  		return
   271  	}
   272  	if sp.headers != nil {
   273  		req.Header = maps.Clone(sp.headers)
   274  	}
   275  	sp.loggers.Info("Connecting to LaunchDarkly stream")
   276  
   277  	sp.logConnectionStarted()
   278  
   279  	initialRetryDelay := sp.initialReconnectDelay
   280  	if initialRetryDelay <= 0 { // COVERAGE: can't cause this condition in unit tests
   281  		initialRetryDelay = defaultStreamRetryDelay
   282  	}
   283  
   284  	errorHandler := func(err error) es.StreamErrorHandlerResult {
   285  		sp.logConnectionResult(false)
   286  
   287  		if se, ok := err.(es.SubscriptionError); ok {
   288  			errorInfo := interfaces.DataSourceErrorInfo{
   289  				Kind:       interfaces.DataSourceErrorKindErrorResponse,
   290  				StatusCode: se.Code,
   291  				Time:       time.Now(),
   292  			}
   293  			recoverable := checkIfErrorIsRecoverableAndLog(
   294  				sp.loggers,
   295  				httpErrorDescription(se.Code),
   296  				streamingErrorContext,
   297  				se.Code,
   298  				streamingWillRetryMessage,
   299  			)
   300  			if recoverable {
   301  				sp.logConnectionStarted()
   302  				sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
   303  				return es.StreamErrorHandlerResult{CloseNow: false}
   304  			}
   305  			sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
   306  			return es.StreamErrorHandlerResult{CloseNow: true}
   307  		}
   308  
   309  		checkIfErrorIsRecoverableAndLog(
   310  			sp.loggers,
   311  			err.Error(),
   312  			streamingErrorContext,
   313  			0,
   314  			streamingWillRetryMessage,
   315  		)
   316  		errorInfo := interfaces.DataSourceErrorInfo{
   317  			Kind:    interfaces.DataSourceErrorKindNetworkError,
   318  			Message: err.Error(),
   319  			Time:    time.Now(),
   320  		}
   321  		sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
   322  		sp.logConnectionStarted()
   323  		return es.StreamErrorHandlerResult{CloseNow: false}
   324  	}
   325  
   326  	stream, err := es.SubscribeWithRequestAndOptions(req,
   327  		es.StreamOptionHTTPClient(sp.client),
   328  		es.StreamOptionReadTimeout(streamReadTimeout),
   329  		es.StreamOptionInitialRetry(initialRetryDelay),
   330  		es.StreamOptionUseBackoff(streamMaxRetryDelay),
   331  		es.StreamOptionUseJitter(streamJitterRatio),
   332  		es.StreamOptionRetryResetInterval(streamRetryResetInterval),
   333  		es.StreamOptionErrorHandler(errorHandler),
   334  		es.StreamOptionCanRetryFirstConnection(-1),
   335  		es.StreamOptionLogger(sp.loggers.ForLevel(ldlog.Info)),
   336  	)
   337  
   338  	if err != nil {
   339  		sp.logConnectionResult(false)
   340  
   341  		close(closeWhenReady)
   342  		return
   343  	}
   344  
   345  	sp.consumeStream(stream, closeWhenReady)
   346  }
   347  
   348  func (sp *StreamProcessor) setInitializedAndNotifyClient(success bool, closeWhenReady chan<- struct{}) {
   349  	if success {
   350  		wasAlreadyInitialized := sp.isInitialized.GetAndSet(true)
   351  		if !wasAlreadyInitialized {
   352  			sp.loggers.Info("LaunchDarkly streaming is active")
   353  		}
   354  	}
   355  	sp.readyOnce.Do(func() {
   356  		close(closeWhenReady)
   357  	})
   358  }
   359  
   360  func (sp *StreamProcessor) logConnectionStarted() {
   361  	sp.connectionAttemptLock.Lock()
   362  	defer sp.connectionAttemptLock.Unlock()
   363  	sp.connectionAttemptStartTime = ldtime.UnixMillisNow()
   364  }
   365  
   366  func (sp *StreamProcessor) logConnectionResult(success bool) {
   367  	sp.connectionAttemptLock.Lock()
   368  	startTimeWas := sp.connectionAttemptStartTime
   369  	sp.connectionAttemptStartTime = 0
   370  	sp.connectionAttemptLock.Unlock()
   371  
   372  	if startTimeWas > 0 && sp.diagnosticsManager != nil {
   373  		timestamp := ldtime.UnixMillisNow()
   374  		sp.diagnosticsManager.RecordStreamInit(timestamp, !success, uint64(timestamp-startTimeWas))
   375  	}
   376  }
   377  
   378  //nolint:revive // no doc comment for standard method
   379  func (sp *StreamProcessor) Close() error {
   380  	sp.closeOnce.Do(func() {
   381  		close(sp.halt)
   382  		if sp.storeStatusCh != nil {
   383  			sp.dataSourceUpdates.GetDataStoreStatusProvider().RemoveStatusListener(sp.storeStatusCh)
   384  		}
   385  		sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
   386  	})
   387  	return nil
   388  }
   389  
// GetBaseURI returns the configured streaming base URI, for testing. This is the value that was
// passed to NewStreamProcessor, without the streaming request path that subscribe appends.
func (sp *StreamProcessor) GetBaseURI() string {
	return sp.streamURI
}
   394  
// GetInitialReconnectDelay returns the configured reconnect delay, for testing. This is the value
// that was passed to NewStreamProcessor; it is returned as is, even if it is not positive (in which
// case subscribe uses defaultStreamRetryDelay instead).
func (sp *StreamProcessor) GetInitialReconnectDelay() time.Duration {
	return sp.initialReconnectDelay
}
   399  

View as plain text