...

Source file src/github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoreimpl/big_segment_store_wrapper.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoreimpl

     1  package ldstoreimpl
     2  
     3  import (
     4  	"sync"
     5  	"time"
     6  
     7  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
     8  	"github.com/launchdarkly/go-sdk-common/v3/ldreason"
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldtime"
    10  	ldeval "github.com/launchdarkly/go-server-sdk-evaluation/v2"
    11  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    12  	"github.com/launchdarkly/go-server-sdk/v6/internal/bigsegments"
    13  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    14  
    15  	"github.com/launchdarkly/ccache"
    16  
    17  	"golang.org/x/sync/singleflight"
    18  )
    19  
// BigSegmentStoreWrapper is a component that adds status polling and caching to a BigSegmentStore,
// and provides integration with the evaluation engine.
//
// This component is exposed in the SDK's public API because it needs to be used by the LaunchDarkly
// Relay Proxy as well (or any other component that is calling the evaluation engine in
// go-server-sdk-evaluation directly and needs to provide the same Big Segment functionality as the
// SDK). It implements the BigSegmentProvider interface that go-server-sdk-evaluation uses to query
// Big Segments (similar to how ldstoreimpl.NewDataStoreEvaluatorDataProvider provides an
// implementation of the DataProvider interface). It is also responsible for caching membership
// queries and polling the store's metadata to make sure it is not stale.
//
// To avoid unnecessarily exposing implementation details that are subject to change, this type
// should not have any public methods that are not strictly necessary for its use by the SDK and by
// the Relay Proxy.
type BigSegmentStoreWrapper struct {
	store          subsystems.BigSegmentStore             // underlying store; Close() closes it too
	statusUpdateFn func(interfaces.BigSegmentStoreStatus) // optional listener notified on status changes
	staleTime      time.Duration                          // how old the store's LastUpToDate may be before it is "stale"; immutable after construction
	contextCache   *ccache.Cache                          // per-context membership cache; nil after Close() (guarded by lock)
	cacheTTL       time.Duration                          // TTL for contextCache entries; immutable after construction
	pollInterval   time.Duration                          // interval for the status polling task; immutable after construction
	haveStatus     bool                                   // true once lastStatus holds a real result (guarded by lock)
	lastStatus     interfaces.BigSegmentStoreStatus       // most recently computed status (guarded by lock)
	requests       singleflight.Group                     // deduplicates concurrent membership queries for the same context key
	pollCloser     chan struct{}                          // closed to stop the polling goroutine; nil when polling is inactive (guarded by lock)
	pollingActive  bool                                   // whether the polling task is supposed to be running (guarded by lock)
	loggers        ldlog.Loggers
	lock           sync.RWMutex // protects the fields marked "guarded by lock" above
}
    49  
    50  // NewBigSegmentStoreWrapperWithConfig creates a BigSegmentStoreWrapper.
    51  //
    52  // It also immediately begins polling the store status, unless config.StatusPollingInitiallyPaused
    53  // is true.
    54  //
    55  // The BigSegmentStoreWrapper takes ownership of the BigSegmentStore's lifecycle at this point, so
    56  // calling Close on the BigSegmentStoreWrapper will also close the store.
    57  //
    58  // If not nil, statusUpdateFn will be called whenever the store status has changed.
    59  func NewBigSegmentStoreWrapperWithConfig(
    60  	config BigSegmentsConfigurationProperties,
    61  	statusUpdateFn func(interfaces.BigSegmentStoreStatus),
    62  	loggers ldlog.Loggers,
    63  ) *BigSegmentStoreWrapper {
    64  	pollCloser := make(chan struct{})
    65  	w := &BigSegmentStoreWrapper{
    66  		store:          config.Store,
    67  		statusUpdateFn: statusUpdateFn,
    68  		staleTime:      config.StaleAfter,
    69  		contextCache:   ccache.New(ccache.Configure().MaxSize(int64(config.ContextCacheSize))),
    70  		cacheTTL:       config.ContextCacheTime,
    71  		pollInterval:   config.StatusPollInterval,
    72  		pollCloser:     pollCloser,
    73  		pollingActive:  config.StartPolling,
    74  		loggers:        loggers,
    75  	}
    76  
    77  	if config.StartPolling {
    78  		go w.runPollTask(config.StatusPollInterval, pollCloser)
    79  	}
    80  
    81  	return w
    82  }
    83  
    84  // Close shuts down the manager, the store, and the polling task.
    85  func (w *BigSegmentStoreWrapper) Close() {
    86  	w.lock.Lock()
    87  	if w.pollCloser != nil {
    88  		close(w.pollCloser)
    89  		w.pollCloser = nil
    90  	}
    91  	if w.contextCache != nil {
    92  		w.contextCache.Stop()
    93  		w.contextCache = nil
    94  	}
    95  	w.lock.Unlock()
    96  
    97  	_ = w.store.Close()
    98  }
    99  
