...

Source file src/github.com/launchdarkly/go-server-sdk-redis-redigo/v2/redis_impl.go

Documentation: github.com/launchdarkly/go-server-sdk-redis-redigo/v2

     1  package ldredis
     2  
     3  import (
     4  	"net/url"
     5  	"time"
     6  
     7  	r "github.com/gomodule/redigo/redis"
     8  
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    10  	"github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
    11  )
    12  
    13  // Internal implementation of the PersistentDataStore interface for Redis.
    14  type redisDataStoreImpl struct {
    15  	prefix     string
    16  	pool       Pool
    17  	loggers    ldlog.Loggers
    18  	testTxHook func()
    19  }
    20  
    21  func newPool(url string, dialOptions []r.DialOption) *r.Pool {
    22  	pool := &r.Pool{
    23  		MaxIdle:     20,
    24  		MaxActive:   16,
    25  		Wait:        true,
    26  		IdleTimeout: 300 * time.Second,
    27  		Dial: func() (c r.Conn, err error) {
    28  			c, err = r.DialURL(url, dialOptions...)
    29  			return
    30  		},
    31  		TestOnBorrow: func(c r.Conn, t time.Time) error {
    32  			_, err := c.Do("PING")
    33  			return err
    34  		},
    35  	}
    36  	return pool
    37  }
    38  
    39  const initedKey = "$inited"
    40  
    41  func newRedisDataStoreImpl(
    42  	builder builderOptions,
    43  	loggers ldlog.Loggers,
    44  ) *redisDataStoreImpl {
    45  	impl := &redisDataStoreImpl{
    46  		prefix:  builder.prefix,
    47  		pool:    builder.pool,
    48  		loggers: loggers,
    49  	}
    50  	impl.loggers.SetPrefix("RedisDataStore:")
    51  
    52  	if impl.pool == nil {
    53  		logRedisURL(loggers, builder.url)
    54  		impl.pool = newPool(builder.url, builder.dialOptions)
    55  	}
    56  	return impl
    57  }
    58  
    59  func logRedisURL(loggers ldlog.Loggers, redisURL string) {
    60  	if parsed, err := url.Parse(redisURL); err == nil {
    61  		loggers.Infof("Using URL: %s", parsed.Redacted())
    62  	} else {
    63  		loggers.Errorf("Invalid Redis URL: %s", redisURL) // we can assume that the Redis client will also fail
    64  	}
    65  }
    66  
    67  func (store *redisDataStoreImpl) Init(allData []ldstoretypes.SerializedCollection) error {
    68  	c := store.getConn()
    69  	defer c.Close() // nolint:errcheck
    70  
    71  	_ = c.Send("MULTI")
    72  
    73  	totalCount := 0
    74  
    75  	for _, coll := range allData {
    76  		baseKey := store.featuresKey(coll.Kind)
    77  
    78  		_ = c.Send("DEL", baseKey)
    79  
    80  		totalCount += len(coll.Items)
    81  		for _, keyedItem := range coll.Items {
    82  			_ = c.Send("HSET", baseKey, keyedItem.Key, keyedItem.Item.SerializedItem)
    83  		}
    84  	}
    85  
    86  	_ = c.Send("SET", store.initedKey(), "")
    87  
    88  	_, err := c.Do("EXEC")
    89  
    90  	if err == nil {
    91  		store.loggers.Infof("Initialized with %d items", totalCount)
    92  	}
    93  
    94  	return err
    95  }
    96  
    97  func (store *redisDataStoreImpl) Get(
    98  	kind ldstoretypes.DataKind,
    99  	key string,
   100  ) (ldstoretypes.SerializedItemDescriptor, error) {
   101  	c := store.getConn()
   102  	defer c.Close() // nolint:errcheck
   103  
   104  	jsonStr, err := r.String(c.Do("HGET", store.featuresKey(kind), key))
   105  
   106  	if err != nil {
   107  		if err == r.ErrNil {
   108  			if store.loggers.IsDebugEnabled() { // COVERAGE: tests don't verify debug logging
   109  				store.loggers.Debugf("Key: %s not found in \"%s\"", key, kind.GetName())
   110  			}
   111  			return ldstoretypes.SerializedItemDescriptor{}.NotFound(), nil
   112  		}
   113  		return ldstoretypes.SerializedItemDescriptor{}.NotFound(), err
   114  	}
   115  
   116  	return ldstoretypes.SerializedItemDescriptor{Version: 0, SerializedItem: []byte(jsonStr)}, nil
   117  }
   118  
   119  func (store *redisDataStoreImpl) GetAll(
   120  	kind ldstoretypes.DataKind,
   121  ) ([]ldstoretypes.KeyedSerializedItemDescriptor, error) {
   122  	c := store.getConn()
   123  	defer c.Close() // nolint:errcheck
   124  
   125  	values, err := r.StringMap(c.Do("HGETALL", store.featuresKey(kind)))
   126  
   127  	if err != nil && err != r.ErrNil {
   128  		return nil, err
   129  	}
   130  
   131  	results := make([]ldstoretypes.KeyedSerializedItemDescriptor, 0, len(values))
   132  	for k, v := range values {
   133  		results = append(results, ldstoretypes.KeyedSerializedItemDescriptor{
   134  			Key:  k,
   135  			Item: ldstoretypes.SerializedItemDescriptor{Version: 0, SerializedItem: []byte(v)},
   136  		})
   137  	}
   138  	return results, nil
   139  }
   140  
   141  func (store *redisDataStoreImpl) Upsert(
   142  	kind ldstoretypes.DataKind,
   143  	key string,
   144  	newItem ldstoretypes.SerializedItemDescriptor,
   145  ) (bool, error) {
   146  	baseKey := store.featuresKey(kind)
   147  	for {
   148  		// We accept that we can acquire multiple connections here and defer inside loop but we don't expect many
   149  		c := store.getConn()
   150  		defer c.Close() // nolint:errcheck
   151  
   152  		_, err := c.Do("WATCH", baseKey)
   153  		if err != nil {
   154  			return false, err
   155  		}
   156  
   157  		defer c.Send("UNWATCH") // nolint:errcheck // this should always succeed
   158  
   159  		if store.testTxHook != nil { // instrumentation for unit tests
   160  			store.testTxHook()
   161  		}
   162  
   163  		oldItem, err := store.Get(kind, key)
   164  		if err != nil { // COVERAGE: can't cause an error here in unit tests
   165  			return false, err
   166  		}
   167  
   168  		// In this implementation, we have to parse the existing item in order to determine its version.
   169  		oldVersion := oldItem.Version
   170  		if oldItem.SerializedItem != nil {
   171  			parsed, _ := kind.Deserialize(oldItem.SerializedItem)
   172  			oldVersion = parsed.Version
   173  		}
   174  
   175  		if oldVersion >= newItem.Version {
   176  			updateOrDelete := "update"
   177  			if newItem.Deleted {
   178  				updateOrDelete = "delete"
   179  			}
   180  			if store.loggers.IsDebugEnabled() { // COVERAGE: tests don't verify debug logging
   181  				store.loggers.Debugf(`Attempted to %s key: %s version: %d in "%s" with a version that is the same or older: %d`,
   182  					updateOrDelete, key, oldVersion, kind, newItem.Version)
   183  			}
   184  			return false, nil
   185  		}
   186  
   187  		_ = c.Send("MULTI")
   188  		err = c.Send("HSET", baseKey, key, newItem.SerializedItem)
   189  		if err == nil {
   190  			var result interface{}
   191  			result, err = c.Do("EXEC")
   192  			if err == nil {
   193  				if result == nil {
   194  					// if exec returned nothing, it means the watch was triggered and we should retry
   195  					if store.loggers.IsDebugEnabled() { // COVERAGE: tests don't verify debug logging
   196  						store.loggers.Debug("Concurrent modification detected, retrying")
   197  					}
   198  					continue
   199  				}
   200  			}
   201  			return true, nil
   202  		}
   203  		return false, err // COVERAGE: can't cause an error here in unit tests
   204  	}
   205  }
   206  
   207  func (store *redisDataStoreImpl) IsInitialized() bool {
   208  	c := store.getConn()
   209  	defer c.Close() // nolint:errcheck
   210  	inited, _ := r.Bool(c.Do("EXISTS", store.initedKey()))
   211  	return inited
   212  }
   213  
   214  func (store *redisDataStoreImpl) IsStoreAvailable() bool {
   215  	c := store.getConn()
   216  	defer c.Close() // nolint:errcheck
   217  	_, err := r.Bool(c.Do("EXISTS", store.initedKey()))
   218  	return err == nil
   219  }
   220  
   221  func (store *redisDataStoreImpl) Close() error {
   222  	return store.pool.Close()
   223  }
   224  
   225  func (store *redisDataStoreImpl) featuresKey(kind ldstoretypes.DataKind) string {
   226  	return store.prefix + ":" + kind.GetName()
   227  }
   228  
   229  func (store *redisDataStoreImpl) initedKey() string {
   230  	return store.prefix + ":" + initedKey
   231  }
   232  
   233  func (store *redisDataStoreImpl) getConn() r.Conn {
   234  	return store.pool.Get()
   235  }
   236  

View as plain text