...

Source file src/github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoreimpl/big_segment_store_wrapper_test.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoreimpl

     1  package ldstoreimpl
     2  
     3  import (
     4  	"errors"
     5  	"sync/atomic"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
    10  
    11  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    12  	"github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
    13  	"github.com/launchdarkly/go-sdk-common/v3/ldreason"
    14  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    15  	"github.com/launchdarkly/go-server-sdk/v6/internal/bigsegments"
    16  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    17  
    18  	th "github.com/launchdarkly/go-test-helpers/v3"
    19  
    20  	"github.com/stretchr/testify/assert"
    21  	"github.com/stretchr/testify/require"
    22  )
    23  
    24  func TestBigSegmentStoreWrapper(t *testing.T) {
    25  	t.Run("queries store with hashed user key", testBigSegmentStoreWrapperMembershipQuery)
    26  	t.Run("caches membership state", testBigSegmentStoreWrapperMembershipCaching)
    27  	t.Run("sends status updates", testBigSegmentStoreWrapperStatusUpdates)
    28  	t.Run("control methods", testBigSegmentStoreWrapperControlMethods)
    29  }
    30  
    31  type storeWrapperTestParams struct {
    32  	t        *testing.T
    33  	store    *mocks.MockBigSegmentStore
    34  	wrapper  *BigSegmentStoreWrapper
    35  	config   BigSegmentsConfigurationProperties
    36  	statusCh chan interfaces.BigSegmentStoreStatus
    37  	mockLog  *ldlogtest.MockLog
    38  }
    39  
    40  func storeWrapperTest(t *testing.T) *storeWrapperTestParams {
    41  	mockLog := ldlogtest.NewMockLog()
    42  	mockLog.Loggers.SetMinLevel(ldlog.Debug)
    43  	return &storeWrapperTestParams{
    44  		t:     t,
    45  		store: &mocks.MockBigSegmentStore{},
    46  		config: BigSegmentsConfigurationProperties{
    47  			StatusPollInterval: time.Millisecond * 10,
    48  			StaleAfter:         time.Hour,
    49  			ContextCacheSize:   1000,
    50  			ContextCacheTime:   time.Hour,
    51  			StartPolling:       true,
    52  		},
    53  		statusCh: make(chan interfaces.BigSegmentStoreStatus, 10),
    54  		mockLog:  mockLog,
    55  	}
    56  }
    57  
    58  func (p *storeWrapperTestParams) run(action func(*storeWrapperTestParams)) {
    59  	defer p.mockLog.DumpIfTestFailed(p.t)
    60  	config := p.config
    61  	config.Store = p.store
    62  	p.wrapper = NewBigSegmentStoreWrapperWithConfig(
    63  		config,
    64  		func(status interfaces.BigSegmentStoreStatus) { p.statusCh <- status },
    65  		p.mockLog.Loggers,
    66  	)
    67  	p.store.TestSetMetadataToCurrentTime()
    68  	defer p.wrapper.Close()
    69  	action(p)
    70  }
    71  
    72  func (p *storeWrapperTestParams) assertMembership(userKey string, expected subsystems.BigSegmentMembership) {
    73  	membership, status := p.wrapper.GetMembership(userKey)
    74  	assert.Equal(p.t, ldreason.BigSegmentsHealthy, status)
    75  	assert.Equal(p.t, expected, membership)
    76  }
    77  
    78  func (p *storeWrapperTestParams) assertUserHashesQueried(hashes ...string) {
    79  	assert.Equal(p.t, hashes, p.store.TestGetMembershipQueries())
    80  }
    81  
    82  func testBigSegmentStoreWrapperMembershipQuery(t *testing.T) {
    83  	storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
    84  		userKey := "userkey"
    85  		userHash := bigsegments.HashForContextKey(userKey)
    86  		expectedMembership := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
    87  		p.store.TestSetMembership(userHash, expectedMembership)
    88  
    89  		p.assertMembership(userKey, expectedMembership)
    90  		p.assertUserHashesQueried(userHash)
    91  	})
    92  }
    93  
    94  func testBigSegmentStoreWrapperMembershipCaching(t *testing.T) {
    95  	t.Run("successful query is cached", func(t *testing.T) {
    96  		storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
    97  			userKey := "userkey"
    98  			userHash := bigsegments.HashForContextKey(userKey)
    99  			expectedMembership := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
   100  			p.store.TestSetMembership(userHash, expectedMembership)
   101  
   102  			p.assertMembership(userKey, expectedMembership)
   103  			p.assertMembership(userKey, expectedMembership)
   104  			p.assertUserHashesQueried(userHash) // only one query was done
   105  		})
   106  	})
   107  
   108  	t.Run("not-found result is cached", func(t *testing.T) {
   109  		storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
   110  			userKey := "userkey"
   111  			userHash := bigsegments.HashForContextKey(userKey)
   112  
   113  			p.assertMembership(userKey, nil)
   114  			p.assertMembership(userKey, nil)
   115  			p.assertUserHashesQueried(userHash) // only one query was done
   116  		})
   117  	})
   118  
   119  	t.Run("least recent user is evicted from cache", func(t *testing.T) {
   120  		p := storeWrapperTest(t)
   121  		p.config.ContextCacheSize = 2
   122  		p.run(func(p *storeWrapperTestParams) {
   123  			userKey1 := "userkey1"
   124  			userHash1 := bigsegments.HashForContextKey(userKey1)
   125  			expectedMembership1 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes1"}, []string{"no1"})
   126  			p.store.TestSetMembership(userHash1, expectedMembership1)
   127  
   128  			userKey2 := "userkey2"
   129  			userHash2 := bigsegments.HashForContextKey(userKey2)
   130  			expectedMembership2 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes2"}, []string{"no2"})
   131  			p.store.TestSetMembership(userHash2, expectedMembership2)
   132  
   133  			userKey3 := "userkey3"
   134  			userHash3 := bigsegments.HashForContextKey(userKey3)
   135  			expectedMembership3 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes3"}, []string{"no3"})
   136  			p.store.TestSetMembership(userHash3, expectedMembership3)
   137  
   138  			p.assertMembership(userKey1, expectedMembership1)
   139  			p.assertMembership(userKey2, expectedMembership2)
   140  			p.assertMembership(userKey3, expectedMembership3)
   141  
   142  			// Since the capacity is only 2 and userKey1 was the least recently used, that key should be
   143  			// evicted by the userKey3 query. Unfortunately, we have to add a hacky delay here because the
   144  			// LRU behavior of ccache is only eventually consistent - the LRU status is updated by a worker
   145  			// goroutine.
   146  			require.Eventually(t, func() bool {
   147  				return p.wrapper.contextCache.Get(userKey1) == nil
   148  			}, time.Second, time.Millisecond*10, "timed out waiting for LRU eviction")
   149  
   150  			p.assertUserHashesQueried(userHash1, userHash2, userHash3)
   151  
   152  			p.assertMembership(userKey1, expectedMembership1)
   153  
   154  			p.assertUserHashesQueried(userHash1, userHash2, userHash3, userHash1)
   155  		})
   156  	})
   157  }
   158  
   159  func testBigSegmentStoreWrapperStatusUpdates(t *testing.T) {
   160  	t.Run("polling detects store unavailability", func(t *testing.T) {
   161  		storeWrapperTest(t).run(func(p *storeWrapperTestParams) {
   162  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
   163  				interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
   164  
   165  			p.store.TestSetMetadataState(subsystems.BigSegmentStoreMetadata{}, errors.New("sorry"))
   166  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
   167  				interfaces.BigSegmentStoreStatus{Available: false, Stale: false})
   168  
   169  			p.store.TestSetMetadataToCurrentTime()
   170  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
   171  				interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
   172  		})
   173  	})
   174  
   175  	t.Run("polling detects stale status", func(t *testing.T) {
   176  		p := storeWrapperTest(t)
   177  		p.config.StaleAfter = time.Millisecond * 100
   178  		p.run(func(p *storeWrapperTestParams) {
   179  			stopUpdater := make(chan struct{})
   180  			defer close(stopUpdater)
   181  
   182  			var shouldUpdate atomic.Value
   183  			shouldUpdate.Store(true)
   184  
   185  			go func() {
   186  				ticker := time.NewTicker(time.Millisecond * 5)
   187  				for {
   188  					select {
   189  					case <-stopUpdater:
   190  						ticker.Stop()
   191  						return
   192  					case <-ticker.C:
   193  						if shouldUpdate.Load() == true {
   194  							p.store.TestSetMetadataToCurrentTime()
   195  						}
   196  					}
   197  				}
   198  			}()
   199  
   200  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Second,
   201  				interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
   202  
   203  			shouldUpdate.Store(false)
   204  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Millisecond*200,
   205  				interfaces.BigSegmentStoreStatus{Available: true, Stale: true})
   206  
   207  			shouldUpdate.Store(true)
   208  			mocks.ExpectBigSegmentStoreStatus(t, p.statusCh, p.wrapper.GetStatus, time.Millisecond*200,
   209  				interfaces.BigSegmentStoreStatus{Available: true, Stale: false})
   210  		})
   211  	})
   212  }
   213  
   214  func testBigSegmentStoreWrapperControlMethods(t *testing.T) {
   215  	t.Run("can turn polling on after initially paused", func(t *testing.T) {
   216  		p := storeWrapperTest(t)
   217  		queriesCh := p.store.TestGetMetadataQueriesCh()
   218  		p.config.StartPolling = false
   219  		p.run(func(p *storeWrapperTestParams) {
   220  			if !th.AssertNoMoreValues(t, queriesCh, time.Millisecond*100, "got unexpected status poll") {
   221  				t.FailNow()
   222  			}
   223  
   224  			p.wrapper.SetPollingActive(true)
   225  
   226  			th.RequireValue(t, queriesCh, time.Millisecond*500, "timed out waiting for status poll")
   227  		})
   228  	})
   229  
   230  	t.Run("can clear cache", func(t *testing.T) {
   231  		p := storeWrapperTest(t)
   232  		p.run(func(p *storeWrapperTestParams) {
   233  			userKey := "userkey"
   234  			userHash := bigsegments.HashForContextKey(userKey)
   235  
   236  			expectedMembership1 := NewBigSegmentMembershipFromSegmentRefs([]string{"yes"}, []string{"no"})
   237  			p.store.TestSetMembership(userHash, expectedMembership1)
   238  
   239  			p.assertMembership(userKey, expectedMembership1)
   240  			p.assertMembership(userKey, expectedMembership1)
   241  
   242  			p.assertUserHashesQueried(userHash) // only one query was done
   243  
   244  			expectedMembership2 := NewBigSegmentMembershipFromSegmentRefs([]string{"maybe"}, []string{"no"})
   245  			p.store.TestSetMembership(userHash, expectedMembership2)
   246  
   247  			p.wrapper.ClearCache()
   248  
   249  			p.assertMembership(userKey, expectedMembership2)
   250  			p.assertUserHashesQueried(userHash, userHash) // a second query was done
   251  		})
   252  	})
   253  }
   254  

View as plain text