...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datastore/persistent_data_store_wrapper.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datastore

     1  package datastore
     2  
     3  import (
     4  	"fmt"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
     9  	"github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
    10  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    11  	st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
    12  
    13  	"github.com/patrickmn/go-cache"
    14  	"golang.org/x/exp/slices"
    15  	"golang.org/x/sync/singleflight"
    16  )
    17  
// persistentDataStoreWrapper is the implementation of DataStore that we use for all persistent data stores.
//
// It sits in front of a subsystems.PersistentDataStore ("core"): it converts items to and from their
// serialized form, optionally keeps deserialized items in an in-memory cache, and reports any error
// returned by the core to a status poller.
type persistentDataStoreWrapper struct {
	// core is the underlying persistent store that Init/Get/GetAll/Upsert/IsInitialized/Close delegate to.
	core             subsystems.PersistentDataStore
	// dataStoreUpdates is the sink passed to the constructor; its UpdateStatus method is handed to the
	// status poller.
	dataStoreUpdates subsystems.DataStoreUpdateSink
	// statusPoller is told the store is unavailable whenever the core returns an error, and is shut
	// down in Close.
	statusPoller     *dataStoreStatusPoller
	// cache holds deserialized items and "all items" lists. It is nil when cacheTTL is zero, meaning
	// caching is disabled.
	cache            *cache.Cache
	// cacheTTL is the cache lifetime given to the constructor. A negative value means "infinite cache
	// mode" (see hasInfiniteCache).
	cacheTTL         time.Duration
	// requests coalesces concurrent core queries for the same key in Get and GetAll.
	requests         singleflight.Group
	loggers          ldlog.Loggers
	// inited records that the store is known to be initialized. It is guarded by initLock.
	inited           bool
	initLock         sync.RWMutex
}
    30  
