...

Source file src/github.com/launchdarkly/go-sdk-events/v2/event_processor_test.go

Documentation: github.com/launchdarkly/go-sdk-events/v2

     1  package ldevents
     2  
     3  import (
     4  	"encoding/json"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldreason"
    10  	"github.com/launchdarkly/go-sdk-common/v3/ldtime"
    11  	"github.com/launchdarkly/go-sdk-common/v3/lduser"
    12  	"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
    13  
    14  	"github.com/launchdarkly/go-test-helpers/v3/jsonhelpers"
    15  	m "github.com/launchdarkly/go-test-helpers/v3/matchers"
    16  
    17  	"github.com/stretchr/testify/assert"
    18  	"github.com/stretchr/testify/require"
    19  )
    20  
    21  // Note about the structure of these tests:
    22  //
    23  // 1. It's desirable to keep each test as specific as possible, so that we're not making assertions
    24  // about many details that are extraneous to the main subject of that test, as long as those details
    25  // are more specifically covered by another test. So, for instance, tests that are about feature events
    26  // or custom events are expected to also generate an index event as a side effect, but we should just
    27  // assert that there is one, rather than checking every property of the index event - since we have
    28  // TestIndexEventProperties for that purpose. That way, if there is a bug causing an index event
    29  // property to be wrong, it will show up clearly in that test, rather than causing many failures
    30  // all over the place.
    31  //
    32  // 2. For any tests where the full context JSON will appear in an event, we should use the
    33  // withAndWithoutPrivateAttrs helper to run the test twice, first with a default configuration and
    34  // then with an "all attributes private" configuration. This just verifies that it really is using
    35  // the eventOutputFormatter and eventContextFormatter with the designated configuration when it
    36  // serializes a context. More specific details of private attribute behavior are covered in the tests for
    37  // eventOutputFormatter and eventContextFormatter.
    38  //
    39  // 3. It's preferable to use the matchers and combinators from the matchers package rather than
    40  // the assert and require packages whenever there is (a) an assertion involving JSON values or (b)
    41  // a set of related assertions like "property X equals ___, property Y equals ___" because they
    42  // provide better failure output.
    43  
    44  func withAndWithoutPrivateAttrs(t *testing.T, action func(*testing.T, EventsConfiguration)) {
    45  	t.Run("without private attributes", func(t *testing.T) {
    46  		action(t, basicConfigWithoutPrivateAttrs())
    47  	})
    48  
    49  	t.Run("with private attributes", func(t *testing.T) {
    50  		config := basicConfigWithoutPrivateAttrs()
    51  		config.AllAttributesPrivate = true
    52  		action(t, config)
    53  	})
    54  }
    55  
    56  func withFeatureEventOrCustomEvent(
    57  	t *testing.T,
    58  	action func(
    59  		t *testing.T,
    60  		sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher),
    61  		finalEventMatchers []m.Matcher),
    62  ) {
    63  	t.Run("from feature event", func(t *testing.T) {
    64  		flag := FlagEventProperties{Key: "flagkey", Version: 11}
    65  		action(t,
    66  			func(ep EventProcessor, context EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher) {
    67  				fe := defaultEventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
    68  				ep.RecordEvaluation(fe)
    69  				return fe, fe.CreationDate, nil
    70  			},
    71  			[]m.Matcher{anySummaryEvent()})
    72  	})
    73  
    74  	t.Run("from custom event", func(t *testing.T) {
    75  		action(t,
    76  			func(ep EventProcessor, context EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher) {
    77  				ce := defaultEventFactory.NewCustomEventData("eventkey", context, ldvalue.Null(), false, 0)
    78  				ep.RecordCustomEvent(ce)
    79  				return ce, ce.CreationDate, []m.Matcher{anyCustomEvent()}
    80  			},
    81  			nil)
    82  	})
    83  }
    84  
    85  func TestIdentifyEventProperties(t *testing.T) {
    86  	withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
    87  		ep, es := createEventProcessorAndSender(config)
    88  		defer ep.Close()
    89  
    90  		context := basicContext()
    91  		ie := defaultEventFactory.NewIdentifyEventData(context)
    92  		ep.RecordIdentifyEvent(ie)
    93  		ep.Flush()
    94  
    95  		assertEventsReceived(t, es, m.JSONEqual(map[string]interface{}{
    96  			"kind":         "identify",
    97  			"creationDate": ie.CreationDate,
    98  			"context":      contextJSON(context, config),
    99  		}))
   100  		es.assertNoMoreEvents(t)
   101  	})
   102  }
   103  
   104  func TestFeatureEventIsSummarizedAndNotTrackedByDefault(t *testing.T) {
   105  	withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   106  		ep, es := createEventProcessorAndSender(config)
   107  		defer ep.Close()
   108  
   109  		flag := FlagEventProperties{Key: "flagkey", Version: 11}
   110  		fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   111  		ep.RecordEvaluation(fe)
   112  		ep.Flush()
   113  
   114  		assertEventsReceived(t, es,
   115  			anyIndexEvent(),
   116  			summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
   117  		)
   118  		es.assertNoMoreEvents(t)
   119  	})
   120  }
   121  
   122  func TestIndividualFeatureEventIsQueuedWhenTrackEventsIsTrue(t *testing.T) {
   123  	withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   124  		ep, es := createEventProcessorAndSender(config)
   125  		defer ep.Close()
   126  
   127  		flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true}
   128  		fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   129  		ep.RecordEvaluation(fe)
   130  		ep.Flush()
   131  
   132  		assertEventsReceived(t, es,
   133  			anyIndexEvent(),
   134  			featureEventWithAllProperties(fe, flag),
   135  			// Here we also check that the summary count is still the same regardless of TrackEvents
   136  			summaryEventWithFlag(flag,
   137  				summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
   138  		)
   139  		es.assertNoMoreEvents(t)
   140  	})
   141  }
   142  
   143  func TestIndexEventProperties(t *testing.T) {
   144  	withFeatureEventOrCustomEvent(t,
   145  		func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
   146  			withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   147  				ep, es := createEventProcessorAndSender(config)
   148  				defer ep.Close()
   149  
   150  				context := basicContext()
   151  
   152  				_, creationDate, allEventMatchers := sendEventFn(ep, context)
   153  				ep.Flush()
   154  
   155  				allEventMatchers = append(allEventMatchers,
   156  					m.JSONEqual(map[string]interface{}{
   157  						"kind":         "index",
   158  						"creationDate": creationDate,
   159  						"context":      contextJSON(context, config),
   160  					}))
   161  				allEventMatchers = append(allEventMatchers, finalEventMatchers...)
   162  				assertEventsReceived(t, es, allEventMatchers...)
   163  				es.assertNoMoreEvents(t)
   164  			})
   165  		})
   166  }
   167  
   168  func TestIndexEventContextKeysAreDeduplicatedForSameKind(t *testing.T) {
   169  	withFeatureEventOrCustomEvent(t,
   170  		func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
   171  			withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   172  				ep, es := createEventProcessorAndSender(config)
   173  				defer ep.Close()
   174  
   175  				context := Context(ldcontext.New("my-key"))
   176  
   177  				_, creationDate, allEventMatchers := sendEventFn(ep, context)
   178  				_, _, moreMatchers := sendEventFn(ep, context)
   179  				allEventMatchers = append(allEventMatchers, moreMatchers...)
   180  				ep.Flush()
   181  
   182  				allEventMatchers = append(allEventMatchers,
   183  					m.JSONEqual(map[string]interface{}{
   184  						"kind":         "index",
   185  						"creationDate": creationDate,
   186  						"context":      contextJSON(context, config),
   187  					}))
   188  				allEventMatchers = append(allEventMatchers, finalEventMatchers...)
   189  				assertEventsReceived(t, es, allEventMatchers...)
   190  				es.assertNoMoreEvents(t)
   191  			})
   192  		})
   193  }
   194  
   195  func TestIndexEventContextKeysAreDeduplicatedSeparatelyForDifferentKinds(t *testing.T) {
   196  	withFeatureEventOrCustomEvent(t,
   197  		func(t *testing.T, sendEventFn func(EventProcessor, EventInputContext) (anyEventInput, ldtime.UnixMillisecondTime, []m.Matcher), finalEventMatchers []m.Matcher) {
   198  			withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   199  				ep, es := createEventProcessorAndSender(config)
   200  				defer ep.Close()
   201  
   202  				key := "my-key"
   203  				context1 := Context(ldcontext.New(key))
   204  				context2 := Context(ldcontext.NewWithKind("org", key))
   205  				context3 := Context(ldcontext.NewMulti(ldcontext.New(key), ldcontext.NewWithKind("other", key)))
   206  
   207  				_, creationDate1, allEventMatchers := sendEventFn(ep, context1)
   208  				_, creationDate2, moreMatchers := sendEventFn(ep, context2)
   209  				allEventMatchers = append(allEventMatchers, moreMatchers...)
   210  				_, creationDate3, moreMatchers := sendEventFn(ep, context3)
   211  				allEventMatchers = append(allEventMatchers, moreMatchers...)
   212  				ep.Flush()
   213  
   214  				allEventMatchers = append(allEventMatchers,
   215  					m.JSONEqual(map[string]interface{}{
   216  						"kind":         "index",
   217  						"creationDate": creationDate1,
   218  						"context":      contextJSON(context1, config),
   219  					}),
   220  					m.JSONEqual(map[string]interface{}{
   221  						"kind":         "index",
   222  						"creationDate": creationDate2,
   223  						"context":      contextJSON(context2, config),
   224  					}),
   225  					m.JSONEqual(map[string]interface{}{
   226  						"kind":         "index",
   227  						"creationDate": creationDate3,
   228  						"context":      contextJSON(context3, config),
   229  					}))
   230  				allEventMatchers = append(allEventMatchers, finalEventMatchers...)
   231  				assertEventsReceived(t, es, allEventMatchers...)
   232  				es.assertNoMoreEvents(t)
   233  			})
   234  		})
   235  }
   236  
   237  func TestDebugEventProperties(t *testing.T) {
   238  	withAndWithoutPrivateAttrs(t, func(t *testing.T, config EventsConfiguration) {
   239  		ep, es := createEventProcessorAndSender(config)
   240  		defer ep.Close()
   241  
   242  		context := basicContext()
   243  		flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: ldtime.UnixMillisNow() + 1000000}
   244  		fe := defaultEventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   245  		ep.RecordEvaluation(fe)
   246  		ep.Flush()
   247  
   248  		assertEventsReceived(t, es,
   249  			anyIndexEvent(),
   250  			debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
   251  			anySummaryEvent(),
   252  		)
   253  		es.assertNoMoreEvents(t)
   254  	})
   255  }
   256  
   257  func TestFeatureEventCanContainReason(t *testing.T) {
   258  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   259  	defer ep.Close()
   260  
   261  	flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true}
   262  	fe := defaultEventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   263  	fe.Reason = ldreason.NewEvalReasonFallthrough()
   264  	ep.RecordEvaluation(fe)
   265  	ep.Flush()
   266  
   267  	assertEventsReceived(t, es,
   268  		anyIndexEvent(),
   269  		featureEventWithAllProperties(fe, flag),
   270  		anySummaryEvent(),
   271  	)
   272  	es.assertNoMoreEvents(t)
   273  }
   274  
   275  func TestDebugEventIsAddedIfFlagIsTemporarilyInDebugMode(t *testing.T) {
   276  	fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
   277  	config := basicConfigWithoutPrivateAttrs()
   278  	config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
   279  	eventFactory := NewEventFactory(false, config.currentTimeProvider)
   280  
   281  	ep, es := createEventProcessorAndSender(config)
   282  	defer ep.Close()
   283  
   284  	context := basicContext()
   285  	futureTime := fakeTimeNow + 100
   286  	flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: futureTime}
   287  	fe := eventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   288  	ep.RecordEvaluation(fe)
   289  	ep.Flush()
   290  
   291  	assertEventsReceived(t, es,
   292  		anyIndexEvent(),
   293  		debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
   294  		summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
   295  	)
   296  	es.assertNoMoreEvents(t)
   297  }
   298  
   299  func TestEventCanBeBothTrackedAndDebugged(t *testing.T) {
   300  	fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
   301  	config := basicConfigWithoutPrivateAttrs()
   302  	config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
   303  	eventFactory := NewEventFactory(false, config.currentTimeProvider)
   304  
   305  	ep, es := createEventProcessorAndSender(config)
   306  	defer ep.Close()
   307  
   308  	context := basicContext()
   309  	futureTime := fakeTimeNow + 100
   310  	flag := FlagEventProperties{Key: "flagkey", Version: 11, RequireFullEvent: true, DebugEventsUntilDate: futureTime}
   311  	fe := eventFactory.NewEvaluationData(flag, context, testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   312  	ep.RecordEvaluation(fe)
   313  	ep.Flush()
   314  
   315  	assertEventsReceived(t, es,
   316  		anyIndexEvent(),
   317  		featureEventWithAllProperties(fe, flag),
   318  		debugEventWithAllProperties(fe, flag, contextJSON(context, config)),
   319  		summaryEventWithFlag(flag, summaryCounterPropsFromEval(testEvalDetailWithoutReason, 1)),
   320  	)
   321  	es.assertNoMoreEvents(t)
   322  }
   323  
   324  func TestDebugModeExpiresBasedOnClientTimeIfClientTimeIsLater(t *testing.T) {
   325  	fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
   326  	config := basicConfigWithoutPrivateAttrs()
   327  	config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
   328  	eventFactory := NewEventFactory(false, config.currentTimeProvider)
   329  
   330  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   331  	defer ep.Close()
   332  
   333  	// Pick a server time that is somewhat behind the client time
   334  	serverTime := fakeTimeNow - 20000
   335  	es.result = EventSenderResult{Success: true, TimeFromServer: serverTime}
   336  
   337  	// Send and flush an event we don't care about, just to set the last server time
   338  	ie := eventFactory.NewIdentifyEventData(basicContext())
   339  	ep.RecordIdentifyEvent(ie)
   340  	ep.Flush()
   341  	assertEventsReceived(t, es, anyIdentifyEvent())
   342  
   343  	// Now send an event with debug mode on, with a "debug until" time that is further in
   344  	// the future than the server time, but in the past compared to the client.
   345  	debugUntil := serverTime + 1000
   346  	flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: debugUntil}
   347  	fe := eventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   348  	ep.RecordEvaluation(fe)
   349  	ep.Flush()
   350  
   351  	// should get a summary event only, not a debug event
   352  	assertEventsReceived(t, es, anySummaryEvent())
   353  	es.assertNoMoreEvents(t)
   354  }
   355  
   356  func TestDebugModeExpiresBasedOnServerTimeIfServerTimeIsLater(t *testing.T) {
   357  	fakeTimeNow := ldtime.UnixMillisecondTime(1000000)
   358  	config := basicConfigWithoutPrivateAttrs()
   359  	config.currentTimeProvider = func() ldtime.UnixMillisecondTime { return fakeTimeNow }
   360  	eventFactory := NewEventFactory(false, config.currentTimeProvider)
   361  
   362  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   363  	defer ep.Close()
   364  
   365  	// Pick a server time that is somewhat ahead of the client time
   366  	serverTime := fakeTimeNow + 20000
   367  	es.result = EventSenderResult{Success: true, TimeFromServer: serverTime}
   368  
   369  	// Send and flush an event we don't care about, just to set the last server time
   370  	ie := eventFactory.NewIdentifyEventData(basicContext())
   371  	ep.RecordIdentifyEvent(ie)
   372  	ep.Flush()
   373  	assertEventsReceived(t, es, anyIdentifyEvent())
   374  
   375  	// Now send an event with debug mode on, with a "debug until" time that is further in
   376  	// the future than the client time, but in the past compared to the server.
   377  	debugUntil := serverTime - 1000
   378  	flag := FlagEventProperties{Key: "flagkey", Version: 11, DebugEventsUntilDate: debugUntil}
   379  	fe := eventFactory.NewEvaluationData(flag, basicContext(), testEvalDetailWithoutReason, false, ldvalue.Null(), "")
   380  	ep.RecordEvaluation(fe)
   381  	ep.Flush()
   382  
   383  	// should get a summary event only, not a debug event
   384  	assertEventsReceived(t, es, anySummaryEvent())
   385  	es.assertNoMoreEvents(t)
   386  }
   387  
   388  func TestNonTrackedEventsAreSummarized(t *testing.T) {
   389  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   390  	defer ep.Close()
   391  
   392  	context := basicContext()
   393  	flag1 := FlagEventProperties{Key: "flagkey1", Version: 11}
   394  	flag2 := FlagEventProperties{Key: "flagkey2", Version: 22}
   395  	flag1Eval := ldreason.NewEvaluationDetail(ldvalue.String("value1"), 2, noReason)
   396  	flag2Eval := ldreason.NewEvaluationDetail(ldvalue.String("value2"), 3, noReason)
   397  	fe1 := defaultEventFactory.NewEvaluationData(flag1, context, flag1Eval, false, ldvalue.Null(), "")
   398  	fe2 := defaultEventFactory.NewEvaluationData(flag2, context, flag2Eval, false, ldvalue.Null(), "")
   399  	fe3 := defaultEventFactory.NewEvaluationData(flag2, context, flag2Eval, false, ldvalue.Null(), "")
   400  	ep.RecordEvaluation(fe1)
   401  	ep.RecordEvaluation(fe2)
   402  	ep.RecordEvaluation(fe3)
   403  	ep.Flush()
   404  
   405  	assertEventsReceived(t, es, anyIndexEvent())
   406  
   407  	assertEventsReceived(t, es, m.AllOf(
   408  		m.JSONProperty("startDate").Should(equalNumericTime(fe1.CreationDate)),
   409  		m.JSONProperty("endDate").Should(equalNumericTime(fe3.CreationDate)),
   410  		summaryEventWithFlag(flag1, summaryCounterPropsFromEval(flag1Eval, 1)),
   411  		summaryEventWithFlag(flag2, summaryCounterPropsFromEval(flag2Eval, 2)),
   412  	))
   413  
   414  	es.assertNoMoreEvents(t)
   415  }
   416  
   417  func TestCustomEventProperties(t *testing.T) {
   418  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   419  	defer ep.Close()
   420  
   421  	context := basicContext()
   422  	data := ldvalue.ObjectBuild().SetString("thing", "stuff").Build()
   423  	ce := defaultEventFactory.NewCustomEventData("eventkey", context, data, false, 0)
   424  	ep.RecordCustomEvent(ce)
   425  	ep.Flush()
   426  
   427  	customEventMatcher := m.JSONEqual(map[string]interface{}{
   428  		"kind":         "custom",
   429  		"creationDate": ce.CreationDate,
   430  		"key":          ce.Key,
   431  		"data":         data,
   432  		"contextKeys":  expectedContextKeys(context.context),
   433  	})
   434  	assertEventsReceived(t, es,
   435  		anyIndexEvent(),
   436  		customEventMatcher,
   437  	)
   438  	es.assertNoMoreEvents(t)
   439  }
   440  
   441  func TestCustomEventCanHaveMetricValue(t *testing.T) {
   442  	config := basicConfigWithoutPrivateAttrs()
   443  	ep, es := createEventProcessorAndSender(config)
   444  	defer ep.Close()
   445  
   446  	context := basicContext()
   447  	data := ldvalue.ObjectBuild().SetString("thing", "stuff").Build()
   448  	metric := float64(2.5)
   449  	ce := defaultEventFactory.NewCustomEventData("eventkey", context, data, true, metric)
   450  	ep.RecordCustomEvent(ce)
   451  	ep.Flush()
   452  
   453  	customEventMatcher := m.JSONEqual(map[string]interface{}{
   454  		"kind":         "custom",
   455  		"creationDate": ce.CreationDate,
   456  		"key":          ce.Key,
   457  		"data":         data,
   458  		"metricValue":  metric,
   459  		"contextKeys":  expectedContextKeys(context.context),
   460  	})
   461  	assertEventsReceived(t, es,
   462  		anyIndexEvent(),
   463  		customEventMatcher,
   464  	)
   465  	es.assertNoMoreEvents(t)
   466  }
   467  
   468  func TestRawEventIsQueued(t *testing.T) {
   469  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   470  	defer ep.Close()
   471  
   472  	rawData := json.RawMessage(`{"kind":"alias","arbitrary":["we","don't","care","what's","in","here"]}`)
   473  	ep.RecordRawEvent(rawData)
   474  	ep.Flush()
   475  	ep.waitUntilInactive()
   476  
   477  	assertEventsReceived(t, es, m.JSONEqual(rawData))
   478  	es.assertNoMoreEvents(t)
   479  }
   480  
   481  func TestPeriodicFlush(t *testing.T) {
   482  	config := basicConfigWithoutPrivateAttrs()
   483  	config.FlushInterval = 10 * time.Millisecond
   484  	ep, es := createEventProcessorAndSender(config)
   485  	defer ep.Close()
   486  
   487  	context := basicContext()
   488  	ie := defaultEventFactory.NewIdentifyEventData(context)
   489  	ep.RecordIdentifyEvent(ie)
   490  
   491  	assertEventsReceived(t, es, identifyEventForContextKey(context.context.Key()))
   492  	es.assertNoMoreEvents(t)
   493  }
   494  
   495  func TestBlockingFlush(t *testing.T) {
   496  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   497  	defer ep.Close()
   498  
   499  	senderGateCh := make(chan struct{}, 1)
   500  	senderWaitingCh := make(chan struct{}, 1)
   501  	es.setGate(senderGateCh, senderWaitingCh)
   502  
   503  	didFlush := make(chan struct{}, 1)
   504  	go func() {
   505  		<-senderWaitingCh
   506  		time.Sleep(time.Millisecond * 100)
   507  		didFlush <- struct{}{}
   508  		senderGateCh <- struct{}{}
   509  	}()
   510  
   511  	ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(basicContext()))
   512  	success := ep.FlushBlocking(time.Second)
   513  
   514  	assert.True(t, success)
   515  	assert.NotEqual(t, 0, len(didFlush))
   516  }
   517  
   518  func TestBlockingFlushTimeout(t *testing.T) {
   519  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   520  	defer ep.Close()
   521  
   522  	senderGateCh := make(chan struct{}, 1)
   523  	senderWaitingCh := make(chan struct{}, 1)
   524  	es.setGate(senderGateCh, senderWaitingCh)
   525  
   526  	didFlush := make(chan struct{}, 1)
   527  	go func() {
   528  		<-senderWaitingCh
   529  		time.Sleep(time.Millisecond * 500)
   530  		didFlush <- struct{}{}
   531  		senderGateCh <- struct{}{}
   532  	}()
   533  
   534  	ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(basicContext()))
   535  	success := ep.FlushBlocking(time.Millisecond * 50)
   536  
   537  	assert.False(t, success)
   538  	assert.Equal(t, 0, len(didFlush))
   539  }
   540  
   541  func TestClosingEventProcessorForcesSynchronousFlush(t *testing.T) {
   542  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   543  	defer ep.Close()
   544  
   545  	context := basicContext()
   546  	ie := defaultEventFactory.NewIdentifyEventData(context)
   547  	ep.RecordIdentifyEvent(ie)
   548  	ep.Close()
   549  
   550  	assertEventsReceived(t, es, identifyEventForContextKey(context.context.Key()))
   551  	es.assertNoMoreEvents(t)
   552  }
   553  
   554  func TestPeriodicUserKeysFlush(t *testing.T) {
   555  	// This test overrides the context key flush interval to a small value and verifies that a new
   556  	// index event is generated for a context after the context keys have been flushed.
   557  	config := basicConfigWithoutPrivateAttrs()
   558  	config.UserKeysFlushInterval = time.Millisecond * 100
   559  	ep, es := createEventProcessorAndSender(config)
   560  	defer ep.Close()
   561  
   562  	context := basicContext()
   563  	event1 := defaultEventFactory.NewCustomEventData("event1", context, ldvalue.Null(), false, 0)
   564  	event2 := defaultEventFactory.NewCustomEventData("event2", context, ldvalue.Null(), false, 0)
   565  	ep.RecordCustomEvent(event1)
   566  	ep.RecordCustomEvent(event2)
   567  	ep.Flush()
   568  
   569  	// We're relying on the context key flush not happening in between event1 and event2, so we should get
   570  	// a single index event for the context.
   571  	assertEventsReceived(t, es,
   572  		indexEventForContextKey(context.context.Key()),
   573  		customEventWithEventKey("event1"),
   574  		customEventWithEventKey("event2"),
   575  	)
   576  
   577  	// Now wait long enough for the context key cache to be flushed
   578  	<-time.After(200 * time.Millisecond)
   579  
   580  	// Referencing the same context in a new event should produce a new index event
   581  	event3 := defaultEventFactory.NewCustomEventData("event3", context, ldvalue.Null(), false, 0)
   582  	ep.RecordCustomEvent(event3)
   583  	ep.Flush()
   584  	assertEventsReceived(t, es,
   585  		indexEventForContextKey(context.context.Key()),
   586  		customEventWithEventKey("event3"),
   587  	)
   588  	es.assertNoMoreEvents(t)
   589  }
   590  
   591  func TestNothingIsSentIfThereAreNoEvents(t *testing.T) {
   592  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   593  	defer ep.Close()
   594  
   595  	ep.FlushBlocking(time.Second)
   596  
   597  	es.assertNoMoreEvents(t)
   598  }
   599  
   600  func TestEventProcessorStopsSendingEventsAfterUnrecoverableError(t *testing.T) {
   601  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   602  	defer ep.Close()
   603  
   604  	es.result = EventSenderResult{MustShutDown: true}
   605  
   606  	ie := defaultEventFactory.NewIdentifyEventData(basicContext())
   607  	ep.RecordIdentifyEvent(ie)
   608  	ep.Flush()
   609  	es.awaitEvent(t)
   610  
   611  	ep.RecordIdentifyEvent(ie)
   612  	ep.FlushBlocking(time.Second)
   613  
   614  	es.assertNoMoreEvents(t)
   615  }
   616  
   617  func TestDiagnosticInitEventIsSent(t *testing.T) {
   618  	id := NewDiagnosticID("sdkkey")
   619  	startTime := time.Now()
   620  	diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), startTime, nil)
   621  	config := basicConfigWithoutPrivateAttrs()
   622  	config.DiagnosticsManager = diagnosticsManager
   623  
   624  	ep, es := createEventProcessorAndSender(config)
   625  	defer ep.Close()
   626  
   627  	event := es.awaitDiagnosticEvent(t)
   628  	m.In(t).Assert(event, m.AllOf(
   629  		eventKindIs("diagnostic-init"),
   630  		m.JSONProperty("creationDate").Should(equalNumericTime(ldtime.UnixMillisFromTime(startTime))),
   631  	))
   632  	es.assertNoMoreDiagnosticEvents(t)
   633  }
   634  
   635  func TestDiagnosticPeriodicEventsAreSent(t *testing.T) {
   636  	id := NewDiagnosticID("sdkkey")
   637  	startTime := time.Now()
   638  	diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), startTime, nil)
   639  	config := basicConfigWithoutPrivateAttrs()
   640  	config.DiagnosticsManager = diagnosticsManager
   641  	config.forceDiagnosticRecordingInterval = 100 * time.Millisecond
   642  
   643  	ep, es := createEventProcessorAndSender(config)
   644  	defer ep.Close()
   645  
   646  	// We use a channel for this because we can't predict exactly when the events will be sent
   647  	initEvent := es.awaitDiagnosticEvent(t)
   648  	m.In(t).Assert(initEvent, eventKindIs("diagnostic-init"))
   649  	time0 := requireCreationDate(t, initEvent)
   650  
   651  	event1 := es.awaitDiagnosticEvent(t)
   652  	m.In(t).Assert(event1, eventKindIs("diagnostic"))
   653  	time1 := requireCreationDate(t, event1)
   654  	assert.True(t, time1-time0 >= 70, "event times should follow configured interval: %d, %d", time0, time1)
   655  
   656  	event2 := es.awaitDiagnosticEvent(t)
   657  	m.In(t).Assert(event2, eventKindIs("diagnostic"))
   658  	time2 := requireCreationDate(t, event2)
   659  	assert.True(t, time2-time1 >= 70, "event times should follow configured interval: %d, %d", time1, time2)
   660  }
   661  
   662  func TestDiagnosticPeriodicEventHasEventCounters(t *testing.T) {
   663  	id := NewDiagnosticID("sdkkey")
   664  	config := basicConfigWithoutPrivateAttrs()
   665  	config.Capacity = 3
   666  	config.forceDiagnosticRecordingInterval = 100 * time.Millisecond
   667  	periodicEventGate := make(chan struct{})
   668  
   669  	diagnosticsManager := NewDiagnosticsManager(id, ldvalue.Null(), ldvalue.Null(), time.Now(), periodicEventGate)
   670  	config.DiagnosticsManager = diagnosticsManager
   671  
   672  	ep, es := createEventProcessorAndSender(config)
   673  	defer ep.Close()
   674  
   675  	initEvent := es.awaitDiagnosticEvent(t)
   676  	m.In(t).Assert(initEvent, eventKindIs("diagnostic-init"))
   677  
   678  	context := Context(lduser.NewUser("userkey"))
   679  	ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
   680  	ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
   681  	ep.RecordCustomEvent(defaultEventFactory.NewCustomEventData("key", context, ldvalue.Null(), false, 0))
   682  	ep.Flush()
   683  
   684  	periodicEventGate <- struct{}{} // periodic event won't be sent until we do this
   685  
   686  	event1 := es.awaitDiagnosticEvent(t)
   687  	m.In(t).Assert(event1, m.AllOf(
   688  		eventKindIs("diagnostic"),
   689  		m.JSONProperty("eventsInLastBatch").Should(m.Equal(3)), // 1 index, 2 custom
   690  		m.JSONProperty("droppedEvents").Should(m.Equal(1)),     // 3rd custom event was dropped
   691  		m.JSONProperty("deduplicatedUsers").Should(m.Equal(2)),
   692  	))
   693  
   694  	periodicEventGate <- struct{}{}
   695  
   696  	event2 := es.awaitDiagnosticEvent(t) // next periodic event - all counters should have been reset
   697  	m.In(t).Assert(event2, m.AllOf(
   698  		eventKindIs("diagnostic"),
   699  		m.JSONProperty("eventsInLastBatch").Should(m.Equal(0)),
   700  		m.JSONProperty("droppedEvents").Should(m.Equal(0)),
   701  		m.JSONProperty("deduplicatedUsers").Should(m.Equal(0)),
   702  	))
   703  }
   704  
   705  func TestEventsAreKeptInBufferIfAllFlushWorkersAreBusy(t *testing.T) {
   706  	// Note that in the current implementation, although the intention was that we would cancel a flush
   707  	// if there's not an available flush worker, instead what happens is that we will queue *one* flush
   708  	// in that case, and then cancel the *next* flush if the workers are still busy. This is because the
   709  	// flush payload channel has a buffer size of 1, rather than zero. The test below verifies the
   710  	// current behavior.
   711  
   712  	user1 := Context(lduser.NewUser("user1"))
   713  	user2 := Context(lduser.NewUser("user2"))
   714  	user3 := Context(lduser.NewUser("user3"))
   715  
   716  	ep, es := createEventProcessorAndSender(basicConfigWithoutPrivateAttrs())
   717  	defer ep.Close()
   718  
   719  	senderGateCh := make(chan struct{}, maxFlushWorkers)
   720  	senderWaitingCh := make(chan struct{}, maxFlushWorkers)
   721  	es.setGate(senderGateCh, senderWaitingCh)
   722  
   723  	arbitraryContext := Context(ldcontext.New("other"))
   724  	for i := 0; i < maxFlushWorkers; i++ {
   725  		ep.RecordIdentifyEvent(defaultEventFactory.NewIdentifyEventData(arbitraryContext))
   726  		ep.Flush()
   727  		_ = es.awaitEvent(t) // we don't need to see this payload, just throw it away
   728  	}
   729  
   730  	// Each of the worker goroutines should now be blocked waiting for senderGateCh. We can tell when
   731  	// they have all gotten to that point because they have posted to senderReadyCh.
   732  	for i := 0; i < maxFlushWorkers; i++ {
   733  		<-senderWaitingCh
   734  	}
   735  	es.assertNoMoreEvents(t)
   736  	assert.Equal(t, maxFlushWorkers, es.getPayloadCount())
   737  
   738  	// Now, put an event in the buffer and try to flush again. In the current implementation (see
   739  	// above) this payload gets queued in a holding area, and will be flushed after a worker
   740  	// becomes free.
   741  	extraEvent1 := defaultEventFactory.NewIdentifyEventData(user1)
   742  	ep.RecordIdentifyEvent(extraEvent1)
   743  	ep.Flush()
   744  
   745  	// Do an additional flush with another event. This time, the event processor should see that there's
   746  	// no space available and simply ignore the flush request. There's no way to verify programmatically
   747  	// that this has happened, so just give it a short delay.
   748  	extraEvent2 := defaultEventFactory.NewIdentifyEventData(user2)
   749  	ep.RecordIdentifyEvent(extraEvent2)
   750  	ep.Flush()
   751  	<-time.After(100 * time.Millisecond)
   752  	es.assertNoMoreEvents(t)
   753  
   754  	// Enqueue a third event. The current payload should now be extraEvent2 + extraEvent3.
   755  	extraEvent3 := defaultEventFactory.NewIdentifyEventData(user3)
   756  	ep.RecordIdentifyEvent(extraEvent3)
   757  
   758  	// Now allow the workers to unblock.
   759  	for i := 0; i < maxFlushWorkers; i++ {
   760  		senderGateCh <- struct{}{}
   761  	}
   762  
   763  	// The first unblocked worker should pick up the queued payload with event1.
   764  	senderGateCh <- struct{}{}
   765  	assertEventsReceived(t, es, identifyEventForContextKey(user1.context.Key()))
   766  
   767  	// Now a flush should succeed and send the current payload.
   768  	senderGateCh <- struct{}{}
   769  	ep.Flush()
   770  	assertEventsReceived(t, es,
   771  		identifyEventForContextKey(user2.context.Key()),
   772  		identifyEventForContextKey(user3.context.Key()),
   773  	)
   774  	assert.Equal(t, maxFlushWorkers+2, es.getPayloadCount())
   775  }
   776  
   777  // used only for testing - ensures that all pending messages and flushes have completed
   778  func (ep *defaultEventProcessor) waitUntilInactive() {
   779  	m := syncEventsMessage{replyCh: make(chan struct{})}
   780  	ep.inboxCh <- m
   781  	<-m.replyCh // Now we know that all events prior to this call have been processed
   782  }
   783  
   784  func createEventProcessorAndSender(config EventsConfiguration) (*defaultEventProcessor, *mockEventSender) {
   785  	sender := newMockEventSender()
   786  	config.EventSender = sender
   787  	ep := NewDefaultEventProcessor(config)
   788  	return ep.(*defaultEventProcessor), sender
   789  }
   790  
   791  func assertEventsReceived(t *testing.T, es *mockEventSender, matchers ...m.Matcher) {
   792  	t.Helper()
   793  	received := make([]json.RawMessage, 0, len(matchers))
   794  	for range matchers {
   795  		if event, ok := es.tryAwaitEvent(); ok {
   796  			received = append(received, event)
   797  		} else {
   798  			require.Fail(t, "timed out waiting for analytics event(s)", "wanted %d event(s); got: %s",
   799  				len(matchers), jsonhelpers.ToJSONString(received))
   800  		}
   801  	}
   802  	// Use the ItemsInAnyOrder matcher because the exact ordering of events is not significant.
   803  	m.In(t).Assert(received, m.ItemsInAnyOrder(matchers...))
   804  }
   805  
   806  func requireCreationDate(t *testing.T, eventData json.RawMessage) ldtime.UnixMillisecondTime {
   807  	m.In(t).Require(eventData, m.JSONProperty("creationDate").Should(valueIsPositiveNonZeroInteger()))
   808  	return ldtime.UnixMillisecondTime(ldvalue.Parse(eventData).GetByKey("creationDate").Float64Value())
   809  }
   810  

View as plain text