...
1 package datasource
2
3 import (
4 "sync"
5 "time"
6
7 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
8 "github.com/launchdarkly/go-server-sdk/v6/interfaces"
9 "github.com/launchdarkly/go-server-sdk/v6/internal"
10 "github.com/launchdarkly/go-server-sdk/v6/subsystems"
11 )
12
13 const (
14 pollingErrorContext = "on polling request"
15 pollingWillRetryMessage = "will retry at next scheduled poll interval"
16 )
17
18
19
20
21
22
23 type PollingProcessor struct {
24 dataSourceUpdates subsystems.DataSourceUpdateSink
25 requestor requestor
26 pollInterval time.Duration
27 loggers ldlog.Loggers
28 setInitializedOnce sync.Once
29 isInitialized internal.AtomicBoolean
30 quit chan struct{}
31 closeOnce sync.Once
32 }
33
34
35 func NewPollingProcessor(
36 context subsystems.ClientContext,
37 dataSourceUpdates subsystems.DataSourceUpdateSink,
38 baseURI string,
39 pollInterval time.Duration,
40 ) *PollingProcessor {
41 requestor := newRequestorImpl(context, context.GetHTTP().CreateHTTPClient(), baseURI)
42 return newPollingProcessor(context, dataSourceUpdates, requestor, pollInterval)
43 }
44
45 func newPollingProcessor(
46 context subsystems.ClientContext,
47 dataSourceUpdates subsystems.DataSourceUpdateSink,
48 requestor requestor,
49 pollInterval time.Duration,
50 ) *PollingProcessor {
51 pp := &PollingProcessor{
52 dataSourceUpdates: dataSourceUpdates,
53 requestor: requestor,
54 pollInterval: pollInterval,
55 loggers: context.GetLogging().Loggers,
56 quit: make(chan struct{}),
57 }
58
59 return pp
60 }
61
62
63 func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
64 pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)
65
66 ticker := newTickerWithInitialTick(pp.pollInterval)
67
68 go func() {
69 defer ticker.Stop()
70
71 var readyOnce sync.Once
72 notifyReady := func() {
73 readyOnce.Do(func() {
74 close(closeWhenReady)
75 })
76 }
77
78 defer notifyReady()
79
80 for {
81 select {
82 case <-pp.quit:
83 return
84 case <-ticker.C:
85 if err := pp.poll(); err != nil {
86 if hse, ok := err.(httpStatusError); ok {
87 errorInfo := interfaces.DataSourceErrorInfo{
88 Kind: interfaces.DataSourceErrorKindErrorResponse,
89 StatusCode: hse.Code,
90 Time: time.Now(),
91 }
92 recoverable := checkIfErrorIsRecoverableAndLog(
93 pp.loggers,
94 httpErrorDescription(hse.Code),
95 pollingErrorContext,
96 hse.Code,
97 pollingWillRetryMessage,
98 )
99 if recoverable {
100 pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
101 } else {
102 pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
103 notifyReady()
104 return
105 }
106 } else {
107 errorInfo := interfaces.DataSourceErrorInfo{
108 Kind: interfaces.DataSourceErrorKindNetworkError,
109 Message: err.Error(),
110 Time: time.Now(),
111 }
112 if _, ok := err.(malformedJSONError); ok {
113 errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
114 }
115 checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
116 pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
117 }
118 continue
119 }
120 pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
121 pp.setInitializedOnce.Do(func() {
122 pp.isInitialized.Set(true)
123 pp.loggers.Info("First polling request successful")
124 notifyReady()
125 })
126 }
127 }
128 }()
129 }
130
131 func (pp *PollingProcessor) poll() error {
132 allData, cached, err := pp.requestor.requestAll()
133
134 if err != nil {
135 return err
136 }
137
138
139 if !cached {
140 pp.dataSourceUpdates.Init(allData)
141 }
142 return nil
143 }
144
145
146 func (pp *PollingProcessor) Close() error {
147 pp.closeOnce.Do(func() {
148 close(pp.quit)
149 })
150 return nil
151 }
152
153
154 func (pp *PollingProcessor) IsInitialized() bool {
155 return pp.isInitialized.Get()
156 }
157
158
159 func (pp *PollingProcessor) GetBaseURI() string {
160 return (pp.requestor.(*requestorImpl)).baseURI
161 }
162
163
164 func (pp *PollingProcessor) GetPollInterval() time.Duration {
165 return pp.pollInterval
166 }
167
168 type tickerWithInitialTick struct {
169 *time.Ticker
170 C <-chan time.Time
171 }
172
173 func newTickerWithInitialTick(interval time.Duration) *tickerWithInitialTick {
174 c := make(chan time.Time)
175 ticker := time.NewTicker(interval)
176 t := &tickerWithInitialTick{
177 C: c,
178 Ticker: ticker,
179 }
180 go func() {
181 c <- time.Now()
182 for tt := range ticker.C {
183 c <- tt
184 }
185 }()
186 return t
187 }
188
View as plain text