1 package ldevents
2
3 import (
4 "encoding/json"
5 "testing"
6 "time"
7
8 "github.com/launchdarkly/go-sdk-common/v3/ldcontext"
9 "github.com/launchdarkly/go-sdk-common/v3/ldreason"
10 "github.com/launchdarkly/go-sdk-common/v3/ldtime"
11 "github.com/launchdarkly/go-sdk-common/v3/lduser"
12 "github.com/launchdarkly/go-sdk-common/v3/ldvalue"
13
14 "github.com/launchdarkly/go-test-helpers/v3/jsonhelpers"
15 m "github.com/launchdarkly/go-test-helpers/v3/matchers"
16
17 "github.com/stretchr/testify/assert"
18 "github.com/stretchr/testify/require"
19 )
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 func withAndWithoutPrivateAttrs(t *testing.T, action func(*testing.T, EventsConfiguration)) {
45 t.Run("without private attributes", func(t *testing.T) {
46 action(t, basicConfigWithoutPrivateAttrs())
47 })
48
49 t.Run("with private attributes", func(t *testing.T) {
50 config := basicConfigWithoutPrivateAttrs()
51 config.AllAttributesPrivate = true
52 action(t, config)
53 })
54 }
55
56 func withFeatureEventOrCustomEvent(
57 t *testing.T,
58 action func(
59 t *testing.T,
60 sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher),
61 finalEventMatchers []m.Matcher),
62 ) {
63 t.Run("from feature event", func(t *testing.T) {
64 flag := FlagEventProperties{Key: "flagkey", Version: 11}
65 action(t,
66 func(ep EventProcessor, context EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher) {
67 fe := defaultEventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
68 ep.RecordEvaluation(fe)
69 return fe, fe.CreationDate, nil
70 },
71 []m.Matcher{anySummaryEvent()})
72 })
73
74 t.Run("from custom event", func(t *testing.T) {
75 action(t,
76 func(ep EventProcessor, context EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher) {
77 ce := defaultEventFactory.NewCustomEventData("eventkey", context, ldvalue.Null(), false, 0)
78 ep.RecordCustomEvent(ce)
79 return ce, ce.CreationDate, []m.Matcher{anyCustomEvent()}
80 },
81 nil)
82 })
83 }
84
85 func TestIdentifyEventProperties(t *testing.T) {
86 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
87 ep, es := createEventProcessorAndSender(config)
88 defer ep.Close()
89
90 context := basicContext()
91 ie := defaultEventFactory.NewIdentifyEventData(context)
92 ep.RecordIdentifyEvent(ie)
93 ep.Flush()
94
95 assertEventsReceived(t, es, m.JSONEqual(map[string]interface{}{
96 "kind": "identify",
97 "creationDate": ie.CreationDate,
98 "context": contextJSON(context, config),
99 }))
100 es.assertNoMoreEvents(t)
101 })
102 }
103
104 func TestFeatureEventIsSummarizedAndNotTrackedByDefault(t *testing.T) {
105 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
106 ep, es := createEventProcessorAndSender(config)
107 defer ep.Close()
108
109 flag := FlagEventProperties{Key: "flagkey", Version: 11}
110 fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
111 ep.RecordEvaluation(fe)
112 ep.Flush()
113
114 assertEventsReceived(t, es,
115 anyIndexEvent(),
116 summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
117 )
118 es.assertNoMoreEvents(t)
119 })
120 }
121
122 func TestIndividualFeatureEventIsQueuedWhenTrackEventsIsTrue(t *testing.T) {
123 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
124 ep, es := createEventProcessorAndSender(config)
125 defer ep.Close()
126
127 flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true}
128 fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
129 ep.RecordEvaluation(fe)
130 ep.Flush()
131
132 assertEventsReceived(t, es,
133 anyIndexEvent(),
134 featureEventWithAllProperties(fe, flag),
135
136 summaryEventWithFlag(flag,
137 summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
138 )
139 es.assertNoMoreEvents(t)
140 })
141 }
142
143 func TestIndexEventProperties(t *testing.T) {
144 withFeatureEventOrCustomEvent(t,
145 func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
146 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
147 ep, es := createEventProcessorAndSender(config)
148 defer ep.Close()
149
150 context := basicContext()
151
152 _, creationDate, allEventMatchers := sendEventFn(ep, context)
153 ep.Flush()
154
155 allEventMatchers = append(allEventMatchers,
156 m.JSONEqual(map[string]interface{}{
157 "kind": "index",
158 "creationDate": creationDate,
159 "context": contextJSON(context, config),
160 }))
161 allEventMatchers = append(allEventMatchers, finalEventMatchers...)
162 assertEventsReceived(t, es, allEventMatchers...)
163 es.assertNoMoreEvents(t)
164 })
165 })
166 }
167
168 func TestIndexEventContextKeysAreDeduplicatedForSameKind(t *testing.T) {
169 withFeatureEventOrCustomEvent(t,
170 func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
171 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
172 ep, es := createEventProcessorAndSender(config)
173 defer ep.Close()
174
175 context := Context(ldcontext.New("my-key"))
176
177 _, creationDate, allEventMatchers := sendEventFn(ep, context)
178 _, _, moreMatchers := sendEventFn(ep, context)
179 allEventMatchers = append(allEventMatchers, moreMatchers...)
180 ep.Flush()
181
182 allEventMatchers = append(allEventMatchers,
183 m.JSONEqual(map[string]interface{}{
184 "kind": "index",
185 "creationDate": creationDate,
186 "context": contextJSON(context, config),
187 }))
188 allEventMatchers = append(allEventMatchers, finalEventMatchers...)
189 assertEventsReceived(t, es, allEventMatchers...)
190 es.assertNoMoreEvents(t)
191 })
192 })
193 }
194
195 func TestIndexEventContextKeysAreDeduplicatedSeparatelyForDifferentKinds(t *testing.T) {
196 withFeatureEventOrCustomEvent(t,
197 func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
198 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
199 ep, es := createEventProcessorAndSender(config)
200 defer ep.Close()
201
202 key := "my-key"
203 context1 := Context(ldcontext.New(key))
204 context2 := Context(ldcontext.NewWithKind("org", key))
205 context3 := Context(ldcontext.NewMulti(ldcontext.New(key), ldcontext.NewWithKind("other", key)))
206
207 _, creationDate1, allEventMatchers := sendEventFn(ep, context1)
208 _, creationDate2, moreMatchers := sendEventFn(ep, context2)
209 allEventMatchers = append(allEventMatchers, moreMatchers...)
210 _, creationDate3, moreMatchers := sendEventFn(ep, context3)
211 allEventMatchers = append(allEventMatchers, moreMatchers...)
212 ep.Flush()
213
214 allEventMatchers = append(allEventMatchers,
215 m.JSONEqual(map[string]interface{}{
216 "kind": "index",
217 "creationDate": creationDate1,
218 "context": contextJSON(context1, config),
219 }),
220 m.JSONEqual(map[string]interface{}{
221 "kind": "index",
222 "creationDate": creationDate2,
223 "context": contextJSON(context2, config),
224 }),
225 m.JSONEqual(map[string]interface{}{
226 "kind": "index",
227 "creationDate": creationDate3,
228 "context": contextJSON(context3, config),
229 }))
230 allEventMatchers = append(allEventMatchers, finalEventMatchers...)
231 assertEventsReceived(t, es, allEventMatchers...)
232 es.assertNoMoreEvents(t)
233 })
234 })
235 }
236
237 func TestDebugEventProperties(t *testing.T) {
238 withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
239 ep, es := createEventProcessorAndSender(config)
240 defer ep.Close()
241
242 context := basicContext()
243 flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: ldtime.UnixMillisNow() + 1000000}
244 fe := defaultEventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
245 ep.RecordEvaluation(fe)
246 ep.Flush()
247
248 assertEventsReceived(t, es,
249 anyIndexEvent(),
250 debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
251 anySummaryEvent(),
252 )
253 es.assertNoMoreEvents(t)
254 })
255 }
256
257 func TestFeatureEventCanContainReason(t *testing.T) {
258 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
259 defer ep.Close()
260
261 flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true}
262 fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
263 fe.Reason = ldreason.NewEvalReasonFallthrough()
264 ep.RecordEvaluation(fe)
265 ep.Flush()
266
267 assertEventsReceived(t, es,
268 anyIndexEvent(),
269 featureEventWithAllProperties(fe, flag),
270 anySummaryEvent(),
271 )
272 es.assertNoMoreEvents(t)
273 }
274
275 func TestDebugEventIsAddedIfFlagIsTemporarilyInDebugMode(t *testing.T) {
276 fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
277 config := basicConfigWithoutPrivateAttrs()
278 config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
279 eventFactory := NewEventFactory(false, config.currentTimeProvider)
280
281 ep, es := createEventProcessorAndSender(config)
282 defer ep.Close()
283
284 context := basicContext()
285 futureTime := fakeTimeNow + 100
286 flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: futureTime}
287 fe := eventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
288 ep.RecordEvaluation(fe)
289 ep.Flush()
290
291 assertEventsReceived(t, es,
292 anyIndexEvent(),
293 debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
294 summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
295 )
296 es.assertNoMoreEvents(t)
297 }
298
299 func TestEventCanBeBothTrackedAndDebugged(t *testing.T) {
300 fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
301 config := basicConfigWithoutPrivateAttrs()
302 config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
303 eventFactory := NewEventFactory(false, config.currentTimeProvider)
304
305 ep, es := createEventProcessorAndSender(config)
306 defer ep.Close()
307
308 context := basicContext()
309 futureTime := fakeTimeNow + 100
310 flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true, DebugEventsUntilDate: futureTime}
311 fe := eventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
312 ep.RecordEvaluation(fe)
313 ep.Flush()
314
315 assertEventsReceived(t, es,
316 anyIndexEvent(),
317 featureEventWithAllProperties(fe, flag),
318 debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
319 summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
320 )
321 es.assertNoMoreEvents(t)
322 }
323
324 func TestDebugModeExpiresBasedOnClientTimeIfClientTimeIsLater(t *testing.T) {
325 fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
326 config := basicConfigWithoutPrivateAttrs()
327 config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
328 eventFactory := NewEventFactory(false, config.currentTimeProvider)
329
330 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
331 defer ep.Close()
332
333
334 serverTime := fakeTimeNow - 20000
335 es.result = EventSenderResult{Success: true, TimeFromServer: serverTime}
336
337
338 ie := eventFactory.NewIdentifyEventData(basicContext())
339 ep.RecordIdentifyEvent(ie)
340 ep.Flush()
341 assertEventsReceived(t, es, anyIdentifyEvent())
342
343
344
345 debugUntil := serverTime + 1000
346 flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: debugUntil}
347 fe := eventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
348 ep.RecordEvaluation(fe)
349 ep.Flush()
350
351
352 assertEventsReceived(t, es, anySummaryEvent())
353 es.assertNoMoreEvents(t)
354 }
355
356 func TestDebugModeExpiresBasedOnServerTimeIfServerTimeIsLater(t *testing.T) {
357 fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
358 config := basicConfigWithoutPrivateAttrs()
359 config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
360 eventFactory := NewEventFactory(false, config.currentTimeProvider)
361
362 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
363 defer ep.Close()
364
365
366 serverTime := fakeTimeNow + 20000
367 es.result = EventSenderResult{Success: true, TimeFromServer: serverTime}
368
369
370 ie := eventFactory.NewIdentifyEventData(basicContext())
371 ep.RecordIdentifyEvent(ie)
372 ep.Flush()
373 assertEventsReceived(t, es, anyIdentifyEvent())
374
375
376
377 debugUntil := serverTime - 1000
378 flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: debugUntil}
379 fe := eventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
380 ep.RecordEvaluation(fe)
381 ep.Flush()
382
383
384 assertEventsReceived(t, es, anySummaryEvent())
385 es.assertNoMoreEvents(t)
386 }
387
388 func TestNonTrackedEventsAreSummarized(t *testing.T) {
389 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
390 defer ep.Close()
391
392 context := basicContext()
393 flag1 := FlagEventProperties{Key: "flagkey1", Version: 11}
394 flag2 := FlagEventProperties{Key: "flagkey2", Version: 22}
395 flag1Eval := ldreason.NewEvaluationDetail(ldvalue.String("value1"), 2, noReason)
396 flag2Eval := ldreason.NewEvaluationDetail(ldvalue.String("value2"), 3, noReason)
397 fe1 := defaultEventFactory.NewEvaluationData(flag1, context, flag1Eval, false, ldvalue.Null(), "")
398 fe2 := defaultEventFactory.NewEvaluationData(flag2, context, flag2Eval, false, ldvalue.Null(), "")
399 fe3 := defaultEventFactory.NewEvaluationData(flag2, context, flag2Eval, false, ldvalue.Null(), "")
400 ep.RecordEvaluation(fe1)
401 ep.RecordEvaluation(fe2)
402 ep.RecordEvaluation(fe3)
403 ep.Flush()
404
405 assertEventsReceived(t, es, anyIndexEvent())
406
407 assertEventsReceived(t, es, m.AllOf(
408 m.JSONProperty("startDate").Should(equalNumericTime(fe1.CreationDate)),
409 m.JSONProperty("endDate").Should(equalNumericTime(fe3.CreationDate)),
410 summaryEventWithFlag(flag1, summaryCounterPropsFromEval(flag1Eval, 1)),
411 summaryEventWithFlag(flag2, summaryCounterPropsFromEval(flag2Eval, 2)),
412 ))
413
414 es.assertNoMoreEvents(t)
415 }
416
417 func TestCustomEventProperties(t *testing.T) {
418 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
419 defer ep.Close()
420
421 context := basicContext()
422 data := ldvalue.ObjectBuild().SetString("thing", "stuff").Build()
423 ce := defaultEventFactory.NewCustomEventData("eventkey", context, data, false, 0)
424 ep.RecordCustomEvent(ce)
425 ep.Flush()
426
427 customEventMatcher := m.JSONEqual(map[string]interface{}{
428 "kind": "custom",
429 "creationDate": ce.CreationDate,
430 "key": ce.Key,
431 "data": data,
432 "contextKeys": expectedContextKeys(context.context),
433 })
434 assertEventsReceived(t, es,
435 anyIndexEvent(),
436 customEventMatcher,
437 )
438 es.assertNoMoreEvents(t)
439 }
440
441 func TestCustomEventCanHaveMetricValue(t *testing.T) {
442 config := basicConfigWithoutPrivateAttrs()
443 ep, es := createEventProcessorAndSender(config)
444 defer ep.Close()
445
446 context := basicContext()
447 data := ldvalue.ObjectBuild().SetString("thing", "stuff").Build()
448 metric := float64(2.5)
449 ce := defaultEventFactory.NewCustomEventData("eventkey", context, data, true, metric)
450 ep.RecordCustomEvent(ce)
451 ep.Flush()
452
453 customEventMatcher := m.JSONEqual(map[string]interface{}{
454 "kind": "custom",
455 "creationDate": ce.CreationDate,
456 "key": ce.Key,
457 "data": data,
458 "metricValue": metric,
459 "contextKeys": expectedContextKeys(context.context),
460 })
461 assertEventsReceived(t, es,
462 anyIndexEvent(),
463 customEventMatcher,
464 )
465 es.assertNoMoreEvents(t)
466 }
467
468 func TestRawEventIsQueued(t *testing.T) {
469 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
470 defer ep.Close()
471
472 rawData := json.RawMessage(`{"kind":"alias","arbitrary":["we","don't","care","what's","in","here"]}`)
473 ep.RecordRawEvent(rawData)
474 ep.Flush()
475 ep.waitUntilInactive()
476
477 assertEventsReceived(t, es, m.JSONEqual(rawData))
478 es.assertNoMoreEvents(t)
479 }
480
481 func TestPeriodicFlush(t *testing.T) {
482 config := basicConfigWithoutPrivateAttrs()
483 config.FlushInterval = 10 * time.Millisecond
484 ep, es := createEventProcessorAndSender(config)
485 defer ep.Close()
486
487 context := basicContext()
488 ie := defaultEventFactory.NewIdentifyEventData(context)
489 ep.RecordIdentifyEvent(ie)
490
491 assertEventsReceived(t, es, identifyEventForContextKey(context.context.Key()))
492 es.assertNoMoreEvents(t)
493 }
494
495 func TestBlockingFlush(t *testing.T) {
496 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
497 defer ep.Close()
498
499 senderGateCh := make(chan struct{}, 1)
500 senderWaitingCh := make(chan struct{}, 1)
501 es.setGate(senderGateCh, senderWaitingCh)
502
503 didFlush := make(chan struct{}, 1)
504 go func() {
505 <-senderWaitingCh
506 time.Sleep(time.Millisecond * 100)
507 didFlush <- struct{}{}
508 senderGateCh <- struct{}{}
509 }()
510
511 ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(basicContext()))
512 success := ep.FlushBlocking(time.Second)
513
514 assert.True(t, success)
515 assert.NotEqual(t, 0, len(didFlush))
516 }
517
518 func TestBlockingFlushTimeout(t *testing.T) {
519 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
520 defer ep.Close()
521
522 senderGateCh := make(chan struct{}, 1)
523 senderWaitingCh := make(chan struct{}, 1)
524 es.setGate(senderGateCh, senderWaitingCh)
525
526 didFlush := make(chan struct{}, 1)
527 go func() {
528 <-senderWaitingCh
529 time.Sleep(time.Millisecond * 500)
530 didFlush <- struct{}{}
531 senderGateCh <- struct{}{}
532 }()
533
534 ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(basicContext()))
535 success := ep.FlushBlocking(time.Millisecond * 50)
536
537 assert.False(t, success)
538 assert.Equal(t, 0, len(didFlush))
539 }
540
541 func TestClosingEventProcessorForcesSynchronousFlush(t *testing.T) {
542 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
543 defer ep.Close()
544
545 context := basicContext()
546 ie := defaultEventFactory.NewIdentifyEventData(context)
547 ep.RecordIdentifyEvent(ie)
548 ep.Close()
549
550 assertEventsReceived(t, es, identifyEventForContextKey(context.context.Key()))
551 es.assertNoMoreEvents(t)
552 }
553
554 func TestPeriodicUserKeysFlush(t *testing.T) {
555
556
557 config := basicConfigWithoutPrivateAttrs()
558 config.UserKeysFlushInterval = time.Millisecond * 100
559 ep, es := createEventProcessorAndSender(config)
560 defer ep.Close()
561
562 context := basicContext()
563 event1 := defaultEventFactory.NewCustomEventData("event1", context, ldvalue.Null(), false, 0)
564 event2 := defaultEventFactory.NewCustomEventData("event2", context, ldvalue.Null(), false, 0)
565 ep.RecordCustomEvent(event1)
566 ep.RecordCustomEvent(event2)
567 ep.Flush()
568
569
570
571 assertEventsReceived(t, es,
572 indexEventForContextKey(context.context.Key()),
573 customEventWithEventKey("event1"),
574 customEventWithEventKey("event2"),
575 )
576
577
578 <-time.After(200 * time.Millisecond)
579
580
581 event3 := defaultEventFactory.NewCustomEventData("event3", context, ldvalue.Null(), false, 0)
582 ep.RecordCustomEvent(event3)
583 ep.Flush()
584 assertEventsReceived(t, es,
585 indexEventForContextKey(context.context.Key()),
586 customEventWithEventKey("event3"),
587 )
588 es.assertNoMoreEvents(t)
589 }
590
591 func TestNothingIsSentIfThereAreNoEvents(t *testing.T) {
592 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
593 defer ep.Close()
594
595 ep.FlushBlocking(time.Second)
596
597 es.assertNoMoreEvents(t)
598 }
599
600 func TestEventProcessorStopsSendingEventsAfterUnrecoverableError(t *testing.T) {
601 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
602 defer ep.Close()
603
604 es.result = EventSenderResult{MustShutDown: true}
605
606 ie := defaultEventFactory.NewIdentifyEventData(basicContext())
607 ep.RecordIdentifyEvent(ie)
608 ep.Flush()
609 es.awaitEvent(t)
610
611 ep.RecordIdentifyEvent(ie)
612 ep.FlushBlocking(time.Second)
613
614 es.assertNoMoreEvents(t)
615 }
616
617 func TestDiagnosticInitEventIsSent(t *testing.T) {
618 id := NewDiagnosticID("sdkkey")
619 startTime := time.Now()
620 diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), startTime, nil)
621 config := basicConfigWithoutPrivateAttrs()
622 config.DiagnosticsManager = diagnosticsManager
623
624 ep, es := createEventProcessorAndSender(config)
625 defer ep.Close()
626
627 event := es.awaitDiagnosticEvent(t)
628 m.In(t).Assert(event, m.AllOf(
629 eventKindIs("diagnostic-init"),
630 m.JSONProperty("creationDate").Should(equalNumericTime(ldtime.UnixMillisFromTime(startTime))),
631 ))
632 es.assertNoMoreDiagnosticEvents(t)
633 }
634
635 func TestDiagnosticPeriodicEventsAreSent(t *testing.T) {
636 id := NewDiagnosticID("sdkkey")
637 startTime := time.Now()
638 diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), startTime, nil)
639 config := basicConfigWithoutPrivateAttrs()
640 config.DiagnosticsManager = diagnosticsManager
641 config.forceDiagnosticRecordingInterval = 100 * time.Millisecond
642
643 ep, es := createEventProcessorAndSender(config)
644 defer ep.Close()
645
646
647 initEvent := es.awaitDiagnosticEvent(t)
648 m.In(t).Assert(initEvent, eventKindIs("diagnostic-init"))
649 time0 := requireCreationDate(t, initEvent)
650
651 event1 := es.awaitDiagnosticEvent(t)
652 m.In(t).Assert(event1, eventKindIs("diagnostic"))
653 time1 := requireCreationDate(t, event1)
654 assert.True(t, time1-time0 >= 70, "event times should follow configured interval: %d, %d", time0, time1)
655
656 event2 := es.awaitDiagnosticEvent(t)
657 m.In(t).Assert(event2, eventKindIs("diagnostic"))
658 time2 := requireCreationDate(t, event2)
659 assert.True(t, time2-time1 >= 70, "event times should follow configured interval: %d, %d", time1, time2)
660 }
661
662 func TestDiagnosticPeriodicEventHasEventCounters(t *testing.T) {
663 id := NewDiagnosticID("sdkkey")
664 config := basicConfigWithoutPrivateAttrs()
665 config.Capacity = 3
666 config.forceDiagnosticRecordingInterval = 100 * time.Millisecond
667 periodicEventGate := make(chan struct{})
668
669 diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), periodicEventGate)
670 config.DiagnosticsManager = diagnosticsManager
671
672 ep, es := createEventProcessorAndSender(config)
673 defer ep.Close()
674
675 initEvent := es.awaitDiagnosticEvent(t)
676 m.In(t).Assert(initEvent, eventKindIs("diagnostic-init"))
677
678 context := Context(lduser.NewUser("userkey"))
679 ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
680 ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
681 ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
682 ep.Flush()
683
684 periodicEventGate <- struct{}{}
685
686 event1 := es.awaitDiagnosticEvent(t)
687 m.In(t).Assert(event1, m.AllOf(
688 eventKindIs("diagnostic"),
689 m.JSONProperty("eventsInLastBatch").Should(m.Equal(3)),
690 m.JSONProperty("droppedEvents").Should(m.Equal(1)),
691 m.JSONProperty("deduplicatedUsers").Should(m.Equal(2)),
692 ))
693
694 periodicEventGate <- struct{}{}
695
696 event2 := es.awaitDiagnosticEvent(t)
697 m.In(t).Assert(event2, m.AllOf(
698 eventKindIs("diagnostic"),
699 m.JSONProperty("eventsInLastBatch").Should(m.Equal(0)),
700 m.JSONProperty("droppedEvents").Should(m.Equal(0)),
701 m.JSONProperty("deduplicatedUsers").Should(m.Equal(0)),
702 ))
703 }
704
705 func TestEventsAreKeptInBufferIfAllFlushWorkersAreBusy(t *testing.T) {
706
707
708
709
710
711
712 user1 := Context(lduser.NewUser("user1"))
713 user2 := Context(lduser.NewUser("user2"))
714 user3 := Context(lduser.NewUser("user3"))
715
716 ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
717 defer ep.Close()
718
719 senderGateCh := make(chan struct{}, maxFlushWorkers)
720 senderWaitingCh := make(chan struct{}, maxFlushWorkers)
721 es.setGate(senderGateCh, senderWaitingCh)
722
723 arbitraryContext := Context(ldcontext.New("other"))
724 for i := 0; i < maxFlushWorkers; i++ {
725 ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(arbitraryContext))
726 ep.Flush()
727 _ = es.awaitEvent(t)
728 }
729
730
731
732 for i := 0; i < maxFlushWorkers; i++ {
733 <-senderWaitingCh
734 }
735 es.assertNoMoreEvents(t)
736 assert.Equal(t, maxFlushWorkers, es.getPayloadCount())
737
738
739
740
741 extraEvent1 := defaultEventFactory.NewIdentifyEventData(user1)
742 ep.RecordIdentifyEvent(extraEvent1)
743 ep.Flush()
744
745
746
747
748 extraEvent2 := defaultEventFactory.NewIdentifyEventData(user2)
749 ep.RecordIdentifyEvent(extraEvent2)
750 ep.Flush()
751 <-time.After(100 * time.Millisecond)
752 es.assertNoMoreEvents(t)
753
754
755 extraEvent3 := defaultEventFactory.NewIdentifyEventData(user3)
756 ep.RecordIdentifyEvent(extraEvent3)
757
758
759 for i := 0; i < maxFlushWorkers; i++ {
760 senderGateCh <- struct{}{}
761 }
762
763
764 senderGateCh <- struct{}{}
765 assertEventsReceived(t, es, identifyEventForContextKey(user1.context.Key()))
766
767
768 senderGateCh <- struct{}{}
769 ep.Flush()
770 assertEventsReceived(t, es,
771 identifyEventForContextKey(user2.context.Key()),
772 identifyEventForContextKey(user3.context.Key()),
773 )
774 assert.Equal(t, maxFlushWorkers+2, es.getPayloadCount())
775 }
776
777
778 func (ep *defaultEventProcessor) waitUntilInactive() {
779 m := syncEventsMessage{replyCh: make(chan struct{})}
780 ep.inboxCh <- m
781 <-m.replyCh
782 }
783
784 func createEventProcessorAndSender(config EventsConfiguration) (*defaultEventProcessor, *mockEventSender) {
785 sender := newMockEventSender()
786 config.EventSender = sender
787 ep := NewDefaultEventProcessor(config)
788 return ep.(*defaultEventProcessor), sender
789 }
790
791 func assertEventsReceived(t *testing.T, es *mockEventSender, matchers ...m.Matcher) {
792 t.Helper()
793 received := make([]json.RawMessage, 0, len(matchers))
794 for range matchers {
795 if event, ok := es.tryAwaitEvent(); ok {
796 received = append(received, event)
797 } else {
798 require.Fail(t, "timed out waiting for analytics event(s)", "wanted %d event(s); got: %s",
799 len(matchers), jsonhelpers.ToJSONString(received))
800 }
801 }
802
803 m.In(t).Assert(received, m.ItemsInAnyOrder(matchers...))
804 }
805
806 func requireCreationDate(t *testing.T, eventData json.RawMessage) ldtime.UnixMillisecondTime {
807 m.In(t).Require(eventData, m.JSONProperty("creationDate").Should(valueIsPositiveNonZeroInteger()))
808 return ldtime.UnixMillisecondTime(ldvalue.Parse(eventData).GetByKey("creationDate").Float64Value())
809 }
810
View as plain text