// GetMembership is called by the evaluator when it needs to get the Big Segment membership
// state for an evaluation context.
//
// If there is a cached membership state for the context, it returns the cached state. Otherwise,
// it converts the context key into the hash string used by the BigSegmentStore, queries the store,
// and caches the result. The returned status value indicates whether the query succeeded, and
// whether the result (regardless of whether it was from a new query or the cache) should be
// considered "stale".
//
// We do not need to know the context kind, because each big segment can only be for one kind.
// Thus, if the memberships for context key "x" include segments A and B, it is OK if segment A
// is referring to the context {"kind": "user", "key": x"} while segment B is referring to the
// context {"kind": "org", "key": "x"}; even though those are two different contexts, there is
// no ambiguity when it comes to checking against either of those segments.
func (w *BigSegmentStoreWrapper) GetMembership(
	contextKey string,
) (ldeval.BigSegmentMembership, ldreason.BigSegmentsStatus) {
	entry := w.safeCacheGet(contextKey)
	var result ldeval.BigSegmentMembership
	if entry == nil || entry.Expired() {
		// Use singleflight to ensure that we'll only do this query once even if multiple goroutines are
		// requesting it
		value, err, _ := w.requests.Do(contextKey, func() (interface{}, error) {
			hash := bigsegments.HashForContextKey(contextKey)
			w.loggers.Debugf("querying Big Segment state for context hash %q", hash)
			return w.store.GetMembership(hash)
		})
		if err != nil {
			w.loggers.Errorf("Big Segment store returned error: %s", err)
			return nil, ldreason.BigSegmentsStoreError
		}
		if value == nil {
			w.safeCacheSet(contextKey, nil, w.cacheTTL) // we cache the "not found" status
			return nil, ldreason.BigSegmentsHealthy
		}
		if membership, ok := value.(subsystems.BigSegmentMembership); ok {
			// Cache the successful lookup under the same TTL as a "not found" result.
			w.safeCacheSet(contextKey, membership, w.cacheTTL)
			result = membership
		} else {
			// Defensive check: the singleflight function above only ever returns a
			// subsystems.BigSegmentMembership (or nil), so this branch should be unreachable.
			w.loggers.Error("BigSegmentStoreWrapper got wrong value type from request - this should not be possible")
			return nil, ldreason.BigSegmentsStoreError
		}
	} else if entry.Value() != nil { // nil is a cached "not found" state
		if membership, ok := entry.Value().(subsystems.BigSegmentMembership); ok {
			result = membership
		} else {
			w.loggers.Error("BigSegmentStoreWrapper got wrong value type from cache - this should not be possible")
			return nil, ldreason.BigSegmentsStoreError // COVERAGE: can't cause this condition in unit tests
		}
	}

	// Even a successful (or cached) result is reported as "stale" when the store's metadata
	// says its data has not been updated recently enough; see GetStatus.
	status := ldreason.BigSegmentsHealthy
	if w.GetStatus().Stale {
		status = ldreason.BigSegmentsStale
	}

	return result, status
}
   158  
   159  // GetStatus returns a BigSegmentStoreStatus describing whether the store seems to be available
   160  // (that is, the last query to it did not return an error) and whether it is stale (that is, the last
   161  // known update time is too far in the past).
   162  //
   163  // If we have not yet obtained that information (the poll task has not executed yet), then this method
   164  // immediately does a metadata query and waits for it to succeed or fail. This means that if an
   165  // application using Big Segments evaluates a feature flag immediately after creating the SDK
   166  // client, before the first status poll has happened, that evaluation may block for however long it
   167  // takes to query the store.
   168  func (w *BigSegmentStoreWrapper) GetStatus() interfaces.BigSegmentStoreStatus {
   169  	w.lock.RLock()
   170  	status := w.lastStatus
   171  	haveStatus := w.haveStatus
   172  	w.lock.RUnlock()
   173  
   174  	if haveStatus {
   175  		return status
   176  	}
   177  
   178  	return w.pollStoreAndUpdateStatus()
   179  }
   180  
   181  // ClearCache invalidates the cache of per-context Big Segment state, so subsequent queries will get
   182  // the latest data.
   183  //
   184  // This is used by the Relay Proxy, but is not currently used by the SDK otherwise.
   185  func (w *BigSegmentStoreWrapper) ClearCache() {
   186  	w.lock.Lock()
   187  	if w.contextCache != nil {
   188  		w.contextCache.Clear()
   189  	}
   190  	w.lock.Unlock()
   191  	w.loggers.Debug("invalidated cache")
   192  }
   193  
   194  // SetPollingActive switches the polling task on or off.
   195  //
   196  // This is used by the Relay Proxy, but is not currently used by the SDK otherwise.
   197  func (w *BigSegmentStoreWrapper) SetPollingActive(active bool) {
   198  	w.lock.Lock()
   199  	defer w.lock.Unlock()
   200  	if w.pollingActive != active {
   201  		w.pollingActive = active
   202  		if active {
   203  			w.pollCloser = make(chan struct{})
   204  			go w.runPollTask(w.pollInterval, w.pollCloser)
   205  		} else if w.pollCloser != nil {
   206  			close(w.pollCloser)
   207  			w.pollCloser = nil
   208  		}
   209  		w.loggers.Debugf("setting status polling to %t", active)
   210  	}
   211  }
   212  
