1 package ldredis
2
3 import (
4 "net/url"
5 "time"
6
7 r "github.com/gomodule/redigo/redis"
8
9 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
10 "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
11 )
12
13
14 type redisDataStoreImpl struct {
15 prefix string
16 pool Pool
17 loggers ldlog.Loggers
18 testTxHook func()
19 }
20
21 func newPool(url string, dialOptions []r.DialOption) *r.Pool {
22 pool := &r.Pool{
23 MaxIdle: 20,
24 MaxActive: 16,
25 Wait: true,
26 IdleTimeout: 300 * time.Second,
27 Dial: func() (c r.Conn, err error) {
28 c, err = r.DialURL(url, dialOptions...)
29 return
30 },
31 TestOnBorrow: func(c r.Conn, t time.Time) error {
32 _, err := c.Do("PING")
33 return err
34 },
35 }
36 return pool
37 }
38
// initedKey is the key suffix (appended to the store prefix and ":") of the
// marker that Init sets and that IsInitialized checks for.
const initedKey = "$inited"
40
41 func newRedisDataStoreImpl(
42 builder builderOptions,
43 loggers ldlog.Loggers,
44 ) *redisDataStoreImpl {
45 impl := &redisDataStoreImpl{
46 prefix: builder.prefix,
47 pool: builder.pool,
48 loggers: loggers,
49 }
50 impl.loggers.SetPrefix("RedisDataStore:")
51
52 if impl.pool == nil {
53 logRedisURL(loggers, builder.url)
54 impl.pool = newPool(builder.url, builder.dialOptions)
55 }
56 return impl
57 }
58
59 func logRedisURL(loggers ldlog.Loggers, redisURL string) {
60 if parsed, err := url.Parse(redisURL); err == nil {
61 loggers.Infof("Using URL: %s", parsed.Redacted())
62 } else {
63 loggers.Errorf("Invalid Redis URL: %s", redisURL)
64 }
65 }
66
67 func (store *redisDataStoreImpl) Init(allData []ldstoretypes.SerializedCollection) error {
68 c := store.getConn()
69 defer c.Close()
70
71 _ = c.Send("MULTI")
72
73 totalCount := 0
74
75 for _, coll := range allData {
76 baseKey := store.featuresKey(coll.Kind)
77
78 _ = c.Send("DEL", baseKey)
79
80 totalCount += len(coll.Items)
81 for _, keyedItem := range coll.Items {
82 _ = c.Send("HSET", baseKey, keyedItem.Key, keyedItem.Item.SerializedItem)
83 }
84 }
85
86 _ = c.Send("SET", store.initedKey(), "")
87
88 _, err := c.Do("EXEC")
89
90 if err == nil {
91 store.loggers.Infof("Initialized with %d items", totalCount)
92 }
93
94 return err
95 }
96
97 func (store *redisDataStoreImpl) Get(
98 kind ldstoretypes.DataKind,
99 key string,
100 ) (ldstoretypes.SerializedItemDescriptor, error) {
101 c := store.getConn()
102 defer c.Close()
103
104 jsonStr, err := r.String(c.Do("HGET", store.featuresKey(kind), key))
105
106 if err != nil {
107 if err == r.ErrNil {
108 if store.loggers.IsDebugEnabled() {
109 store.loggers.Debugf("Key: %s not found in \"%s\"", key, kind.GetName())
110 }
111 return ldstoretypes.SerializedItemDescriptor{}.NotFound(), nil
112 }
113 return ldstoretypes.SerializedItemDescriptor{}.NotFound(), err
114 }
115
116 return ldstoretypes.SerializedItemDescriptor{Version: 0, SerializedItem: []byte(jsonStr)}, nil
117 }
118
119 func (store *redisDataStoreImpl) GetAll(
120 kind ldstoretypes.DataKind,
121 ) ([]ldstoretypes.KeyedSerializedItemDescriptor, error) {
122 c := store.getConn()
123 defer c.Close()
124
125 values, err := r.StringMap(c.Do("HGETALL", store.featuresKey(kind)))
126
127 if err != nil && err != r.ErrNil {
128 return nil, err
129 }
130
131 results := make([]ldstoretypes.KeyedSerializedItemDescriptor, 0, len(values))
132 for k, v := range values {
133 results = append(results, ldstoretypes.KeyedSerializedItemDescriptor{
134 Key: k,
135 Item: ldstoretypes.SerializedItemDescriptor{Version: 0, SerializedItem: []byte(v)},
136 })
137 }
138 return results, nil
139 }
140
141 func (store *redisDataStoreImpl) Upsert(
142 kind ldstoretypes.DataKind,
143 key string,
144 newItem ldstoretypes.SerializedItemDescriptor,
145 ) (bool, error) {
146 baseKey := store.featuresKey(kind)
147 for {
148
149 c := store.getConn()
150 defer c.Close()
151
152 _, err := c.Do("WATCH", baseKey)
153 if err != nil {
154 return false, err
155 }
156
157 defer c.Send("UNWATCH")
158
159 if store.testTxHook != nil {
160 store.testTxHook()
161 }
162
163 oldItem, err := store.Get(kind, key)
164 if err != nil {
165 return false, err
166 }
167
168
169 oldVersion := oldItem.Version
170 if oldItem.SerializedItem != nil {
171 parsed, _ := kind.Deserialize(oldItem.SerializedItem)
172 oldVersion = parsed.Version
173 }
174
175 if oldVersion >= newItem.Version {
176 updateOrDelete := "update"
177 if newItem.Deleted {
178 updateOrDelete = "delete"
179 }
180 if store.loggers.IsDebugEnabled() {
181 store.loggers.Debugf(`Attempted to %s key: %s version: %d in "%s" with a version that is the same or older: %d`,
182 updateOrDelete, key, oldVersion, kind, newItem.Version)
183 }
184 return false, nil
185 }
186
187 _ = c.Send("MULTI")
188 err = c.Send("HSET", baseKey, key, newItem.SerializedItem)
189 if err == nil {
190 var result interface{}
191 result, err = c.Do("EXEC")
192 if err == nil {
193 if result == nil {
194
195 if store.loggers.IsDebugEnabled() {
196 store.loggers.Debug("Concurrent modification detected, retrying")
197 }
198 continue
199 }
200 }
201 return true, nil
202 }
203 return false, err
204 }
205 }
206
207 func (store *redisDataStoreImpl) IsInitialized() bool {
208 c := store.getConn()
209 defer c.Close()
210 inited, _ := r.Bool(c.Do("EXISTS", store.initedKey()))
211 return inited
212 }
213
214 func (store *redisDataStoreImpl) IsStoreAvailable() bool {
215 c := store.getConn()
216 defer c.Close()
217 _, err := r.Bool(c.Do("EXISTS", store.initedKey()))
218 return err == nil
219 }
220
221 func (store *redisDataStoreImpl) Close() error {
222 return store.pool.Close()
223 }
224
225 func (store *redisDataStoreImpl) featuresKey(kind ldstoretypes.DataKind) string {
226 return store.prefix + ":" + kind.GetName()
227 }
228
229 func (store *redisDataStoreImpl) initedKey() string {
230 return store.prefix + ":" + initedKey
231 }
232
233 func (store *redisDataStoreImpl) getConn() r.Conn {
234 return store.pool.Get()
235 }
236
View as plain text