// initCheckedKey is the cache key under which IsInitialized records that the core store was queried
// and reported that it was not initialized. While that cache entry is present, IsInitialized returns
// false without querying the core again.
const initCheckedKey = "$initChecked"
    32  
    33  // NewPersistentDataStoreWrapper creates the implementation of DataStore that we use for all persistent data
    34  // stores. This is not visible in the public API; it is always called through ldcomponents.PersistentDataStore().
    35  func NewPersistentDataStoreWrapper(
    36  	core subsystems.PersistentDataStore,
    37  	dataStoreUpdates subsystems.DataStoreUpdateSink,
    38  	cacheTTL time.Duration,
    39  	loggers ldlog.Loggers,
    40  ) subsystems.DataStore {
    41  	var myCache *cache.Cache
    42  	if cacheTTL != 0 {
    43  		myCache = cache.New(cacheTTL, 5*time.Minute)
    44  		// Note that the documented behavior of go-cache is that if cacheTTL is negative, the
    45  		// cache never expires. That is consistent with we've defined the parameter.
    46  	}
    47  
    48  	w := &persistentDataStoreWrapper{
    49  		core:             core,
    50  		dataStoreUpdates: dataStoreUpdates,
    51  		cache:            myCache,
    52  		cacheTTL:         cacheTTL,
    53  		loggers:          loggers,
    54  	}
    55  
    56  	w.statusPoller = newDataStoreStatusPoller(
    57  		true,
    58  		w.pollAvailabilityAfterOutage,
    59  		dataStoreUpdates.UpdateStatus,
    60  		myCache == nil || cacheTTL > 0, // needsRefresh=true unless we're in infinite cache mode
    61  		loggers,
    62  	)
    63  
    64  	return w
    65  }
    66  
    67  func (w *persistentDataStoreWrapper) Init(allData []st.Collection) error {
    68  	err := w.initCore(allData)
    69  	if w.cache != nil {
    70  		w.cache.Flush()
    71  	}
    72  	if err != nil && !w.hasInfiniteCache() {
    73  		// If the underlying store failed to do the update, and we've got an expiring cache, then:
    74  		// 1) We shouldn't update the cache, and
    75  		// 2) We shouldn't be considered initialized.
    76  		// The rationale is that it's better to stay in a consistent state of having old data than to act
    77  		// like we have new data, but then suddenly fall back to old data when the cache expires.
    78  		return err
    79  	}
    80  	// However, if the cache TTL is infinite, then it makes sense to update the cache regardless of the
    81  	// initialization result of the underlying store.
    82  	if w.cache != nil {
    83  		for _, coll := range allData {
    84  			w.cacheItems(coll.Kind, coll.Items)
    85  		}
    86  	}
    87  	w.initLock.Lock()
    88  	defer w.initLock.Unlock()
    89  	w.inited = true
    90  	return err
    91  }
    92  
    93  func (w *persistentDataStoreWrapper) Get(kind st.DataKind, key string) (st.ItemDescriptor, error) {
    94  	if w.cache == nil {
    95  		item, err := w.getAndDeserializeItem(kind, key)
    96  		w.processError(err)
    97  		return item, err
    98  	}
    99  	cacheKey := dataStoreCacheKey(kind, key)
   100  	if data, present := w.cache.Get(cacheKey); present {
   101  		if item, ok := data.(st.ItemDescriptor); ok {
   102  			return item, nil
   103  		}
   104  	}
   105  	// Item was not cached or cached value was not valid. Use singleflight to ensure that we'll only
   106  	// do this core query once even if multiple goroutines are requesting it
   107  	reqKey := fmt.Sprintf("get:%s:%s", kind.GetName(), key)
   108  	itemIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
   109  		item, err := w.getAndDeserializeItem(kind, key)
   110  		w.processError(err)
   111  		if err == nil {
   112  			w.cache.Set(cacheKey, item, cache.DefaultExpiration)
   113  			return item, nil
   114  		}
   115  		return nil, err
   116  	})
   117  	if err != nil || itemIntf == nil {
   118  		return st.ItemDescriptor{}.NotFound(), err
   119  	}
   120  	if item, ok := itemIntf.(st.ItemDescriptor); ok { // singleflight.Group.Do returns value as interface{}
   121  		return item, err
   122  	}
   123  	w.loggers.Errorf("data store query returned unexpected type %T", itemIntf)
   124  	// COVERAGE: there is no way to simulate this condition in unit tests; it should be impossible
   125  	return st.ItemDescriptor{}.NotFound(), nil
   126  }
   127  
   128  func (w *persistentDataStoreWrapper) GetAll(kind st.DataKind) ([]st.KeyedItemDescriptor, error) {
   129  	if w.cache == nil {
   130  		items, err := w.getAllAndDeserialize(kind)
   131  		w.processError(err)
   132  		return items, err
   133  	}
   134  	// Check whether we have a cache item for the entire data set
   135  	cacheKey := dataStoreAllItemsCacheKey(kind)
   136  	if data, present := w.cache.Get(cacheKey); present {
   137  		if items, ok := data.([]st.KeyedItemDescriptor); ok {
   138  			return items, nil
   139  		}
   140  	}
   141  	// Data set was not cached or cached value was not valid. Use singleflight to ensure that we'll only
   142  	// do this core query once even if multiple goroutines are requesting it
   143  	reqKey := fmt.Sprintf("all:%s", kind.GetName())
   144  	itemsIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
   145  		items, err := w.getAllAndDeserialize(kind)
   146  		w.processError(err)
   147  		if err == nil {
   148  			w.cache.Set(cacheKey, items, cache.DefaultExpiration)
   149  			return items, nil
   150  		}
   151  		return nil, err
   152  	})
   153  	if err != nil {
   154  		return nil, err
   155  	}
   156  	if items, ok := itemsIntf.([]st.KeyedItemDescriptor); ok { // singleflight.Group.Do returns value as interface{}
   157  		return items, err
   158  	}
   159  	w.loggers.Errorf("data store query returned unexpected type %T", itemsIntf)
   160  	// COVERAGE: there is no way to simulate this condition in unit tests; it should be impossible
   161  	return nil, nil
   162  }
   163  
   164  func (w *persistentDataStoreWrapper) Upsert(
   165  	kind st.DataKind,
   166  	key string,
   167  	newItem st.ItemDescriptor,
   168  ) (bool, error) {
   169  	serializedItem := w.serialize(kind, newItem)
   170  	updated, err := w.core.Upsert(kind, key, serializedItem)
   171  	w.processError(err)
   172  	// Normally, if the underlying store failed to do the update, we do not want to update the cache -
   173  	// the idea being that it's better to stay in a consistent state of having old data than to act
   174  	// like we have new data but then suddenly fall back to old data when the cache expires. However,
   175  	// if the cache TTL is infinite, then it makes sense to update the cache always.
   176  	if err != nil {
   177  		if !w.hasInfiniteCache() {
   178  			return updated, err
   179  		}
   180  	}
   181  	if w.cache != nil {
   182  		cacheKey := dataStoreCacheKey(kind, key)
   183  		allCacheKey := dataStoreAllItemsCacheKey(kind)
   184  		if err == nil {
   185  			if updated {
   186  				w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
   187  				// If the cache has a finite TTL, then we should remove the "all items" cache entry to force
   188  				// a reread the next time All is called. However, if it's an infinite TTL, we need to just
   189  				// update the item within the existing "all items" entry (since we want things to still work
   190  				// even if the underlying store is unavailable).
   191  				if w.hasInfiniteCache() {
   192  					if data, present := w.cache.Get(allCacheKey); present {
   193  						if items, ok := data.([]st.KeyedItemDescriptor); ok {
   194  							w.cache.Set(allCacheKey, updateSingleItem(items, key, newItem), cache.DefaultExpiration)
   195  						}
   196  					}
   197  				} else {
   198  					w.cache.Delete(allCacheKey)
   199  				}
   200  			} else {
   201  				// there was a concurrent modification elsewhere - update the cache to get the new state
   202  				w.cache.Delete(cacheKey)
   203  				w.cache.Delete(allCacheKey)
   204  				_, _ = w.Get(kind, key) // doing this query repopulates the cache
   205  			}
   206  		} else {
   207  			// The underlying store returned an error. If the cache has an infinite TTL, then we should go
   208  			// ahead and update the cache so that it always has the latest data; we may be able to use the
   209  			// cached data to repopulate the store later if it starts working again.
   210  			if w.hasInfiniteCache() {
   211  				w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
   212  				cachedItems := []st.KeyedItemDescriptor{}
   213  				if data, present := w.cache.Get(allCacheKey); present {
   214  					if items, ok := data.([]st.KeyedItemDescriptor); ok {
   215  						cachedItems = items
   216  					}
   217  				}
   218  				w.cache.Set(allCacheKey, updateSingleItem(cachedItems, key, newItem), cache.DefaultExpiration)
   219  			}
   220  		}
   221  	}
   222  	return updated, err
   223  }
   224  
   225  func (w *persistentDataStoreWrapper) IsInitialized() bool {
   226  	w.initLock.RLock()
   227  	previousValue := w.inited
   228  	w.initLock.RUnlock()
   229  	if previousValue {
   230  		return true
   231  	}
   232  
   233  	if w.cache != nil {
   234  		if _, found := w.cache.Get(initCheckedKey); found {
   235  			return false
   236  		}
   237  	}
   238  
   239  	newValue := w.core.IsInitialized()
   240  	if newValue {
   241  		w.initLock.Lock()
   242  		defer w.initLock.Unlock()
   243  		w.inited = true
   244  		if w.cache != nil {
   245  			w.cache.Delete(initCheckedKey)
   246  		}
   247  	} else if w.cache != nil {
   248  		w.cache.Set(initCheckedKey, "", cache.DefaultExpiration)
   249  	}
   250  	return newValue
   251  }
   252  
