1 package ldstoreimpl
2
3 import (
4 "errors"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
10
11 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
12 "github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
13 "github.com/launchdarkly/go-sdk-common/v3/ldreason"
14 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
15 "github.com/launchdarkly/go-server-sdk/v6/internal/bigsegments"
16 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
17
18 th "github.com/launchdarkly/go-test-helpers/v3"
19
20 "github.com/stretchr/testify/assert"
21 "github.com/stretchr/testify/require"
22 )
23
24 func TestBigSegmentStoreWrapper(t *testing.T) {
25 t.Run("queries store with hashed user key", testBigSegmentStoreWrapperMembershipQuery)
26 t.Run("caches membership state", testBigSegmentStoreWrapperMembershipCaching)
27 t.Run("sends status updates", testBigSegmentStoreWrapperStatusUpdates)
28 t.Run("control methods", testBigSegmentStoreWrapperControlMethods)
29 }
30
31 type storeWrapperTestParams struct {
32 t *testing.T
33 store *mocks.MockBigSegmentStore
34 wrapper *BigSegmentStoreWrapper
35 config BigSegmentsConfigurationProperties
36 statusCh chan interfaces.BigSegmentStoreStatus
37 mockLog *ldlogtest.MockLog
38 }
39
40 func storeWrapperTest(t *testing.T) *storeWrapperTestParams {
41 mockLog := ldlogtest.NewMockLog()
42 mockLog.Loggers.SetMinLevel(ldlog.Debug)
43 return &storeWrapperTestParams{
44 t: t,
45 store: &mocks.MockBigSegmentStore{},
46 config: BigSegmentsConfigurationProperties{
47 StatusPollInterval: time.Millisecond * 10,
48 StaleAfter: time.Hour,
49 ContextCacheSize: 1000,
50 ContextCacheTime: time.Hour,
51 StartPolling: true,
52 },
53 statusCh: make(chan interfaces.BigSegmentStoreStatus, 10),
54 mockLog: mockLog,
55 }
56 }
57
58 func (p *storeWrapperTestParams) run(action func(*storeWrapperTestParams)) {
59 defer p.mockLog.DumpIfTestFailed(p.t)
60 config := p.config
61 config.Store = p.store
62 p.wrapper = NewBigSegmentStoreWrapperWithConfig(
63 config,
64 func(status interfaces.BigSegmentStoreStatus) { p.statusCh <- status },
65 p.mockLog.Loggers,
66 )
67 p.store.TestSetMetadataToCurrentTime()
68 defer p.wrapper.Close()
69 action(p)
70 }
71
72 func (p *storeWrapperTestParams) assertMembership(userKey string, expected subsystems.BigSegmentMembership) {
73 membership, status := p.wrapper.GetMembership(userKey)
74 assert.Equal(p.t, ldreason.BigSegmentsHealthy, status)
75 assert.Equal(p.t, expected, membership)
76 }
77
78 func (p *storeWrapperTestParams) assertUserHashesQueried(hashes ...string) {
79 assert.Equal(p.t, hashes, p.store.TestGetMembershipQueries())
80 }
81
82 func testBigSegmentStoreWrapperMembershipQuery(t *testing.T) {
83 storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
84 userKey := "userkey"
85 userHash := bigsegments.HashForContextKey(userKey)
86 expectedMembership := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
87 p.store.TestSetMembership(userHash, expectedMembership)
88
89 p.assertMembership(userKey, expectedMembership)
90 p.assertUserHashesQueried(userHash)
91 })
92 }
93
94 func testBigSegmentStoreWrapperMembershipCaching(t *testing.T) {
95 t.Run("successful query is cached", func(t *testing.T) {
96 storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
97 userKey := "userkey"
98 userHash := bigsegments.HashForContextKey(userKey)
99 expectedMembership := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
100 p.store.TestSetMembership(userHash, expectedMembership)
101
102 p.assertMembership(userKey, expectedMembership)
103 p.assertMembership(userKey, expectedMembership)
104 p.assertUserHashesQueried(userHash)
105 })
106 })
107
108 t.Run("not-found result is cached", func(t *testing.T) {
109 storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
110 userKey := "userkey"
111 userHash := bigsegments.HashForContextKey(userKey)
112
113 p.assertMembership(userKey, nil)
114 p.assertMembership(userKey, nil)
115 p.assertUserHashesQueried(userHash)
116 })
117 })
118
119 t.Run("least recent user is evicted from cache", func(t *testing.T) {
120 p := storeWrapperTest(t)
121 p.config.ContextCacheSize = 2
122 p.run(func(p *storeWrapperTestParams) {
123 userKey1 := "userkey1"
124 userHash1 := bigsegments.HashForContextKey(userKey1)
125 expectedMembership1 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes1"}, []string{"no1"})
126 p.store.TestSetMembership(userHash1, expectedMembership1)
127
128 userKey2 := "userkey2"
129 userHash2 := bigsegments.HashForContextKey(userKey2)
130 expectedMembership2 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes2"}, []string{"no2"})
131 p.store.TestSetMembership(userHash2, expectedMembership2)
132
133 userKey3 := "userkey3"
134 userHash3 := bigsegments.HashForContextKey(userKey3)
135 expectedMembership3 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes3"}, []string{"no3"})
136 p.store.TestSetMembership(userHash3, expectedMembership3)
137
138 p.assertMembership(userKey1, expectedMembership1)
139 p.assertMembership(userKey2, expectedMembership2)
140 p.assertMembership(userKey3, expectedMembership3)
141
142
143
144
145
146 require.Eventually(t, func() bool {
147 return p.wrapper.contextCache.Get(userKey1) == nil
148 }, time.Second, time.Millisecond*10, "timed out waiting for LRU eviction")
149
150 p.assertUserHashesQueried(userHash1, userHash2, userHash3)
151
152 p.assertMembership(userKey1, expectedMembership1)
153
154 p.assertUserHashesQueried(userHash1, userHash2, userHash3, userHash1)
155 })
156 })
157 }
158
159 func testBigSegmentStoreWrapperStatusUpdates(t *testing.T) {
160 t.Run("polling detects store unavailability", func(t *testing.T) {
161 storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
162 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
163 interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
164
165 p.store.TestSetMetadataState(subsystems.BigSegmentStoreMetadata{}, errors.New("sorry"))
166 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
167 interfaces.BigSegmentStoreStatus{Available: false, Stale: false})
168
169 p.store.TestSetMetadataToCurrentTime()
170 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
171 interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
172 })
173 })
174
175 t.Run("polling detects stale status", func(t *testing.T) {
176 p := storeWrapperTest(t)
177 p.config.StaleAfter = time.Millisecond * 100
178 p.run(func(p *storeWrapperTestParams) {
179 stopUpdater := make(chan struct{})
180 defer close(stopUpdater)
181
182 var shouldUpdate atomic.Value
183 shouldUpdate.Store(true)
184
185 go func() {
186 ticker := time.NewTicker(time.Millisecond * 5)
187 for {
188 select {
189 case <-stopUpdater:
190 ticker.Stop()
191 return
192 case <-ticker.C:
193 if shouldUpdate.Load() == true {
194 p.store.TestSetMetadataToCurrentTime()
195 }
196 }
197 }
198 }()
199
200 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
201 interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
202
203 shouldUpdate.Store(false)
204 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Millisecond*200,
205 interfaces.BigSegmentStoreStatus{Available: true, Stale: true})
206
207 shouldUpdate.Store(true)
208 mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Millisecond*200,
209 interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
210 })
211 })
212 }
213
214 func testBigSegmentStoreWrapperControlMethods(t *testing.T) {
215 t.Run("can turn polling on after initially paused", func(t *testing.T) {
216 p := storeWrapperTest(t)
217 queriesCh := p.store.TestGetMetadataQueriesCh()
218 p.config.StartPolling = false
219 p.run(func(p *storeWrapperTestParams) {
220 if !th.AssertNoMoreValues(t, queriesCh, time.Millisecond*100, "got unexpected status poll") {
221 t.FailNow()
222 }
223
224 p.wrapper.SetPollingActive(true)
225
226 th.RequireValue(t, queriesCh, time.Millisecond*500, "timed out waiting for status poll")
227 })
228 })
229
230 t.Run("can clear cache", func(t *testing.T) {
231 p := storeWrapperTest(t)
232 p.run(func(p *storeWrapperTestParams) {
233 userKey := "userkey"
234 userHash := bigsegments.HashForContextKey(userKey)
235
236 expectedMembership1 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
237 p.store.TestSetMembership(userHash, expectedMembership1)
238
239 p.assertMembership(userKey, expectedMembership1)
240 p.assertMembership(userKey, expectedMembership1)
241
242 p.assertUserHashesQueried(userHash)
243
244 expectedMembership2 := NewBigSegmentMembershipFromSegmentRefs([]string{"maybe"}, []string{"no"})
245 p.store.TestSetMembership(userHash, expectedMembership2)
246
247 p.wrapper.ClearCache()
248
249 p.assertMembership(userKey, expectedMembership2)
250 p.assertUserHashesQueried(userHash, userHash)
251 })
252 })
253 }
254
View as plain text