...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/data_source_update_sink_impl.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

package datasource

import (
	"fmt"
	"sync"
	"time"

	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
	intf "github.com/launchdarkly/go-server-sdk/v6/interfaces"
	"github.com/launchdarkly/go-server-sdk/v6/internal"
	"github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
	st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
)

// DataSourceUpdateSinkImpl is the internal implementation of DataSourceUpdateSink. It is exported
// because the actual implementation type, rather than the interface, is required as a dependency
// of other SDK components.
type DataSourceUpdateSinkImpl struct {
	store                       subsystems.DataStore
	dataStoreStatusProvider     intf.DataStoreStatusProvider
	dataSourceStatusBroadcaster *internal.Broadcaster[intf.DataSourceStatus]
	flagChangeEventBroadcaster  *internal.Broadcaster[intf.FlagChangeEvent]
	dependencyTracker           *dependencyTracker
	outageTracker               *outageTracker
	loggers                     ldlog.Loggers
	currentStatus               intf.DataSourceStatus
	lastStoreUpdateFailed       bool
	lock                        sync.Mutex
}

// NewDataSourceUpdateSinkImpl creates the internal implementation of DataSourceUpdateSink.
func NewDataSourceUpdateSinkImpl(
	store subsystems.DataStore,
	dataStoreStatusProvider intf.DataStoreStatusProvider,
	dataSourceStatusBroadcaster *internal.Broadcaster[intf.DataSourceStatus],
	flagChangeEventBroadcaster *internal.Broadcaster[intf.FlagChangeEvent],
	logDataSourceOutageAsErrorAfter time.Duration,
	loggers ldlog.Loggers,
) *DataSourceUpdateSinkImpl {
	return &DataSourceUpdateSinkImpl{
		store:                       store,
		dataStoreStatusProvider:     dataStoreStatusProvider,
		dataSourceStatusBroadcaster: dataSourceStatusBroadcaster,
		flagChangeEventBroadcaster:  flagChangeEventBroadcaster,
		dependencyTracker:           newDependencyTracker(),
		outageTracker:               newOutageTracker(logDataSourceOutageAsErrorAfter, loggers),
		loggers:                     loggers,
		currentStatus: intf.DataSourceStatus{
			State:      intf.DataSourceStateInitializing,
			StateSince: time.Now(),
		},
	}
}
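
// Illustrative construction sketch (not part of the original source). It
// assumes internal.NewBroadcaster[V]() is the Broadcaster constructor, and
// that store, statusProvider, and loggers are preexisting values of the
// parameter types:
//
//	sink := NewDataSourceUpdateSinkImpl(
//		store,          // subsystems.DataStore
//		statusProvider, // intf.DataStoreStatusProvider
//		internal.NewBroadcaster[intf.DataSourceStatus](),
//		internal.NewBroadcaster[intf.FlagChangeEvent](),
//		time.Minute, // escalate a continuous outage to error-level logging after one minute
//		loggers,
//	)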

//nolint:revive // no doc comment for standard method
func (d *DataSourceUpdateSinkImpl) Init(allData []st.Collection) bool {
	var oldData map[st.DataKind]map[string]st.ItemDescriptor

	if d.flagChangeEventBroadcaster.HasListeners() {
		// Query the existing data if any, so that after the update we can send events for whatever was changed
		oldData = make(map[st.DataKind]map[string]st.ItemDescriptor)
		for _, kind := range datakinds.AllDataKinds() {
			if items, err := d.store.GetAll(kind); err == nil {
				m := make(map[string]st.ItemDescriptor)
				for _, item := range items {
					m[item.Key] = item.Item
				}
				oldData[kind] = m
			}
		}
	}

	err := d.store.Init(sortCollectionsForDataStoreInit(allData))
	updated := d.maybeUpdateError(err)

	if updated {
		// We must always update the dependency graph even if we don't currently have any event listeners, because if
		// listeners are added later, we don't want to have to reread the whole data store to compute the graph
		d.updateDependencyTrackerFromFullDataSet(allData)

		// Now, if we previously queried the old data because someone is listening for flag change events, compare
		// the versions of all items and generate events for those (and any other items that depend on them)
		if oldData != nil {
			d.sendChangeEvents(d.computeChangedItemsForFullDataSet(oldData, fullDataSetToMap(allData)))
		}
	}

	return updated
}
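
// Illustrative sketch of a data source delivering a full data set (not part of
// the original source; "my-flag" and myFlag are hypothetical):
//
//	allData := []st.Collection{
//		{Kind: datakinds.Features, Items: []st.KeyedItemDescriptor{
//			{Key: "my-flag", Item: st.ItemDescriptor{Version: 1, Item: &myFlag}},
//		}},
//	}
//	if !sink.Init(allData) {
//		// the store rejected the update; the status now reflects a
//		// DataSourceErrorKindStoreError (see maybeUpdateError below)
//	}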

//nolint:revive // no doc comment for standard method
func (d *DataSourceUpdateSinkImpl) Upsert(
	kind st.DataKind,
	key string,
	item st.ItemDescriptor,
) bool {
	updated, err := d.store.Upsert(kind, key, item)
	didNotGetError := d.maybeUpdateError(err)

	if updated {
		d.dependencyTracker.updateDependenciesFrom(kind, key, item)
		if d.flagChangeEventBroadcaster.HasListeners() {
			affectedItems := make(kindAndKeySet)
			d.dependencyTracker.addAffectedItems(affectedItems, kindAndKey{kind, key})
			d.sendChangeEvents(affectedItems)
		}
	}

	return didNotGetError
}
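
// Illustrative sketch of applying individual updates (not part of the original
// source; the key and versions are hypothetical). It assumes the usual
// versioned-store semantics in which an upsert only takes effect when its
// version is higher than the stored one, and an ItemDescriptor with a nil Item
// serves as a deletion tombstone:
//
//	sink.Upsert(datakinds.Features, "my-flag", st.ItemDescriptor{Version: 2, Item: &myFlag})
//	sink.Upsert(datakinds.Features, "my-flag", st.ItemDescriptor{Version: 3, Item: nil}) // deletion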

func (d *DataSourceUpdateSinkImpl) maybeUpdateError(err error) bool {
	if err == nil {
		d.lock.Lock()
		defer d.lock.Unlock()
		d.lastStoreUpdateFailed = false
		return true
	}

	d.UpdateStatus(
		intf.DataSourceStateInterrupted,
		intf.DataSourceErrorInfo{
			Kind:    intf.DataSourceErrorKindStoreError,
			Message: err.Error(),
			Time:    time.Now(),
		},
	)

	shouldLog := false
	d.lock.Lock()
	shouldLog = !d.lastStoreUpdateFailed
	d.lastStoreUpdateFailed = true
	d.lock.Unlock()
	if shouldLog {
		d.loggers.Warnf("Unexpected data store error when trying to store an update received from the data source: %s", err)
	}

	return false
}

//nolint:revive // no doc comment for standard method
func (d *DataSourceUpdateSinkImpl) UpdateStatus(
	newState intf.DataSourceState,
	newError intf.DataSourceErrorInfo,
) {
	if newState == "" {
		return
	}
	if statusToBroadcast, changed := d.maybeUpdateStatus(newState, newError); changed {
		d.dataSourceStatusBroadcaster.Broadcast(statusToBroadcast)
	}
}
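
// Illustrative status transitions (not part of the original source; errInfo is
// a hypothetical intf.DataSourceErrorInfo). As maybeUpdateStatus below shows,
// an Interrupted report made before the source has ever left Initializing is
// recorded as still Initializing:
//
//	sink.UpdateStatus(intf.DataSourceStateInterrupted, errInfo) // state stays Initializing
//	sink.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
//	sink.UpdateStatus(intf.DataSourceStateInterrupted, errInfo) // state becomes Interrupted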

func (d *DataSourceUpdateSinkImpl) maybeUpdateStatus(
	newState intf.DataSourceState,
	newError intf.DataSourceErrorInfo,
) (intf.DataSourceStatus, bool) {
	d.lock.Lock()
	defer d.lock.Unlock()

	oldStatus := d.currentStatus

	if newState == intf.DataSourceStateInterrupted && oldStatus.State == intf.DataSourceStateInitializing {
		newState = intf.DataSourceStateInitializing // see comment on DataSourceUpdateSink.UpdateStatus
	}

	if newState == oldStatus.State && newError.Kind == "" {
		return intf.DataSourceStatus{}, false
	}

	stateSince := oldStatus.StateSince
	if newState != oldStatus.State {
		stateSince = time.Now()
	}
	lastError := oldStatus.LastError
	if newError.Kind != "" {
		lastError = newError
	}
	d.currentStatus = intf.DataSourceStatus{
		State:      newState,
		StateSince: stateSince,
		LastError:  lastError,
	}

	d.outageTracker.trackDataSourceState(newState, newError)

	return d.currentStatus, true
}

//nolint:revive // no doc comment for standard method
func (d *DataSourceUpdateSinkImpl) GetDataStoreStatusProvider() intf.DataStoreStatusProvider {
	return d.dataStoreStatusProvider
}

// GetLastStatus is used internally by SDK components.
func (d *DataSourceUpdateSinkImpl) GetLastStatus() intf.DataSourceStatus {
	d.lock.Lock()
	defer d.lock.Unlock()
	return d.currentStatus
}

func (d *DataSourceUpdateSinkImpl) waitFor(desiredState intf.DataSourceState, timeout time.Duration) bool {
	d.lock.Lock()
	if d.currentStatus.State == desiredState {
		d.lock.Unlock()
		return true
	}
	if d.currentStatus.State == intf.DataSourceStateOff {
		d.lock.Unlock()
		return false
	}

	statusCh := d.dataSourceStatusBroadcaster.AddListener()
	defer d.dataSourceStatusBroadcaster.RemoveListener(statusCh)
	d.lock.Unlock()

	var deadline <-chan time.Time
	if timeout > 0 {
		deadline = time.After(timeout)
	}

	for {
		select {
		case newStatus := <-statusCh:
			if newStatus.State == desiredState {
				return true
			}
			if newStatus.State == intf.DataSourceStateOff {
				return false
			}
		case <-deadline:
			return false
		}
	}
}
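
// Illustrative call (not part of the original source): block until the data
// source becomes Valid, giving up after five seconds or as soon as it reaches
// the permanent Off state. A nonpositive timeout waits indefinitely, since
// receiving from the nil deadline channel blocks forever:
//
//	ok := sink.waitFor(intf.DataSourceStateValid, 5*time.Second)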

func (d *DataSourceUpdateSinkImpl) sendChangeEvents(affectedItems kindAndKeySet) {
	for item := range affectedItems {
		if item.kind == datakinds.Features {
			d.flagChangeEventBroadcaster.Broadcast(intf.FlagChangeEvent{Key: item.key})
		}
	}
}

func (d *DataSourceUpdateSinkImpl) updateDependencyTrackerFromFullDataSet(allData []st.Collection) {
	d.dependencyTracker.reset()
	for _, coll := range allData {
		for _, item := range coll.Items {
			d.dependencyTracker.updateDependenciesFrom(coll.Kind, item.Key, item.Item)
		}
	}
}

func fullDataSetToMap(allData []st.Collection) map[st.DataKind]map[string]st.ItemDescriptor {
	ret := make(map[st.DataKind]map[string]st.ItemDescriptor, len(allData))
	for _, coll := range allData {
		m := make(map[string]st.ItemDescriptor, len(coll.Items))
		for _, item := range coll.Items {
			m[item.Key] = item.Item
		}
		ret[coll.Kind] = m
	}
	return ret
}

func (d *DataSourceUpdateSinkImpl) computeChangedItemsForFullDataSet(
	oldDataMap map[st.DataKind]map[string]st.ItemDescriptor,
	newDataMap map[st.DataKind]map[string]st.ItemDescriptor,
) kindAndKeySet {
	affectedItems := make(kindAndKeySet)
	for _, kind := range datakinds.AllDataKinds() {
		oldItems := oldDataMap[kind]
		newItems := newDataMap[kind]
		allKeys := make([]string, 0, len(oldItems)+len(newItems))
		for key := range oldItems {
			allKeys = append(allKeys, key)
		}
		for key := range newItems {
			if _, found := oldItems[key]; !found {
				allKeys = append(allKeys, key)
			}
		}
		for _, key := range allKeys {
			oldItem, haveOld := oldItems[key]
			newItem, haveNew := newItems[key]
			if haveOld || haveNew {
				if !haveOld || !haveNew || oldItem.Version < newItem.Version {
					d.dependencyTracker.addAffectedItems(affectedItems, kindAndKey{kind, key})
				}
			}
		}
	}
	return affectedItems
}
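
// Worked example (illustrative, not part of the original source): if the old
// data set is {flagA: v1, flagB: v4} and the new one is {flagA: v2, flagB: v4,
// flagC: v1}, the affected set contains flagA (version increased) and flagC
// (newly added), plus anything that depends on them via the dependency
// tracker. flagB is excluded because its version did not increase, while a key
// present only in the old data would be included (!haveNew above).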

type outageTracker struct {
	outageLoggingTimeout time.Duration
	loggers              ldlog.Loggers
	inOutage             bool
	errorCounts          map[intf.DataSourceErrorInfo]int
	timeoutCloser        chan struct{}
	lock                 sync.Mutex
}

func newOutageTracker(outageLoggingTimeout time.Duration, loggers ldlog.Loggers) *outageTracker {
	return &outageTracker{
		outageLoggingTimeout: outageLoggingTimeout,
		loggers:              loggers,
	}
}

func (o *outageTracker) trackDataSourceState(newState intf.DataSourceState, newError intf.DataSourceErrorInfo) {
	if o.outageLoggingTimeout == 0 {
		return
	}

	o.lock.Lock()
	defer o.lock.Unlock()

	if newState == intf.DataSourceStateInterrupted || newError.Kind != "" ||
		(newState == intf.DataSourceStateInitializing && o.inOutage) {
		// We are in a potentially recoverable outage. If that wasn't the case already, and if we've been
		// configured with a timeout for logging the outage at a higher level, schedule that timeout.
		if o.inOutage {
			// We were already in one - just record this latest error for logging later.
			o.recordError(newError)
		} else {
			// We weren't already in one, so set the timeout and start recording errors.
			o.inOutage = true
			o.errorCounts = make(map[intf.DataSourceErrorInfo]int)
			o.recordError(newError)
			o.timeoutCloser = make(chan struct{})
			go o.awaitTimeout(o.timeoutCloser)
		}
	} else {
		if o.timeoutCloser != nil {
			close(o.timeoutCloser)
			o.timeoutCloser = nil
		}
		o.inOutage = false
	}
}

func (o *outageTracker) recordError(newError intf.DataSourceErrorInfo) {
	// Accumulate how many times each kind of error has occurred during the outage - use just the basic
	// properties as the key so the map won't expand indefinitely
	basicErrorInfo := intf.DataSourceErrorInfo{Kind: newError.Kind, StatusCode: newError.StatusCode}
	o.errorCounts[basicErrorInfo]++
}

func (o *outageTracker) awaitTimeout(closer chan struct{}) {
	select {
	case <-closer:
		return
	case <-time.After(o.outageLoggingTimeout):
		break
	}

	o.lock.Lock()
	if !o.inOutage {
		// COVERAGE: there is no way to make this happen in unit tests; it is a very unlikely race condition
		o.lock.Unlock()
		return
	}
	errorsDesc := o.describeErrors()
	o.timeoutCloser = nil
	o.lock.Unlock()

	o.loggers.Errorf(
		"LaunchDarkly data source outage - updates have been unavailable for at least %s with the following errors: %s",
		o.outageLoggingTimeout,
		errorsDesc,
	)
}

func (o *outageTracker) describeErrors() string {
	ret := ""
	for err, count := range o.errorCounts {
		if ret != "" {
			ret += ", "
		}
		times := "times"
		if count == 1 {
			times = "time"
		}
		ret += fmt.Sprintf("%s (%d %s)", err, count, times)
	}
	return ret
}
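
// Illustrative outcome (not part of the original source; the exact error
// strings depend on DataSourceErrorInfo's formatting and are assumptions): if
// an outage accumulated two network errors and one HTTP 401 response, then
// once outageLoggingTimeout (say one minute) elapses, awaitTimeout would log
// something like:
//
//	LaunchDarkly data source outage - updates have been unavailable for at least
//	1m0s with the following errors: NETWORK_ERROR (2 times), ERROR_RESPONSE(401) (1 time)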