// IsStatusMonitoringEnabled always returns true for this wrapper.
func (w *persistentDataStoreWrapper) IsStatusMonitoringEnabled() bool {
	return true
}
   256  
// Close shuts down the status poller and then closes the underlying store, returning whatever error
// the underlying store's Close reports.
func (w *persistentDataStoreWrapper) Close() error {
	w.statusPoller.Close()
	return w.core.Close()
}
   261  
   262  func (w *persistentDataStoreWrapper) pollAvailabilityAfterOutage() bool {
   263  	if !w.core.IsStoreAvailable() {
   264  		return false
   265  	}
   266  	if w.hasInfiniteCache() {
   267  		// If we're in infinite cache mode, then we can assume the cache has a full set of current
   268  		// flag data (since presumably the data source has still been running) and we can just
   269  		// write the contents of the cache to the underlying data store.
   270  		kinds := datakinds.AllDataKinds()
   271  		allData := make([]st.Collection, 0, len(kinds))
   272  		for _, kind := range kinds {
   273  			allCacheKey := dataStoreAllItemsCacheKey(kind)
   274  			if data, present := w.cache.Get(allCacheKey); present {
   275  				if items, ok := data.([]st.KeyedItemDescriptor); ok {
   276  					allData = append(allData, st.Collection{Kind: kind, Items: items})
   277  				}
   278  			}
   279  		}
   280  		err := w.initCore(allData)
   281  		if err != nil {
   282  			// We failed to write the cached data to the underlying store. In this case,
   283  			// w.initCore() has already put us back into the failed state. The only further
   284  			// thing we can do is to log a note about what just happened.
   285  			w.loggers.Errorf("Tried to write cached data to persistent store after a store outage, but failed: %s", err)
   286  		} else {
   287  			w.loggers.Warn("Successfully updated persistent store from cached data")
   288  			// Note that w.inited should have already been set when InitInternal was originally called -
   289  			// in infinite cache mode, we set it even if the database update failed.
   290  		}
   291  	}
   292  	return true
   293  }
   294  
   295  func (w *persistentDataStoreWrapper) hasInfiniteCache() bool {
   296  	return w.cache != nil && w.cacheTTL < 0
   297  }
   298  func dataStoreCacheKey(kind st.DataKind, key string) string {
   299  	return kind.GetName() + ":" + key
   300  }
   301  
   302  func dataStoreAllItemsCacheKey(kind st.DataKind) string {
   303  	return "all:" + kind.GetName()
   304  }
   305  
   306  func (w *persistentDataStoreWrapper) initCore(allData []st.Collection) error {
   307  	serializedAllData := make([]st.SerializedCollection, 0, len(allData))
   308  	for _, coll := range allData {
   309  		serializedAllData = append(serializedAllData, st.SerializedCollection{
   310  			Kind:  coll.Kind,
   311  			Items: w.serializeAll(coll.Kind, coll.Items),
   312  		})
   313  	}
   314  	err := w.core.Init(serializedAllData)
   315  	w.processError(err)
   316  	return err
   317  }
   318  
   319  func (w *persistentDataStoreWrapper) getAndDeserializeItem(
   320  	kind st.DataKind,
   321  	key string,
   322  ) (st.ItemDescriptor, error) {
   323  	serializedItem, err := w.core.Get(kind, key)
   324  	if err == nil {
   325  		return w.deserialize(kind, serializedItem)
   326  	}
   327  	return st.ItemDescriptor{}.NotFound(), err
   328  }
   329  
   330  func (w *persistentDataStoreWrapper) getAllAndDeserialize(
   331  	kind st.DataKind,
   332  ) ([]st.KeyedItemDescriptor, error) {
   333  	serializedItems, err := w.core.GetAll(kind)
   334  	if err == nil {
   335  		ret := make([]st.KeyedItemDescriptor, 0, len(serializedItems))
   336  		for _, serializedItem := range serializedItems {
   337  			item, err := w.deserialize(kind, serializedItem.Item)
   338  			if err != nil {
   339  				return nil, err
   340  			}
   341  			ret = append(ret, st.KeyedItemDescriptor{Key: serializedItem.Key, Item: item})
   342  		}
   343  		return ret, nil
   344  	}
   345  	return nil, err
   346  }
   347  
   348  func (w *persistentDataStoreWrapper) cacheItems(
   349  	kind st.DataKind,
   350  	items []st.KeyedItemDescriptor,
   351  ) {
   352  	if w.cache != nil {
   353  		copyOfItems := slices.Clone(items)
   354  		w.cache.Set(dataStoreAllItemsCacheKey(kind), copyOfItems, cache.DefaultExpiration)
   355  
   356  		for _, item := range items {
   357  			w.cache.Set(dataStoreCacheKey(kind, item.Key), item.Item, cache.DefaultExpiration)
   358  		}
   359  	}
   360  }
   361  
   362  func (w *persistentDataStoreWrapper) serialize(
   363  	kind st.DataKind,
   364  	item st.ItemDescriptor,
   365  ) st.SerializedItemDescriptor {
   366  	isDeleted := item.Item == nil
   367  	return st.SerializedItemDescriptor{
   368  		Version:        item.Version,
   369  		Deleted:        isDeleted,
   370  		SerializedItem: kind.Serialize(item),
   371  	}
   372  }
   373  
   374  func (w *persistentDataStoreWrapper) serializeAll(
   375  	kind st.DataKind,
   376  	items []st.KeyedItemDescriptor,
   377  ) []st.KeyedSerializedItemDescriptor {
   378  	ret := make([]st.KeyedSerializedItemDescriptor, 0, len(items))
   379  	for _, item := range items {
   380  		ret = append(ret, st.KeyedSerializedItemDescriptor{
   381  			Key:  item.Key,
   382  			Item: w.serialize(kind, item.Item),
   383  		})
   384  	}
   385  	return ret
   386  }
   387  
   388  func (w *persistentDataStoreWrapper) deserialize(
   389  	kind st.DataKind,
   390  	serializedItemDesc st.SerializedItemDescriptor,
   391  ) (st.ItemDescriptor, error) {
   392  	if serializedItemDesc.Deleted || serializedItemDesc.SerializedItem == nil {
   393  		return st.ItemDescriptor{Version: serializedItemDesc.Version}, nil
   394  	}
   395  	deserializedItemDesc, err := kind.Deserialize(serializedItemDesc.SerializedItem)
   396  	if err != nil {
   397  		return st.ItemDescriptor{}.NotFound(), err
   398  	}
   399  	if serializedItemDesc.Version == 0 || serializedItemDesc.Version == deserializedItemDesc.Version {
   400  		return deserializedItemDesc, nil
   401  	}
   402  	// If the store gave us a version number that isn't what was encoded in the object, trust it
   403  	return st.ItemDescriptor{Version: serializedItemDesc.Version, Item: deserializedItemDesc.Item}, nil
   404  }
   405  
   406  func updateSingleItem(
   407  	items []st.KeyedItemDescriptor,
   408  	key string,
   409  	newItem st.ItemDescriptor,
   410  ) []st.KeyedItemDescriptor {
   411  	found := false
   412  	ret := make([]st.KeyedItemDescriptor, 0, len(items))
   413  	for _, item := range items {
   414  		if item.Key == key {
   415  			ret = append(ret, st.KeyedItemDescriptor{Key: key, Item: newItem})
   416  			found = true
   417  		} else {
   418  			ret = append(ret, item)
   419  		}
   420  	}
   421  	if !found {
   422  		ret = append(ret, st.KeyedItemDescriptor{Key: key, Item: newItem})
   423  	}
   424  	return ret
   425  }
   426  
   427  func (w *persistentDataStoreWrapper) processError(err error) {
   428  	if err == nil {
   429  		// If we're waiting to recover after a failure, we'll let the polling routine take care
   430  		// of signaling success. Even if we could signal success a little earlier based on the
   431  		// success of whatever operation we just did, we'd rather avoid the overhead of acquiring
   432  		// w.statusLock every time we do anything. So we'll just do nothing here.
   433  		return
   434  	}
   435  	w.loggers.Errorf("Data store returned error: %s", err.Error())
   436  	w.statusPoller.UpdateAvailability(false)
   437  }
   438  

View as plain text