...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/data_source_update_sink_impl_test.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

     1  package datasource
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"strings"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest/mocks"
    11  
    12  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    13  	"github.com/launchdarkly/go-sdk-common/v3/ldlogtest"
    14  	"github.com/launchdarkly/go-server-sdk-evaluation/v2/ldbuilders"
    15  
    16  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
    17  	intf "github.com/launchdarkly/go-server-sdk/v6/interfaces"
    18  	"github.com/launchdarkly/go-server-sdk/v6/internal"
    19  	"github.com/launchdarkly/go-server-sdk/v6/internal/datakinds"
    20  	"github.com/launchdarkly/go-server-sdk/v6/internal/datastore"
    21  	"github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
    22  	st "github.com/launchdarkly/go-server-sdk/v6/subsystems/ldstoretypes"
    23  
    24  	th "github.com/launchdarkly/go-test-helpers/v3"
    25  
    26  	"github.com/stretchr/testify/assert"
    27  	"github.com/stretchr/testify/require"
    28  )
    29  
    30  const testDataSourceOutageTimeout = 200 * time.Millisecond
    31  
    32  type dataSourceUpdateSinkImplTestParams struct {
    33  	store                   *mocks.CapturingDataStore
    34  	dataStoreStatusProvider intf.DataStoreStatusProvider
    35  	dataSourceUpdates       *DataSourceUpdateSinkImpl
    36  	flagChangeBroadcaster   *internal.Broadcaster[interfaces.FlagChangeEvent]
    37  	mockLoggers             *ldlogtest.MockLog
    38  }
    39  
    40  func dataSourceUpdateSinkImplTest(action func(dataSourceUpdateSinkImplTestParams)) {
    41  	p := dataSourceUpdateSinkImplTestParams{}
    42  	p.mockLoggers = ldlogtest.NewMockLog()
    43  	p.store = mocks.NewCapturingDataStore(datastore.NewInMemoryDataStore(p.mockLoggers.Loggers))
    44  	dataStoreUpdates := datastore.NewDataStoreUpdateSinkImpl(nil)
    45  	p.dataStoreStatusProvider = datastore.NewDataStoreStatusProviderImpl(p.store, dataStoreUpdates)
    46  	dataSourceStatusBroadcaster := internal.NewBroadcaster[interfaces.DataSourceStatus]()
    47  	defer dataSourceStatusBroadcaster.Close()
    48  	p.flagChangeBroadcaster = internal.NewBroadcaster[interfaces.FlagChangeEvent]()
    49  	defer p.flagChangeBroadcaster.Close()
    50  	p.dataSourceUpdates = NewDataSourceUpdateSinkImpl(
    51  		p.store,
    52  		p.dataStoreStatusProvider,
    53  		dataSourceStatusBroadcaster,
    54  		p.flagChangeBroadcaster,
    55  		testDataSourceOutageTimeout,
    56  		p.mockLoggers.Loggers,
    57  	)
    58  
    59  	action(p)
    60  }
    61  
    62  func TestDataSourceUpdateSinkImpl(t *testing.T) {
    63  	storeError := errors.New("sorry")
    64  	expectedStoreErrorMessage := "Unexpected data store error when trying to store an update received from the data source: sorry"
    65  
    66  	t.Run("Init", func(t *testing.T) {
    67  		t.Run("passes data to store", func(t *testing.T) {
    68  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
    69  				inputData := sharedtest.NewDataSetBuilder().Flags(ldbuilders.NewFlagBuilder("a").Build())
    70  
    71  				result := p.dataSourceUpdates.Init(inputData.Build())
    72  				assert.True(t, result)
    73  
    74  				p.store.WaitForInit(t, inputData.ToServerSDKData(), time.Second)
    75  			})
    76  		})
    77  
    78  		t.Run("detects error from store", func(t *testing.T) {
    79  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
    80  				p.store.SetFakeError(storeError)
    81  
    82  				result := p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build())
    83  				assert.False(t, result)
    84  				assert.Equal(t, intf.DataSourceErrorKindStoreError, p.dataSourceUpdates.GetLastStatus().LastError.Kind)
    85  
    86  				log1 := p.mockLoggers.GetOutput(ldlog.Warn)
    87  				assert.Equal(t, []string{expectedStoreErrorMessage}, log1)
    88  
    89  				// does not log a redundant message if the next update also fails
    90  				assert.False(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
    91  				log2 := p.mockLoggers.GetOutput(ldlog.Warn)
    92  				assert.Equal(t, log1, log2)
    93  
    94  				// does log the message again if there's another failure later after a success
    95  				p.store.SetFakeError(nil)
    96  				assert.True(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
    97  				p.store.SetFakeError(storeError)
    98  				assert.False(t, p.dataSourceUpdates.Init(sharedtest.NewDataSetBuilder().Build()))
    99  				log3 := p.mockLoggers.GetOutput(ldlog.Warn)
   100  				assert.Equal(t, []string{expectedStoreErrorMessage, expectedStoreErrorMessage}, log3)
   101  			})
   102  		})
   103  
   104  		t.Run("sorts the data set", testDataSourceUpdatesImplSortsInitData)
   105  	})
   106  
   107  	t.Run("Upsert", func(t *testing.T) {
   108  		t.Run("passes data to store", func(t *testing.T) {
   109  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   110  				flag := ldbuilders.NewFlagBuilder("key").Version(1).Build()
   111  				itemDesc := st.ItemDescriptor{Version: 1, Item: &flag}
   112  				result := p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc)
   113  				assert.True(t, result)
   114  
   115  				p.store.WaitForUpsert(t, datakinds.Features, flag.Key, itemDesc.Version, time.Second)
   116  			})
   117  		})
   118  
   119  		t.Run("detects error from store", func(t *testing.T) {
   120  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   121  				p.store.SetFakeError(storeError)
   122  
   123  				flag := ldbuilders.NewFlagBuilder("key").Version(1).Build()
   124  				itemDesc := st.ItemDescriptor{Version: 1, Item: &flag}
   125  				result := p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc)
   126  				assert.False(t, result)
   127  				assert.Equal(t, intf.DataSourceErrorKindStoreError, p.dataSourceUpdates.GetLastStatus().LastError.Kind)
   128  
   129  				log1 := p.mockLoggers.GetOutput(ldlog.Warn)
   130  				assert.Equal(t, []string{expectedStoreErrorMessage}, log1)
   131  
   132  				// does not log a redundant message if the next update also fails
   133  				assert.False(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
   134  				log2 := p.mockLoggers.GetOutput(ldlog.Warn)
   135  				assert.Equal(t, log1, log2)
   136  
   137  				// does log the message again if there's another failure later after a success
   138  				p.store.SetFakeError(nil)
   139  				assert.True(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
   140  				p.store.SetFakeError(storeError)
   141  				assert.False(t, p.dataSourceUpdates.Upsert(datakinds.Features, flag.Key, itemDesc))
   142  				log3 := p.mockLoggers.GetOutput(ldlog.Warn)
   143  				assert.Equal(t, []string{expectedStoreErrorMessage, expectedStoreErrorMessage}, log3)
   144  			})
   145  		})
   146  	})
   147  
   148  	t.Run("UpdateStatus", func(t *testing.T) {
   149  		// broadcaster behavior is covered by DataSourceStatusProviderImpl tests
   150  
   151  		t.Run("does not update status if state is the same and errorInfo is empty", func(t *testing.T) {
   152  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   153  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   154  				status1 := p.dataSourceUpdates.currentStatus
   155  				<-time.After(time.Millisecond) // so time is different
   156  
   157  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   158  				status2 := p.dataSourceUpdates.currentStatus
   159  				assert.Equal(t, status1, status2)
   160  			})
   161  		})
   162  
   163  		t.Run("does not update status if new state is empty", func(t *testing.T) {
   164  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   165  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   166  				status1 := p.dataSourceUpdates.currentStatus
   167  
   168  				p.dataSourceUpdates.UpdateStatus("", intf.DataSourceErrorInfo{})
   169  				status2 := p.dataSourceUpdates.currentStatus
   170  				assert.Equal(t, status1, status2)
   171  			})
   172  		})
   173  
   174  		t.Run("updates status if state is the same and errorInfo is not empty", func(t *testing.T) {
   175  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   176  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   177  				status1 := p.dataSourceUpdates.currentStatus
   178  				<-time.After(time.Millisecond) // so time is different
   179  
   180  				errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
   181  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, errorInfo)
   182  				status2 := p.dataSourceUpdates.currentStatus
   183  				assert.NotEqual(t, status1, status2)
   184  				assert.Equal(t, status1.State, status2.State)
   185  				assert.Equal(t, errorInfo, status2.LastError)
   186  			})
   187  		})
   188  
   189  		t.Run("updates status if state is not the same", func(t *testing.T) {
   190  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   191  				errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
   192  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, errorInfo)
   193  
   194  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, intf.DataSourceErrorInfo{})
   195  				status := p.dataSourceUpdates.currentStatus
   196  				assert.Equal(t, intf.DataSourceStateInterrupted, status.State)
   197  				assert.Equal(t, errorInfo, status.LastError)
   198  			})
   199  		})
   200  
   201  		t.Run("Initialized is used instead of Interrupted during startup", func(t *testing.T) {
   202  			dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   203  				errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
   204  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo)
   205  				status1 := p.dataSourceUpdates.currentStatus
   206  				assert.Equal(t, intf.DataSourceStateInitializing, status1.State)
   207  				assert.Equal(t, errorInfo, status1.LastError)
   208  
   209  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   210  				status2 := p.dataSourceUpdates.currentStatus
   211  				assert.Equal(t, intf.DataSourceStateValid, status2.State)
   212  
   213  				p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, intf.DataSourceErrorInfo{})
   214  				status3 := p.dataSourceUpdates.currentStatus
   215  				assert.Equal(t, intf.DataSourceStateInterrupted, status3.State)
   216  				assert.Equal(t, errorInfo, status3.LastError)
   217  			})
   218  		})
   219  
   220  		t.Run("can log outage at Error level after timeout", TestDataSourceOutageLoggingTimeout)
   221  	})
   222  
   223  	t.Run("GetDataStoreStatusProvider", func(t *testing.T) {
   224  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   225  			assert.Equal(t, p.dataStoreStatusProvider, p.dataSourceUpdates.GetDataStoreStatusProvider())
   226  		})
   227  	})
   228  }
   229  
   230  func testDataSourceUpdatesImplSortsInitData(t *testing.T) {
   231  	// The logic for this is already tested in data_model_dependencies_test, but here we are verifying
   232  	// that DataSourceUpdatesImpl is actually using that logic.
   233  	dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   234  		inputData := makeDependencyOrderingDataSourceTestData()
   235  
   236  		result := p.dataSourceUpdates.Init(inputData)
   237  		require.True(t, result)
   238  
   239  		receivedData := p.store.WaitForNextInit(t, time.Second)
   240  
   241  		verifySortedData(t, receivedData, inputData)
   242  	})
   243  }
   244  
   245  func TestDataSourceUpdatesImplFlagChangeEvents(t *testing.T) {
   246  	t.Run("sends events on init for newly added flags", func(t *testing.T) {
   247  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   248  			builder := sharedtest.NewDataSetBuilder().
   249  				Flags(ldbuilders.NewFlagBuilder("flag1").Version(1).Build()).
   250  				Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
   251  
   252  			p.dataSourceUpdates.Init(builder.Build())
   253  
   254  			ch := p.flagChangeBroadcaster.AddListener()
   255  
   256  			builder.Flags(ldbuilders.NewFlagBuilder("flag2").Version(1).Build()).
   257  				Segments(ldbuilders.NewSegmentBuilder("segment2").Version(1).Build())
   258  			// the new segment triggers no events since nothing is using it
   259  
   260  			p.dataSourceUpdates.Init(builder.Build())
   261  
   262  			sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
   263  		})
   264  	})
   265  
   266  	t.Run("sends event on update for newly added flag", func(t *testing.T) {
   267  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   268  			builder := sharedtest.NewDataSetBuilder().
   269  				Flags(ldbuilders.NewFlagBuilder("flag1").Version(1).Build()).
   270  				Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
   271  
   272  			p.dataSourceUpdates.Init(builder.Build())
   273  
   274  			ch := p.flagChangeBroadcaster.AddListener()
   275  
   276  			flag2 := ldbuilders.NewFlagBuilder("flag2").Version(1).Build()
   277  			p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
   278  
   279  			sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
   280  		})
   281  	})
   282  
   283  	t.Run("sends events on init for updated flags", func(t *testing.T) {
   284  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   285  			builder := sharedtest.NewDataSetBuilder().
   286  				Flags(
   287  					ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
   288  					ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
   289  				).
   290  				Segments(
   291  					ldbuilders.NewSegmentBuilder("segment1").Version(1).Build(),
   292  					ldbuilders.NewSegmentBuilder("segment2").Version(1).Build(),
   293  				)
   294  
   295  			p.dataSourceUpdates.Init(builder.Build())
   296  
   297  			ch := p.flagChangeBroadcaster.AddListener()
   298  
   299  			builder.Flags(
   300  				ldbuilders.NewFlagBuilder("flag2").Version(2).Build(), // modified flag
   301  			).Segments(
   302  				ldbuilders.NewSegmentBuilder("segment2").Version(2).Build(), // modified segment, but no one is using it
   303  			)
   304  
   305  			p.dataSourceUpdates.Init(builder.Build())
   306  
   307  			sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
   308  		})
   309  	})
   310  
   311  	t.Run("sends event on update for updated flag", func(t *testing.T) {
   312  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   313  			builder := sharedtest.NewDataSetBuilder().
   314  				Flags(
   315  					ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
   316  					ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
   317  				).
   318  				Segments(ldbuilders.NewSegmentBuilder("segment1").Version(1).Build())
   319  
   320  			p.dataSourceUpdates.Init(builder.Build())
   321  
   322  			ch := p.flagChangeBroadcaster.AddListener()
   323  
   324  			flag2 := ldbuilders.NewFlagBuilder("flag2").Version(2).Build()
   325  			p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
   326  
   327  			sharedtest.ExpectFlagChangeEvents(t, ch, "flag2")
   328  		})
   329  	})
   330  
   331  	t.Run("does not send event on update if item was not really updated", func(t *testing.T) {
   332  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   333  			builder := sharedtest.NewDataSetBuilder().
   334  				Flags(
   335  					ldbuilders.NewFlagBuilder("flag1").Version(1).Build(),
   336  					ldbuilders.NewFlagBuilder("flag2").Version(1).Build(),
   337  				)
   338  
   339  			p.dataSourceUpdates.Init(builder.Build())
   340  
   341  			ch := p.flagChangeBroadcaster.AddListener()
   342  
   343  			flag2 := ldbuilders.NewFlagBuilder("flag2").Version(1).Build()
   344  			p.dataSourceUpdates.Upsert(datakinds.Features, flag2.Key, st.ItemDescriptor{Version: flag2.Version, Item: &flag2})
   345  
   346  			th.AssertNoMoreValues(t, ch, time.Millisecond*100)
   347  		})
   348  	})
   349  }
   350  
   351  func TestDataSourceOutageLoggingTimeout(t *testing.T) {
   352  	t.Run("does not log error if data source recovers before timeout", func(t *testing.T) {
   353  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   354  			errorInfo := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown}
   355  			p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo)
   356  			p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   357  
   358  			<-time.After(testDataSourceOutageTimeout)
   359  
   360  			assert.Len(t, p.mockLoggers.GetOutput(ldlog.Error), 0)
   361  		})
   362  	})
   363  
   364  	t.Run("logs error if data source does not recover before timeout", func(t *testing.T) {
   365  		dataSourceUpdateSinkImplTest(func(p dataSourceUpdateSinkImplTestParams) {
   366  			// simulate a series of consecutive errors
   367  			errorInfo1 := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown, Time: time.Now()}
   368  			p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo1)
   369  			errorInfo2 := intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindErrorResponse, StatusCode: 500, Time: time.Now()}
   370  			p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateInterrupted, errorInfo2)
   371  
   372  			<-time.After(testDataSourceOutageTimeout + (100 * time.Millisecond))
   373  
   374  			p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateValid, intf.DataSourceErrorInfo{})
   375  
   376  			<-time.After(testDataSourceOutageTimeout)
   377  
   378  			require.Len(t, p.mockLoggers.GetOutput(ldlog.Error), 1)
   379  			message := p.mockLoggers.GetOutput(ldlog.Error)[0]
   380  			assert.True(t, strings.HasPrefix(
   381  				message,
   382  				fmt.Sprintf(
   383  					"LaunchDarkly data source outage - updates have been unavailable for at least %s with the following errors:",
   384  					testDataSourceOutageTimeout,
   385  				)))
   386  			assert.Contains(t, message, "UNKNOWN (1 time)")
   387  			assert.Contains(t, message, "ERROR_RESPONSE(500) (1 time)")
   388  		})
   389  	})
   390  }
   391  

View as plain text