...
1 package ldevents
2
3 import (
4 "encoding/json"
5 "sync"
6 "time"
7
8 "github.com/launchdarkly/go-jsonstream/v3/jwriter"
9 "github.com/launchdarkly/go-sdk-common/v3/ldlog"
10 "github.com/launchdarkly/go-sdk-common/v3/ldtime"
11 "github.com/launchdarkly/go-sdk-common/v3/ldvalue"
12 )
13
14
15
16 type anyEventInput interface{}
17 type anyEventOutput interface{}
18
19 type defaultEventProcessor struct {
20 inboxCh chan eventDispatcherMessage
21 inboxFullOnce sync.Once
22 closeOnce sync.Once
23 loggers ldlog.Loggers
24 }
25
26 type eventDispatcher struct {
27 config EventsConfiguration
28 outbox *eventsOutbox
29 flushCh chan *flushPayload
30 workersGroup *sync.WaitGroup
31 userKeys lruCache
32 lastKnownPastTime ldtime.UnixMillisecondTime
33 deduplicatedContexts int
34 eventsInLastBatch int
35 disabled bool
36 currentTimestampFn func() ldtime.UnixMillisecondTime
37 stateLock sync.Mutex
38 }
39
40 type flushPayload struct {
41 diagnosticEvent ldvalue.Value
42 events []anyEventOutput
43 summary eventSummary
44 }
45
46
47 type eventDispatcherMessage interface{}
48
49 type sendEventMessage struct {
50 event anyEventInput
51 }
52
53 type flushEventsMessage struct {
54 replyCh chan struct{}
55 }
56
57 type shutdownEventsMessage struct {
58 replyCh chan struct{}
59 }
60
61 type syncEventsMessage struct {
62 replyCh chan struct{}
63 }
64
65 const (
66 maxFlushWorkers = 5
67 )
68
69
70 func NewDefaultEventProcessor(config EventsConfiguration) EventProcessor {
71 inboxCh := make(chan eventDispatcherMessage, config.Capacity)
72 startEventDispatcher(config, inboxCh)
73 return &defaultEventProcessor{
74 inboxCh: inboxCh,
75 loggers: config.Loggers,
76 }
77 }
78
79 func (ep *defaultEventProcessor) RecordEvaluation(ed EvaluationData) {
80 ep.postNonBlockingMessageToInbox(sendEventMessage{event: ed})
81 }
82
83 func (ep *defaultEventProcessor) RecordIdentifyEvent(e IdentifyEventData) {
84 ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
85 }
86
87 func (ep *defaultEventProcessor) RecordCustomEvent(e CustomEventData) {
88 ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
89 }
90
91 func (ep *defaultEventProcessor) RecordRawEvent(data json.RawMessage) {
92 ep.postNonBlockingMessageToInbox(sendEventMessage{event: rawEvent{data: data}})
93 }
94
95 func (ep *defaultEventProcessor) Flush() {
96 ep.postNonBlockingMessageToInbox(flushEventsMessage{})
97 }
98
99 func (ep *defaultEventProcessor) FlushBlocking(timeout time.Duration) bool {
100 replyCh := make(chan struct{}, 1)
101 m := flushEventsMessage{replyCh: replyCh}
102 ep.inboxCh <- m
103 var deadline <-chan time.Time
104 if timeout > 0 {
105 timer := time.NewTimer(timeout)
106 defer timer.Stop()
107 deadline = timer.C
108 }
109 select {
110 case <-m.replyCh:
111 return true
112 case <-deadline:
113 return false
114 }
115 }
116
117 func (ep *defaultEventProcessor) postNonBlockingMessageToInbox(e eventDispatcherMessage) {
118 select {
119 case ep.inboxCh <- e:
120 return
121 default:
122 }
123
124
125
126
127 ep.inboxFullOnce.Do(func() {
128 ep.loggers.Warn("Events are being produced faster than they can be processed; some events will be dropped")
129 })
130 }
131
132 func (ep *defaultEventProcessor) Close() error {
133 ep.closeOnce.Do(func() {
134
135
136
137 ep.inboxCh <- flushEventsMessage{}
138 m := shutdownEventsMessage{replyCh: make(chan struct{})}
139 ep.inboxCh <- m
140 <-m.replyCh
141 })
142 return nil
143 }
144
145 func startEventDispatcher(
146 config EventsConfiguration,
147 inboxCh <-chan eventDispatcherMessage,
148 ) {
149 ed := &eventDispatcher{
150 config: config,
151 outbox: newEventsOutbox(config.Capacity, config.Loggers),
152 flushCh: make(chan *flushPayload, 1),
153 workersGroup: &sync.WaitGroup{},
154 userKeys: newLruCache(config.UserKeysCapacity),
155 currentTimestampFn: config.currentTimeProvider,
156 }
157
158 if ed.currentTimestampFn == nil {
159 ed.currentTimestampFn = ldtime.UnixMillisNow
160 }
161
162 formatter := &eventOutputFormatter{
163 contextFormatter: newEventContextFormatter(config),
164 config: config,
165 }
166
167
168
169 for i := 0; i < maxFlushWorkers; i++ {
170 go runFlushTask(config, formatter, ed.flushCh, ed.workersGroup, ed.handleResult)
171 }
172 if config.DiagnosticsManager != nil {
173 event := config.DiagnosticsManager.CreateInitEvent()
174 ed.sendDiagnosticsEvent(event)
175 }
176 go ed.runMainLoop(inboxCh)
177 }
178
179 func (ed *eventDispatcher) runMainLoop(
180 inboxCh <-chan eventDispatcherMessage,
181 ) {
182 if err := recover(); err != nil {
183 ed.config.Loggers.Errorf("Unexpected panic in event processing thread: %+v", err)
184 }
185
186 flushInterval := ed.config.FlushInterval
187 if flushInterval <= 0 {
188 flushInterval = DefaultFlushInterval
189 }
190 userKeysFlushInterval := ed.config.UserKeysFlushInterval
191 if userKeysFlushInterval <= 0 {
192 userKeysFlushInterval = DefaultUserKeysFlushInterval
193 }
194 flushTicker := time.NewTicker(flushInterval)
195 usersResetTicker := time.NewTicker(userKeysFlushInterval)
196
197 var diagnosticsTicker *time.Ticker
198 var diagnosticsTickerCh <-chan time.Time
199 diagnosticsManager := ed.config.DiagnosticsManager
200 if diagnosticsManager != nil {
201 interval := ed.config.DiagnosticRecordingInterval
202 if interval > 0 {
203 if interval < MinimumDiagnosticRecordingInterval {
204 interval = DefaultDiagnosticRecordingInterval
205 }
206 } else {
207 if ed.config.forceDiagnosticRecordingInterval > 0 {
208 interval = ed.config.forceDiagnosticRecordingInterval
209 } else {
210 interval = DefaultDiagnosticRecordingInterval
211 }
212 }
213 diagnosticsTicker = time.NewTicker(interval)
214 diagnosticsTickerCh = diagnosticsTicker.C
215 }
216
217 for {
218
219
220 select {
221 case message := <-inboxCh:
222 switch m := message.(type) {
223 case sendEventMessage:
224 ed.processEvent(m.event)
225 case flushEventsMessage:
226 ed.triggerFlush()
227 if m.replyCh != nil {
228 ed.workersGroup.Wait()
229 m.replyCh <- struct{}{}
230 }
231 case syncEventsMessage:
232 ed.workersGroup.Wait()
233 m.replyCh <- struct{}{}
234 case shutdownEventsMessage:
235 flushTicker.Stop()
236 usersResetTicker.Stop()
237 if diagnosticsTicker != nil {
238 diagnosticsTicker.Stop()
239 }
240 ed.workersGroup.Wait()
241 close(ed.flushCh)
242 m.replyCh <- struct{}{}
243 return
244 }
245 case <-flushTicker.C:
246 ed.triggerFlush()
247 case <-usersResetTicker.C:
248 ed.userKeys.clear()
249 case <-diagnosticsTickerCh:
250 if diagnosticsManager == nil || !diagnosticsManager.CanSendStatsEvent() {
251
252 break
253 }
254 event := diagnosticsManager.CreateStatsEventAndReset(
255 ed.outbox.droppedEvents,
256 ed.deduplicatedContexts,
257 ed.eventsInLastBatch,
258 )
259 ed.outbox.droppedEvents = 0
260 ed.deduplicatedContexts = 0
261 ed.eventsInLastBatch = 0
262 ed.sendDiagnosticsEvent(event)
263 }
264 }
265 }
266
267 func (ed *eventDispatcher) processEvent(evt anyEventInput) {
268
269
270 willAddFullEvent := true
271 var debugEvent anyEventInput
272 inlinedUser := false
273 var eventContext EventInputContext
274 var creationDate ldtime.UnixMillisecondTime
275 switch evt := evt.(type) {
276 case EvaluationData:
277 eventContext = evt.Context
278 creationDate = evt.CreationDate
279 ed.outbox.addToSummary(evt)
280 willAddFullEvent = evt.RequireFullEvent
281 if ed.shouldDebugEvent(&evt) {
282 de := evt
283 de.debug = true
284 debugEvent = de
285 }
286 case IdentifyEventData:
287 eventContext = evt.Context
288 creationDate = evt.CreationDate
289 inlinedUser = true
290 case CustomEventData:
291 eventContext = evt.Context
292 creationDate = evt.CreationDate
293 default:
294 ed.outbox.addEvent(evt)
295 return
296 }
297
298
299 alreadySeenUser := ed.userKeys.add(eventContext.context.FullyQualifiedKey())
300 if !(willAddFullEvent && inlinedUser) {
301 if alreadySeenUser {
302 ed.deduplicatedContexts++
303 } else {
304 indexEvent := indexEvent{
305 BaseEvent{CreationDate: creationDate, Context: eventContext},
306 }
307 ed.outbox.addEvent(indexEvent)
308 }
309 }
310 if willAddFullEvent {
311 ed.outbox.addEvent(evt)
312 }
313 if debugEvent != nil {
314 ed.outbox.addEvent(debugEvent)
315 }
316 }
317
318 func (ed *eventDispatcher) shouldDebugEvent(evt *EvaluationData) bool {
319 if evt.DebugEventsUntilDate == 0 {
320 return false
321 }
322
323
324
325
326 ed.stateLock.Lock()
327 defer ed.stateLock.Unlock()
328 return evt.DebugEventsUntilDate > ed.lastKnownPastTime &&
329 evt.DebugEventsUntilDate > ed.currentTimestampFn()
330 }
331
332
333 func (ed *eventDispatcher) triggerFlush() {
334 if ed.isDisabled() {
335 ed.outbox.clear()
336 return
337 }
338
339 payload := ed.outbox.getPayload()
340 totalEventCount := len(payload.events)
341 if payload.summary.hasCounters() {
342 totalEventCount++
343 }
344 if totalEventCount == 0 {
345 ed.eventsInLastBatch = 0
346 return
347 }
348 ed.workersGroup.Add(1)
349 select {
350 case ed.flushCh <- &payload:
351
352
353
354 ed.eventsInLastBatch = totalEventCount
355 ed.outbox.clear()
356 default:
357
358
359 ed.workersGroup.Done()
360 }
361 }
362
363 func (ed *eventDispatcher) isDisabled() bool {
364
365 ed.stateLock.Lock()
366 defer ed.stateLock.Unlock()
367 return ed.disabled
368 }
369
370 func (ed *eventDispatcher) handleResult(result EventSenderResult) {
371 if result.MustShutDown {
372 ed.stateLock.Lock()
373 defer ed.stateLock.Unlock()
374 ed.disabled = true
375 } else if result.TimeFromServer > 0 {
376 ed.stateLock.Lock()
377 defer ed.stateLock.Unlock()
378 ed.lastKnownPastTime = result.TimeFromServer
379 }
380 }
381
382 func (ed *eventDispatcher) sendDiagnosticsEvent(
383 event ldvalue.Value,
384 ) {
385 payload := flushPayload{diagnosticEvent: event}
386 ed.workersGroup.Add(1)
387 select {
388 case ed.flushCh <- &payload:
389
390
391 default:
392
393
394
395
396 ed.workersGroup.Done()
397 }
398 }
399
400 func runFlushTask(config EventsConfiguration, formatter *eventOutputFormatter, flushCh <-chan *flushPayload,
401 workersGroup *sync.WaitGroup, resultFn func(EventSenderResult)) {
402 for {
403 payload, more := <-flushCh
404 if !more {
405
406 break
407 }
408 if !payload.diagnosticEvent.IsNull() {
409 w := jwriter.NewWriter()
410 payload.diagnosticEvent.WriteToJSONWriter(&w)
411 bytes := w.Bytes()
412 _ = config.EventSender.SendEventData(DiagnosticEventDataKind, bytes, 1)
413 } else {
414 bytes, count := formatter.makeOutputEvents(payload.events, payload.summary)
415 if len(bytes) > 0 {
416 result := config.EventSender.SendEventData(AnalyticsEventDataKind, bytes, count)
417 resultFn(result)
418 }
419 }
420 workersGroup.Done()
421 }
422 }
423
View as plain text