1 package datasource
2
3 import (
4 "testing"
5 "time"
6
7 "github.com/stretchr/testify/assert"
8 "github.com/stretchr/testify/require"
9
10 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
11 intf "github.com/launchdarkly/go-server-sdk/v6/interfaces"
12 "github.com/launchdarkly/go-server-sdk/v6/internal"
13 "github.com/launchdarkly/go-server-sdk/v6/internal/datastore"
14 "github.com/launchdarkly/go-server-sdk/v6/internal/sharedtest"
15 )
16
17 type dataSourceStatusProviderImplTestParams struct {
18 dataSourceStatusProvider interfaces.DataSourceStatusProvider
19 dataSourceUpdates *DataSourceUpdateSinkImpl
20 }
21
22 func dataSourceStatusProviderImplTest(action func(dataSourceStatusProviderImplTestParams)) {
23 p := dataSourceStatusProviderImplTestParams{}
24 statusBroadcaster := internal.NewBroadcaster[interfaces.DataSourceStatus]()
25 defer statusBroadcaster.Close()
26 flagBroadcaster := internal.NewBroadcaster[interfaces.FlagChangeEvent]()
27 defer flagBroadcaster.Close()
28 store := datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())
29 dataStoreStatusProvider := datastore.NewDataStoreStatusProviderImpl(store, nil)
30 p.dataSourceUpdates = NewDataSourceUpdateSinkImpl(store, dataStoreStatusProvider, statusBroadcaster, flagBroadcaster,
31 0, sharedtest.NewTestLoggers())
32 p.dataSourceStatusProvider = NewDataSourceStatusProviderImpl(statusBroadcaster, p.dataSourceUpdates)
33
34 action(p)
35 }
36
37 func makeDataSourceErrorInfo() intf.DataSourceErrorInfo {
38 return intf.DataSourceErrorInfo{Kind: intf.DataSourceErrorKindUnknown, Message: "sorry", Time: time.Now()}
39 }
40
41 func TestDataSourceStatusProviderImpl(t *testing.T) {
42 t.Run("GetStatus", func(t *testing.T) {
43 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
44 errorInfo := makeDataSourceErrorInfo()
45 p.dataSourceUpdates.UpdateStatus(intf.DataSourceStateOff, errorInfo)
46
47 status := p.dataSourceStatusProvider.GetStatus()
48 assert.Equal(t, intf.DataSourceStateOff, status.State)
49 assert.Equal(t, errorInfo, status.LastError)
50 })
51 })
52
53 t.Run("listeners", func(t *testing.T) {
54 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
55 ch1 := p.dataSourceStatusProvider.AddStatusListener()
56 ch2 := p.dataSourceStatusProvider.AddStatusListener()
57 ch3 := p.dataSourceStatusProvider.AddStatusListener()
58 p.dataSourceStatusProvider.RemoveStatusListener(ch2)
59
60 errorInfo := makeDataSourceErrorInfo()
61 p.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
62
63 require.Len(t, ch1, 1)
64 require.Len(t, ch2, 0)
65 require.Len(t, ch3, 1)
66 status1 := <-ch1
67 status3 := <-ch3
68 assert.Equal(t, intf.DataSourceStateOff, status1.State)
69 assert.Equal(t, errorInfo, status1.LastError)
70 assert.Equal(t, status1, status3)
71 })
72 })
73
74 t.Run("WaitFor", func(t *testing.T) {
75 t.Run("returns true immediately when status is already correct", func(t *testing.T) {
76 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
77 p.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
78
79 success := p.dataSourceStatusProvider.WaitFor(interfaces.DataSourceStateValid, 500*time.Millisecond)
80 assert.True(t, success)
81 })
82 })
83
84 t.Run("returns false immediately when status is already Off", func(t *testing.T) {
85 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
86 timeStart := time.Now()
87 p.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
88 success := p.dataSourceStatusProvider.WaitFor(interfaces.DataSourceStateValid, 500*time.Millisecond)
89 assert.False(t, success)
90 assert.True(t, time.Now().Sub(timeStart) < 500*time.Millisecond)
91 })
92 })
93
94 t.Run("succeeds after status change", func(t *testing.T) {
95 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
96 go func() {
97 <-time.After(100 * time.Millisecond)
98 p.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
99 }()
100 success := p.dataSourceStatusProvider.WaitFor(interfaces.DataSourceStateValid, 500*time.Millisecond)
101 assert.True(t, success)
102 })
103 })
104
105 t.Run("times out", func(t *testing.T) {
106 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
107 timeStart := time.Now()
108 success := p.dataSourceStatusProvider.WaitFor(interfaces.DataSourceStateValid, 300*time.Millisecond)
109 assert.False(t, success)
110 assert.True(t, time.Now().Sub(timeStart) >= 270*time.Millisecond)
111 })
112 })
113
114 t.Run("ends if shut down", func(t *testing.T) {
115 dataSourceStatusProviderImplTest(func(p dataSourceStatusProviderImplTestParams) {
116 go func() {
117 <-time.After(100 * time.Millisecond)
118 p.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
119 }()
120 timeStart := time.Now()
121 success := p.dataSourceStatusProvider.WaitFor(interfaces.DataSourceStateValid, 500*time.Millisecond)
122 assert.False(t, success)
123 assert.True(t, time.Now().Sub(timeStart) < 500*time.Millisecond)
124 })
125 })
126 })
127 }
128
View as plain text