1 package datastore
2
3 import (
4 "fmt"
5 "sync"
6 "time"
7
8 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
9 "github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
10 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
11 st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
12
13 "github.com/patrickmn/go-cache"
14 "golang.org/x/exp/slices"
15 "golang.org/x/sync/singleflight"
16 )
17
18
19 type persistentDataStoreWrapper struct {
20 core subsystems.PersistentDataStore
21 dataStoreUpdates subsystems.DataStoreUpdateSink
22 statusPoller *dataStoreStatusPoller
23 cache *cache.Cache
24 cacheTTL time.Duration
25 requests singleflight.Group
26 loggers ldlog.Loggers
27 inited bool
28 initLock sync.RWMutex
29 }
30
// initCheckedKey is the cache key that IsInitialized sets after the underlying store has
// reported that it is not initialized, so that the store is not queried again until the
// entry expires.
const initCheckedKey = "$initChecked"
32
33
34
35 func NewPersistentDataStoreWrapper(
36 core subsystems.PersistentDataStore,
37 dataStoreUpdates subsystems.DataStoreUpdateSink,
38 cacheTTL time.Duration,
39 loggers ldlog.Loggers,
40 ) subsystems.DataStore {
41 var myCache *cache.Cache
42 if cacheTTL != 0 {
43 myCache = cache.New(cacheTTL, 5*time.Minute)
44
45
46 }
47
48 w := &persistentDataStoreWrapper{
49 core: core,
50 dataStoreUpdates: dataStoreUpdates,
51 cache: myCache,
52 cacheTTL: cacheTTL,
53 loggers: loggers,
54 }
55
56 w.statusPoller = newDataStoreStatusPoller(
57 true,
58 w.pollAvailabilityAfterOutage,
59 dataStoreUpdates.UpdateStatus,
60 myCache == nil || cacheTTL > 0,
61 loggers,
62 )
63
64 return w
65 }
66
67 func (w *persistentDataStoreWrapper) Init(allData []st.Collection) error {
68 err := w.initCore(allData)
69 if w.cache != nil {
70 w.cache.Flush()
71 }
72 if err != nil && !w.hasInfiniteCache() {
73
74
75
76
77
78 return err
79 }
80
81
82 if w.cache != nil {
83 for _, coll := range allData {
84 w.cacheItems(coll.Kind, coll.Items)
85 }
86 }
87 w.initLock.Lock()
88 defer w.initLock.Unlock()
89 w.inited = true
90 return err
91 }
92
93 func (w *persistentDataStoreWrapper) Get(kind st.DataKind, key string) (st.ItemDescriptor, error) {
94 if w.cache == nil {
95 item, err := w.getAndDeserializeItem(kind, key)
96 w.processError(err)
97 return item, err
98 }
99 cacheKey := dataStoreCacheKey(kind, key)
100 if data, present := w.cache.Get(cacheKey); present {
101 if item, ok := data.(st.ItemDescriptor); ok {
102 return item, nil
103 }
104 }
105
106
107 reqKey := fmt.Sprintf("get:%s:%s", kind.GetName(), key)
108 itemIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
109 item, err := w.getAndDeserializeItem(kind, key)
110 w.processError(err)
111 if err == nil {
112 w.cache.Set(cacheKey, item, cache.DefaultExpiration)
113 return item, nil
114 }
115 return nil, err
116 })
117 if err != nil || itemIntf == nil {
118 return st.ItemDescriptor{}.NotFound(), err
119 }
120 if item, ok := itemIntf.(st.ItemDescriptor); ok {
121 return item, err
122 }
123 w.loggers.Errorf("data store query returned unexpected type %T", itemIntf)
124
125 return st.ItemDescriptor{}.NotFound(), nil
126 }
127
128 func (w *persistentDataStoreWrapper) GetAll(kind st.DataKind) ([]st.KeyedItemDescriptor, error) {
129 if w.cache == nil {
130 items, err := w.getAllAndDeserialize(kind)
131 w.processError(err)
132 return items, err
133 }
134
135 cacheKey := dataStoreAllItemsCacheKey(kind)
136 if data, present := w.cache.Get(cacheKey); present {
137 if items, ok := data.([]st.KeyedItemDescriptor); ok {
138 return items, nil
139 }
140 }
141
142
143 reqKey := fmt.Sprintf("all:%s", kind.GetName())
144 itemsIntf, err, _ := w.requests.Do(reqKey, func() (interface{}, error) {
145 items, err := w.getAllAndDeserialize(kind)
146 w.processError(err)
147 if err == nil {
148 w.cache.Set(cacheKey, items, cache.DefaultExpiration)
149 return items, nil
150 }
151 return nil, err
152 })
153 if err != nil {
154 return nil, err
155 }
156 if items, ok := itemsIntf.([]st.KeyedItemDescriptor); ok {
157 return items, err
158 }
159 w.loggers.Errorf("data store query returned unexpected type %T", itemsIntf)
160
161 return nil, nil
162 }
163
164 func (w *persistentDataStoreWrapper) Upsert(
165 kind st.DataKind,
166 key string,
167 newItem st.ItemDescriptor,
168 ) (bool, error) {
169 serializedItem := w.serialize(kind, newItem)
170 updated, err := w.core.Upsert(kind, key, serializedItem)
171 w.processError(err)
172
173
174
175
176 if err != nil {
177 if !w.hasInfiniteCache() {
178 return updated, err
179 }
180 }
181 if w.cache != nil {
182 cacheKey := dataStoreCacheKey(kind, key)
183 allCacheKey := dataStoreAllItemsCacheKey(kind)
184 if err == nil {
185 if updated {
186 w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
187
188
189
190
191 if w.hasInfiniteCache() {
192 if data, present := w.cache.Get(allCacheKey); present {
193 if items, ok := data.([]st.KeyedItemDescriptor); ok {
194 w.cache.Set(allCacheKey, updateSingleItem(items, key, newItem), cache.DefaultExpiration)
195 }
196 }
197 } else {
198 w.cache.Delete(allCacheKey)
199 }
200 } else {
201
202 w.cache.Delete(cacheKey)
203 w.cache.Delete(allCacheKey)
204 _, _ = w.Get(kind, key)
205 }
206 } else {
207
208
209
210 if w.hasInfiniteCache() {
211 w.cache.Set(cacheKey, newItem, cache.DefaultExpiration)
212 cachedItems := []st.KeyedItemDescriptor{}
213 if data, present := w.cache.Get(allCacheKey); present {
214 if items, ok := data.([]st.KeyedItemDescriptor); ok {
215 cachedItems = items
216 }
217 }
218 w.cache.Set(allCacheKey, updateSingleItem(cachedItems, key, newItem), cache.DefaultExpiration)
219 }
220 }
221 }
222 return updated, err
223 }
224
225 func (w *persistentDataStoreWrapper) IsInitialized() bool {
226 w.initLock.RLock()
227 previousValue := w.inited
228 w.initLock.RUnlock()
229 if previousValue {
230 return true
231 }
232
233 if w.cache != nil {
234 if _, found := w.cache.Get(initCheckedKey); found {
235 return false
236 }
237 }
238
239 newValue := w.core.IsInitialized()
240 if newValue {
241 w.initLock.Lock()
242 defer w.initLock.Unlock()
243 w.inited = true
244 if w.cache != nil {
245 w.cache.Delete(initCheckedKey)
246 }
247 } else if w.cache != nil {
248 w.cache.Set(initCheckedKey, "", cache.DefaultExpiration)
249 }
250 return newValue
251 }
252
253 func (w *persistentDataStoreWrapper) IsStatusMonitoringEnabled() bool {
254 return true
255 }
256
257 func (w *persistentDataStoreWrapper) Close() error {
258 w.statusPoller.Close()
259 return w.core.Close()
260 }
261
262 func (w *persistentDataStoreWrapper) pollAvailabilityAfterOutage() bool {
263 if !w.core.IsStoreAvailable() {
264 return false
265 }
266 if w.hasInfiniteCache() {
267
268
269
270 kinds := datakinds.AllDataKinds()
271 allData := make([]st.Collection, 0, len(kinds))
272 for _, kind := range kinds {
273 allCacheKey := dataStoreAllItemsCacheKey(kind)
274 if data, present := w.cache.Get(allCacheKey); present {
275 if items, ok := data.([]st.KeyedItemDescriptor); ok {
276 allData = append(allData, st.Collection{Kind: kind, Items: items})
277 }
278 }
279 }
280 err := w.initCore(allData)
281 if err != nil {
282
283
284
285 w.loggers.Errorf("Tried to write cached data to persistent store after a store outage, but failed: %s", err)
286 } else {
287 w.loggers.Warn("Successfully updated persistent store from cached data")
288
289
290 }
291 }
292 return true
293 }
294
295 func (w *persistentDataStoreWrapper) hasInfiniteCache() bool {
296 return w.cache != nil && w.cacheTTL < 0
297 }
298 func dataStoreCacheKey(kind st.DataKind, key string) string {
299 return kind.GetName() + ":" + key
300 }
301
302 func dataStoreAllItemsCacheKey(kind st.DataKind) string {
303 return "all:" + kind.GetName()
304 }
305
306 func (w *persistentDataStoreWrapper) initCore(allData []st.Collection) error {
307 serializedAllData := make([]st.SerializedCollection, 0, len(allData))
308 for _, coll := range allData {
309 serializedAllData = append(serializedAllData, st.SerializedCollection{
310 Kind: coll.Kind,
311 Items: w.serializeAll(coll.Kind, coll.Items),
312 })
313 }
314 err := w.core.Init(serializedAllData)
315 w.processError(err)
316 return err
317 }
318
319 func (w *persistentDataStoreWrapper) getAndDeserializeItem(
320 kind st.DataKind,
321 key string,
322 ) (st.ItemDescriptor, error) {
323 serializedItem, err := w.core.Get(kind, key)
324 if err == nil {
325 return w.deserialize(kind, serializedItem)
326 }
327 return st.ItemDescriptor{}.NotFound(), err
328 }
329
330 func (w *persistentDataStoreWrapper) getAllAndDeserialize(
331 kind st.DataKind,
332 ) ([]st.KeyedItemDescriptor, error) {
333 serializedItems, err := w.core.GetAll(kind)
334 if err == nil {
335 ret := make([]st.KeyedItemDescriptor, 0, len(serializedItems))
336 for _, serializedItem := range serializedItems {
337 item, err := w.deserialize(kind, serializedItem.Item)
338 if err != nil {
339 return nil, err
340 }
341 ret = append(ret, st.KeyedItemDescriptor{Key: serializedItem.Key, Item: item})
342 }
343 return ret, nil
344 }
345 return nil, err
346 }
347
348 func (w *persistentDataStoreWrapper) cacheItems(
349 kind st.DataKind,
350 items []st.KeyedItemDescriptor,
351 ) {
352 if w.cache != nil {
353 copyOfItems := slices.Clone(items)
354 w.cache.Set(dataStoreAllItemsCacheKey(kind), copyOfItems, cache.DefaultExpiration)
355
356 for _, item := range items {
357 w.cache.Set(dataStoreCacheKey(kind, item.Key), item.Item, cache.DefaultExpiration)
358 }
359 }
360 }
361
362 func (w *persistentDataStoreWrapper) serialize(
363 kind st.DataKind,
364 item st.ItemDescriptor,
365 ) st.SerializedItemDescriptor {
366 isDeleted := item.Item == nil
367 return st.SerializedItemDescriptor{
368 Version: item.Version,
369 Deleted: isDeleted,
370 SerializedItem: kind.Serialize(item),
371 }
372 }
373
374 func (w *persistentDataStoreWrapper) serializeAll(
375 kind st.DataKind,
376 items []st.KeyedItemDescriptor,
377 ) []st.KeyedSerializedItemDescriptor {
378 ret := make([]st.KeyedSerializedItemDescriptor, 0, len(items))
379 for _, item := range items {
380 ret = append(ret, st.KeyedSerializedItemDescriptor{
381 Key: item.Key,
382 Item: w.serialize(kind, item.Item),
383 })
384 }
385 return ret
386 }
387
388 func (w *persistentDataStoreWrapper) deserialize(
389 kind st.DataKind,
390 serializedItemDesc st.SerializedItemDescriptor,
391 ) (st.ItemDescriptor, error) {
392 if serializedItemDesc.Deleted || serializedItemDesc.SerializedItem == nil {
393 return st.ItemDescriptor{Version: serializedItemDesc.Version}, nil
394 }
395 deserializedItemDesc, err := kind.Deserialize(serializedItemDesc.SerializedItem)
396 if err != nil {
397 return st.ItemDescriptor{}.NotFound(), err
398 }
399 if serializedItemDesc.Version == 0 || serializedItemDesc.Version == deserializedItemDesc.Version {
400 return deserializedItemDesc, nil
401 }
402
403 return st.ItemDescriptor{Version: serializedItemDesc.Version, Item: deserializedItemDesc.Item}, nil
404 }
405
406 func updateSingleItem(
407 items []st.KeyedItemDescriptor,
408 key string,
409 newItem st.ItemDescriptor,
410 ) []st.KeyedItemDescriptor {
411 found := false
412 ret := make([]st.KeyedItemDescriptor, 0, len(items))
413 for _, item := range items {
414 if item.Key == key {
415 ret = append(ret, st.KeyedItemDescriptor{Key: key, Item: newItem})
416 found = true
417 } else {
418 ret = append(ret, item)
419 }
420 }
421 if !found {
422 ret = append(ret, st.KeyedItemDescriptor{Key: key, Item: newItem})
423 }
424 return ret
425 }
426
427 func (w *persistentDataStoreWrapper) processError(err error) {
428 if err == nil {
429
430
431
432
433 return
434 }
435 w.loggers.Errorf("Data store returned error: %s", err.Error())
436 w.statusPoller.UpdateAvailability(false)
437 }
438
View as plain text