// pollStoreAndUpdateStatus queries the store's metadata, computes the new availability/staleness
// status, records it as the latest known status, and (if the status differs from the previously
// known one, or if there was no previously known one) notifies statusUpdateFn. It returns the
// newly computed status.
//
// This is called periodically by runPollTask, and also synchronously from GetStatus when no
// status has been obtained yet.
func (w *BigSegmentStoreWrapper) pollStoreAndUpdateStatus() interfaces.BigSegmentStoreStatus {
	var newStatus interfaces.BigSegmentStoreStatus
	w.loggers.Debug("querying Big Segment store metadata")
	metadata, err := w.store.GetMetadata()

	w.lock.Lock()
	if err == nil {
		newStatus.Available = true
		newStatus.Stale = w.isStale(metadata.LastUpToDate)
		w.loggers.Debugf("Big Segment store was last updated at %d", metadata.LastUpToDate)
	} else {
		w.loggers.Errorf("Big Segment store status query returned error: %s", err)
		newStatus.Available = false
	}
	// Swap in the new status and remember the old one while still holding the lock, so the
	// read-modify-write is atomic with respect to concurrent GetStatus calls.
	oldStatus := w.lastStatus
	w.lastStatus = newStatus
	hadStatus := w.haveStatus
	w.haveStatus = true
	w.lock.Unlock()

	// The listener is deliberately invoked outside the lock, so a callback that re-enters this
	// component (e.g. calling GetStatus) cannot deadlock.
	if !hadStatus || (newStatus != oldStatus) {
		w.loggers.Debugf(
			"Big Segment store status has changed from %+v to %+v",
			oldStatus,
			newStatus,
		)
		if w.statusUpdateFn != nil {
			w.statusUpdateFn(newStatus)
		}
	}

	return newStatus
}
   246  
   247  func (w *BigSegmentStoreWrapper) isStale(updateTime ldtime.UnixMillisecondTime) bool {
   248  	age := time.Duration(uint64(ldtime.UnixMillisNow())-uint64(updateTime)) * time.Millisecond
   249  	return age >= w.staleTime
   250  }
   251  
   252  func (w *BigSegmentStoreWrapper) runPollTask(pollInterval time.Duration, pollCloser <-chan struct{}) {
   253  	if pollInterval > w.staleTime {
   254  		pollInterval = w.staleTime // COVERAGE: not really unit-testable due to scheduling indeterminacy
   255  	}
   256  	ticker := time.NewTicker(pollInterval)
   257  	for {
   258  		select {
   259  		case <-pollCloser:
   260  			ticker.Stop()
   261  			return
   262  		case <-ticker.C:
   263  			_ = w.pollStoreAndUpdateStatus()
   264  		}
   265  	}
   266  }
   267  
   268  // safeCacheGet and safeCacheSet are necessary because trying to use a ccache.Cache after it's been shut
   269  // down can cause a panic, so we nil it out on Close() and guard it with our lock.
   270  func (w *BigSegmentStoreWrapper) safeCacheGet(key string) *ccache.Item {
   271  	var ret *ccache.Item
   272  	w.lock.RLock()
   273  	if w.contextCache != nil {
   274  		ret = w.contextCache.Get(key)
   275  	}
   276  	w.lock.RUnlock()
   277  	return ret
   278  }
   279  
   280  func (w *BigSegmentStoreWrapper) safeCacheSet(key string, value interface{}, ttl time.Duration) {
   281  	w.lock.RLock()
   282  	if w.contextCache != nil {
   283  		w.contextCache.Set(key, value, ttl)
   284  	}
   285  	w.lock.RUnlock()
   286  }
   287  

View as plain text