...

Source file src/github.com/launchdarkly/go-sdk-events/v2/event_processor.go

Documentation: github.com/launchdarkly/go-sdk-events/v2

     1  package ldevents
     2  
     3  import (
     4  	"encoding/json"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/launchdarkly/go-jsonstream/v3/jwriter"
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    10  	"github.com/launchdarkly/go-sdk-common/v3/ldtime"
    11  	"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
    12  )
    13  
    14  // anyEventInput and anyEventOutput only exist to make it a little clearer in the code whether we're referring
    15  // to something in the inbox or the outbox.
    16  type anyEventInput interface{}
    17  type anyEventOutput interface{}
    18  
    19  type defaultEventProcessor struct {
    20  	inboxCh       chan eventDispatcherMessage
    21  	inboxFullOnce sync.Once
    22  	closeOnce     sync.Once
    23  	loggers       ldlog.Loggers
    24  }
    25  
    26  type eventDispatcher struct {
    27  	config               EventsConfiguration
    28  	outbox               *eventsOutbox
    29  	flushCh              chan *flushPayload
    30  	workersGroup         *sync.WaitGroup
    31  	userKeys             lruCache
    32  	lastKnownPastTime    ldtime.UnixMillisecondTime
    33  	deduplicatedContexts int
    34  	eventsInLastBatch    int
    35  	disabled             bool
    36  	currentTimestampFn   func() ldtime.UnixMillisecondTime
    37  	stateLock            sync.Mutex
    38  }
    39  
    40  type flushPayload struct {
    41  	diagnosticEvent ldvalue.Value
    42  	events          []anyEventOutput
    43  	summary         eventSummary
    44  }
    45  
    46  // Payload of the inboxCh channel.
    47  type eventDispatcherMessage interface{}
    48  
    49  type sendEventMessage struct {
    50  	event anyEventInput
    51  }
    52  
    53  type flushEventsMessage struct {
    54  	replyCh chan struct{}
    55  }
    56  
    57  type shutdownEventsMessage struct {
    58  	replyCh chan struct{}
    59  }
    60  
    61  type syncEventsMessage struct {
    62  	replyCh chan struct{}
    63  }
    64  
    65  const (
    66  	maxFlushWorkers = 5
    67  )
    68  
    69  // NewDefaultEventProcessor creates an instance of the default implementation of analytics event processing.
    70  func NewDefaultEventProcessor(config EventsConfiguration) EventProcessor {
    71  	inboxCh := make(chan eventDispatcherMessage, config.Capacity)
    72  	startEventDispatcher(config, inboxCh)
    73  	return &defaultEventProcessor{
    74  		inboxCh: inboxCh,
    75  		loggers: config.Loggers,
    76  	}
    77  }
    78  
    79  func (ep *defaultEventProcessor) RecordEvaluation(ed EvaluationData) {
    80  	ep.postNonBlockingMessageToInbox(sendEventMessage{event: ed})
    81  }
    82  
    83  func (ep *defaultEventProcessor) RecordIdentifyEvent(e IdentifyEventData) {
    84  	ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
    85  }
    86  
    87  func (ep *defaultEventProcessor) RecordCustomEvent(e CustomEventData) {
    88  	ep.postNonBlockingMessageToInbox(sendEventMessage{event: e})
    89  }
    90  
    91  func (ep *defaultEventProcessor) RecordRawEvent(data json.RawMessage) {
    92  	ep.postNonBlockingMessageToInbox(sendEventMessage{event: rawEvent{data: data}})
    93  }
    94  
    95  func (ep *defaultEventProcessor) Flush() {
    96  	ep.postNonBlockingMessageToInbox(flushEventsMessage{})
    97  }
    98  
    99  func (ep *defaultEventProcessor) FlushBlocking(timeout time.Duration) bool {
   100  	replyCh := make(chan struct{}, 1)
   101  	m := flushEventsMessage{replyCh: replyCh}
   102  	ep.inboxCh <- m
   103  	var deadline <-chan time.Time
   104  	if timeout > 0 {
   105  		timer := time.NewTimer(timeout)
   106  		defer timer.Stop()
   107  		deadline = timer.C
   108  	}
   109  	select {
   110  	case <-m.replyCh:
   111  		return true
   112  	case <-deadline:
   113  		return false
   114  	}
   115  }
   116  
   117  func (ep *defaultEventProcessor) postNonBlockingMessageToInbox(e eventDispatcherMessage) {
   118  	select {
   119  	case ep.inboxCh <- e:
   120  		return
   121  	default: // COVERAGE: no way to simulate this condition in unit tests
   122  	}
   123  	// If the inbox is full, it means the eventDispatcher is seriously backed up with not-yet-processed events.
   124  	// This is unlikely, but if it happens, it means the application is probably doing a ton of flag evaluations
   125  	// across many goroutines-- so if we wait for a space in the inbox, we risk a very serious slowdown of the
   126  	// app. To avoid that, we'll just drop the event. The log warning about this will only be shown once.
   127  	ep.inboxFullOnce.Do(func() { // COVERAGE: no way to simulate this condition in unit tests
   128  		ep.loggers.Warn("Events are being produced faster than they can be processed; some events will be dropped")
   129  	})
   130  }
   131  
   132  func (ep *defaultEventProcessor) Close() error {
   133  	ep.closeOnce.Do(func() {
   134  		// We put the flush and shutdown messages directly into the channel instead of calling
   135  		// postNonBlockingMessageToInbox, because we *do* want to block to make sure there is room in the channel;
   136  		// these aren't analytics events, they are messages that are necessary for an orderly shutdown.
   137  		ep.inboxCh <- flushEventsMessage{}
   138  		m := shutdownEventsMessage{replyCh: make(chan struct{})}
   139  		ep.inboxCh <- m
   140  		<-m.replyCh
   141  	})
   142  	return nil
   143  }
   144  
   145  func startEventDispatcher(
   146  	config EventsConfiguration,
   147  	inboxCh <-chan eventDispatcherMessage,
   148  ) {
   149  	ed := &eventDispatcher{
   150  		config:             config,
   151  		outbox:             newEventsOutbox(config.Capacity, config.Loggers),
   152  		flushCh:            make(chan *flushPayload, 1),
   153  		workersGroup:       &sync.WaitGroup{},
   154  		userKeys:           newLruCache(config.UserKeysCapacity),
   155  		currentTimestampFn: config.currentTimeProvider,
   156  	}
   157  
   158  	if ed.currentTimestampFn == nil {
   159  		ed.currentTimestampFn = ldtime.UnixMillisNow
   160  	}
   161  
   162  	formatter := &eventOutputFormatter{
   163  		contextFormatter: newEventContextFormatter(config),
   164  		config:           config,
   165  	}
   166  
   167  	// Start a fixed-size pool of workers that wait on flushTriggerCh. This is the
   168  	// maximum number of flushes we can do concurrently.
   169  	for i := 0; i < maxFlushWorkers; i++ {
   170  		go runFlushTask(config, formatter, ed.flushCh, ed.workersGroup, ed.handleResult)
   171  	}
   172  	if config.DiagnosticsManager != nil {
   173  		event := config.DiagnosticsManager.CreateInitEvent()
   174  		ed.sendDiagnosticsEvent(event)
   175  	}
   176  	go ed.runMainLoop(inboxCh)
   177  }
   178  
   179  func (ed *eventDispatcher) runMainLoop(
   180  	inboxCh <-chan eventDispatcherMessage,
   181  ) {
   182  	if err := recover(); err != nil { // COVERAGE: no way to simulate this condition in unit tests
   183  		ed.config.Loggers.Errorf("Unexpected panic in event processing thread: %+v", err)
   184  	}
   185  
   186  	flushInterval := ed.config.FlushInterval
   187  	if flushInterval <= 0 { // COVERAGE: no way to test this logic in unit tests
   188  		flushInterval = DefaultFlushInterval
   189  	}
   190  	userKeysFlushInterval := ed.config.UserKeysFlushInterval
   191  	if userKeysFlushInterval <= 0 { // COVERAGE: no way to test this logic in unit tests
   192  		userKeysFlushInterval = DefaultUserKeysFlushInterval
   193  	}
   194  	flushTicker := time.NewTicker(flushInterval)
   195  	usersResetTicker := time.NewTicker(userKeysFlushInterval)
   196  
   197  	var diagnosticsTicker *time.Ticker
   198  	var diagnosticsTickerCh <-chan time.Time
   199  	diagnosticsManager := ed.config.DiagnosticsManager
   200  	if diagnosticsManager != nil {
   201  		interval := ed.config.DiagnosticRecordingInterval
   202  		if interval > 0 {
   203  			if interval < MinimumDiagnosticRecordingInterval { // COVERAGE: no way to test this logic in unit tests
   204  				interval = DefaultDiagnosticRecordingInterval
   205  			}
   206  		} else {
   207  			if ed.config.forceDiagnosticRecordingInterval > 0 {
   208  				interval = ed.config.forceDiagnosticRecordingInterval
   209  			} else {
   210  				interval = DefaultDiagnosticRecordingInterval
   211  			}
   212  		}
   213  		diagnosticsTicker = time.NewTicker(interval)
   214  		diagnosticsTickerCh = diagnosticsTicker.C
   215  	}
   216  
   217  	for {
   218  		// Drain the response channel with a higher priority than anything else
   219  		// to ensure that the flush workers don't get blocked.
   220  		select {
   221  		case message := <-inboxCh:
   222  			switch m := message.(type) {
   223  			case sendEventMessage:
   224  				ed.processEvent(m.event)
   225  			case flushEventsMessage:
   226  				ed.triggerFlush()
   227  				if m.replyCh != nil {
   228  					ed.workersGroup.Wait() // Wait for all in-progress flushes to complete
   229  					m.replyCh <- struct{}{}
   230  				}
   231  			case syncEventsMessage:
   232  				ed.workersGroup.Wait()
   233  				m.replyCh <- struct{}{}
   234  			case shutdownEventsMessage:
   235  				flushTicker.Stop()
   236  				usersResetTicker.Stop()
   237  				if diagnosticsTicker != nil {
   238  					diagnosticsTicker.Stop()
   239  				}
   240  				ed.workersGroup.Wait() // Wait for all in-progress flushes to complete
   241  				close(ed.flushCh)      // Causes all idle flush workers to terminate
   242  				m.replyCh <- struct{}{}
   243  				return
   244  			}
   245  		case <-flushTicker.C:
   246  			ed.triggerFlush()
   247  		case <-usersResetTicker.C:
   248  			ed.userKeys.clear()
   249  		case <-diagnosticsTickerCh:
   250  			if diagnosticsManager == nil || !diagnosticsManager.CanSendStatsEvent() {
   251  				// COVERAGE: no way to test this logic in unit tests
   252  				break
   253  			}
   254  			event := diagnosticsManager.CreateStatsEventAndReset(
   255  				ed.outbox.droppedEvents,
   256  				ed.deduplicatedContexts,
   257  				ed.eventsInLastBatch,
   258  			)
   259  			ed.outbox.droppedEvents = 0
   260  			ed.deduplicatedContexts = 0
   261  			ed.eventsInLastBatch = 0
   262  			ed.sendDiagnosticsEvent(event)
   263  		}
   264  	}
   265  }
   266  
   267  func (ed *eventDispatcher) processEvent(evt anyEventInput) {
   268  	// Decide whether to add the event to the payload. Feature events may be added twice, once for
   269  	// the event (if tracked) and once for debugging.
   270  	willAddFullEvent := true
   271  	var debugEvent anyEventInput
   272  	inlinedUser := false
   273  	var eventContext EventInputContext
   274  	var creationDate ldtime.UnixMillisecondTime
   275  	switch evt := evt.(type) {
   276  	case EvaluationData:
   277  		eventContext = evt.Context
   278  		creationDate = evt.CreationDate
   279  		ed.outbox.addToSummary(evt) // add all feature events to summaries
   280  		willAddFullEvent = evt.RequireFullEvent
   281  		if ed.shouldDebugEvent(&evt) {
   282  			de := evt
   283  			de.debug = true
   284  			debugEvent = de
   285  		}
   286  	case IdentifyEventData:
   287  		eventContext = evt.Context
   288  		creationDate = evt.CreationDate
   289  		inlinedUser = true
   290  	case CustomEventData:
   291  		eventContext = evt.Context
   292  		creationDate = evt.CreationDate
   293  	default:
   294  		ed.outbox.addEvent(evt)
   295  		return
   296  	}
   297  	// For each context we haven't seen before, we add an index event before the event that referenced
   298  	// the context - unless the original event will contain an inline context (e.g. an identify event).
   299  	alreadySeenUser := ed.userKeys.add(eventContext.context.FullyQualifiedKey())
   300  	if !(willAddFullEvent && inlinedUser) {
   301  		if alreadySeenUser {
   302  			ed.deduplicatedContexts++
   303  		} else {
   304  			indexEvent := indexEvent{
   305  				BaseEvent{CreationDate: creationDate, Context: eventContext},
   306  			}
   307  			ed.outbox.addEvent(indexEvent)
   308  		}
   309  	}
   310  	if willAddFullEvent {
   311  		ed.outbox.addEvent(evt)
   312  	}
   313  	if debugEvent != nil {
   314  		ed.outbox.addEvent(debugEvent)
   315  	}
   316  }
   317  
   318  func (ed *eventDispatcher) shouldDebugEvent(evt *EvaluationData) bool {
   319  	if evt.DebugEventsUntilDate == 0 {
   320  		return false
   321  	}
   322  	// The "last known past time" comes from the last HTTP response we got from the server.
   323  	// In case the client's time is set wrong, at least we know that any expiration date
   324  	// earlier than that point is definitely in the past.  If there's any discrepancy, we
   325  	// want to err on the side of cutting off event debugging sooner.
   326  	ed.stateLock.Lock() // This should be done infrequently since it's only for debug events
   327  	defer ed.stateLock.Unlock()
   328  	return evt.DebugEventsUntilDate > ed.lastKnownPastTime &&
   329  		evt.DebugEventsUntilDate > ed.currentTimestampFn()
   330  }
   331  
   332  // Signal that we would like to do a flush as soon as possible.
   333  func (ed *eventDispatcher) triggerFlush() {
   334  	if ed.isDisabled() {
   335  		ed.outbox.clear()
   336  		return
   337  	}
   338  	// Is there anything to flush?
   339  	payload := ed.outbox.getPayload()
   340  	totalEventCount := len(payload.events)
   341  	if payload.summary.hasCounters() {
   342  		totalEventCount++
   343  	}
   344  	if totalEventCount == 0 {
   345  		ed.eventsInLastBatch = 0
   346  		return
   347  	}
   348  	ed.workersGroup.Add(1) // Increment the count of active flushes
   349  	select {
   350  	case ed.flushCh <- &payload:
   351  		// If the channel wasn't full, then there is a worker available who will pick up
   352  		// this flush payload and send it. The event outbox and summary state can now be
   353  		// cleared from the main goroutine.
   354  		ed.eventsInLastBatch = totalEventCount
   355  		ed.outbox.clear()
   356  	default:
   357  		// We can't start a flush right now because we're waiting for one of the workers
   358  		// to pick up the last one.  Do not reset the event outbox or summary state.
   359  		ed.workersGroup.Done()
   360  	}
   361  }
   362  
   363  func (ed *eventDispatcher) isDisabled() bool {
   364  	// Since we're using a mutex, we should avoid calling this often.
   365  	ed.stateLock.Lock()
   366  	defer ed.stateLock.Unlock()
   367  	return ed.disabled
   368  }
   369  
   370  func (ed *eventDispatcher) handleResult(result EventSenderResult) {
   371  	if result.MustShutDown {
   372  		ed.stateLock.Lock()
   373  		defer ed.stateLock.Unlock()
   374  		ed.disabled = true
   375  	} else if result.TimeFromServer > 0 {
   376  		ed.stateLock.Lock()
   377  		defer ed.stateLock.Unlock()
   378  		ed.lastKnownPastTime = result.TimeFromServer
   379  	}
   380  }
   381  
   382  func (ed *eventDispatcher) sendDiagnosticsEvent(
   383  	event ldvalue.Value,
   384  ) {
   385  	payload := flushPayload{diagnosticEvent: event}
   386  	ed.workersGroup.Add(1) // Increment the count of active flushes
   387  	select {
   388  	case ed.flushCh <- &payload:
   389  		// If the channel wasn't full, then there is a worker available who will pick up
   390  		// this flush payload and send it.
   391  	default:
   392  		// We can't start a flush right now because we're waiting for one of the workers
   393  		// to pick up the last one. We'll just discard this diagnostic event - presumably
   394  		// we'll send another one later anyway, and we don't want this kind of nonessential
   395  		// data to cause any kind of back-pressure.
   396  		ed.workersGroup.Done() // COVERAGE: no way to simulate this condition in unit tests
   397  	}
   398  }
   399  
   400  func runFlushTask(config EventsConfiguration, formatter *eventOutputFormatter, flushCh <-chan *flushPayload,
   401  	workersGroup *sync.WaitGroup, resultFn func(EventSenderResult)) {
   402  	for {
   403  		payload, more := <-flushCh
   404  		if !more {
   405  			// Channel has been closed - we're shutting down
   406  			break
   407  		}
   408  		if !payload.diagnosticEvent.IsNull() {
   409  			w := jwriter.NewWriter()
   410  			payload.diagnosticEvent.WriteToJSONWriter(&w)
   411  			bytes := w.Bytes()
   412  			_ = config.EventSender.SendEventData(DiagnosticEventDataKind, bytes, 1)
   413  		} else {
   414  			bytes, count := formatter.makeOutputEvents(payload.events, payload.summary)
   415  			if len(bytes) > 0 {
   416  				result := config.EventSender.SendEventData(AnalyticsEventDataKind, bytes, count)
   417  				resultFn(result)
   418  			}
   419  		}
   420  		workersGroup.Done() // Decrement the count of in-progress flushes
   421  	}
   422  }
   423  

View as plain text