...

Source file src/github.com/launchdarkly/go-sdk-events/v2/event_sender.go

Documentation: github.com/launchdarkly/go-sdk-events/v2

     1  package ldevents
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"io/ioutil"
     7  	"net/http"
     8  	"strconv"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    13  	"github.com/launchdarkly/go-sdk-common/v3/ldtime"
    14  
    15  	"github.com/google/uuid"
    16  )
    17  
// Defaults and header names used when building event delivery requests:
//   - defaultEventsURI is the base URI used when EventSenderConfiguration.BaseURI is empty.
//   - eventSchemaHeader is the name of the request header that carries the event schema version.
//   - payloadIDHeader is the name of the request header that carries the per-payload UUID.
//   - currentEventSchema is the schema version sent when EventSenderConfiguration.SchemaVersion is 0.
//   - defaultRetryDelay is the wait before the retry when EventSenderConfiguration.RetryDelay is 0.
const (
	defaultEventsURI   = "https://events.launchdarkly.com"
	eventSchemaHeader  = "X-LaunchDarkly-Event-Schema"
	payloadIDHeader    = "X-LaunchDarkly-Payload-ID"
	currentEventSchema = "4"
	defaultRetryDelay  = time.Second
)
    25  
// EventSenderConfiguration contains parameters for event delivery that do not vary from one event payload to another.
type EventSenderConfiguration struct {
	// Client is the HTTP client instance to use, or nil to use http.DefaultClient.
	Client *http.Client
	// BaseURI is the base URI to which the event endpoint paths will be added. Trailing slashes are ignored, and
	// an empty value means the default LaunchDarkly events URI.
	BaseURI string
	// BaseHeaders, if not nil, is a function that is called for each payload to obtain any headers that should be
	// added to the HTTP request, other than the schema version. The event delivery logic will never modify the
	// returned map; it copies the entries into a new map before adding its own headers.
	BaseHeaders func() http.Header
	// SchemaVersion specifies the value for the X-LaunchDarkly-Event-Schema header, or 0 to use the latest version.
	SchemaVersion int
	// Loggers is used for logging event delivery status.
	Loggers ldlog.Loggers
	// RetryDelay is the length of time to wait for a retry, or 0 to use the default delay (1 second).
	RetryDelay time.Duration
}
    42  
// defaultEventSender is the EventSender implementation returned by NewServerSideEventSender. It holds only the
// delivery configuration and passes it to SendEventDataWithRetry for every payload.
type defaultEventSender struct {
	config EventSenderConfiguration
}
    46  
    47  // NewServerSideEventSender creates the standard implementation of EventSender for server-side SDKs.
    48  //
    49  // The underlying behavior is mostly provided by SendEventData. It adds the behavior of putting the SDK key in an
    50  // Authorization header (in addition to any other headers in config.BaseHeaders), and it forces the schema version
    51  // to be the latest schema version (since, in the regular use case of EventSender being used within a
    52  // DefaultEventProcessor, the latter is only ever going to generate output in the current schema).
    53  //
    54  // This object maintains no state other than its configuration, so discarding it does not require any special
    55  // cleanup.
    56  func NewServerSideEventSender(
    57  	config EventSenderConfiguration,
    58  	sdkKey string,
    59  ) EventSender {
    60  	realConfig := config
    61  	realConfig.SchemaVersion = 0 // defaults to current
    62  	realConfig.BaseHeaders = func() http.Header {
    63  		var base http.Header
    64  		if config.BaseHeaders != nil {
    65  			base = config.BaseHeaders()
    66  		}
    67  		ret := make(http.Header, len(base)+1)
    68  		for k, vv := range base {
    69  			ret[k] = vv
    70  		}
    71  		ret.Set("Authorization", sdkKey)
    72  		return ret
    73  	}
    74  	return &defaultEventSender{
    75  		config: realConfig,
    76  	}
    77  }
    78  
    79  func (s *defaultEventSender) SendEventData(kind EventDataKind, data []byte, eventCount int) EventSenderResult {
    80  	return SendEventDataWithRetry(
    81  		s.config,
    82  		kind,
    83  		"",
    84  		data,
    85  		eventCount,
    86  	)
    87  }
    88  
    89  // SendEventDataWithRetry provides an entry point to the same event delivery logic that is used by DefaultEventSender.
    90  // This is exported separately for convenience in code such as the Relay Proxy which needs to implement the same
    91  // behavior in situations where EventProcessor and EventSender are not relevant. The behavior provided is specifically:
    92  //
    93  // 1. Add headers as follows, besides config.BaseHeaders: Content-Type (application/json); X-LaunchDarkly-Schema-Version
    94  // (based on config.Schema Version; omitted for diagnostic events); and X-LaunchDarkly-Payload-ID (a UUID value).
    95  // Unlike NewServerSideEventSender, it does not add an Authorization header.
    96  //
    97  // 2. If delivery fails with a recoverable error, such as an HTTP 503 or an I/O error, retry exactly once after a delay
    98  // configured by config.RetryDelay. This is done synchronously. If the retry fails, return Success: false.
    99  //
   100  // 3. If delivery fails with an unrecoverable error, such as an HTTP 401, return Success: false and MustShutDown: true.
   101  //
   102  // 4. If the response has a Date header, parse it into TimeFromServer.
   103  //
   104  // The overridePath parameter only needs to be set if you need to customize the URI path. If it is empty, the standard
   105  // path of /bulk or /diagnostic will be used as appropriate.
   106  func SendEventDataWithRetry(
   107  	config EventSenderConfiguration,
   108  	kind EventDataKind,
   109  	overridePath string,
   110  	data []byte,
   111  	eventCount int,
   112  ) EventSenderResult {
   113  	headers := make(http.Header)
   114  	if config.BaseHeaders != nil {
   115  		for k, vv := range config.BaseHeaders() {
   116  			headers[k] = vv
   117  		}
   118  	}
   119  	headers.Set("Content-Type", "application/json")
   120  
   121  	var uri, path, description string
   122  
   123  	switch kind {
   124  	case AnalyticsEventDataKind:
   125  		path = "/bulk"
   126  		description = fmt.Sprintf("%d events", eventCount)
   127  		if config.SchemaVersion == 0 {
   128  			headers.Add(eventSchemaHeader, currentEventSchema)
   129  		} else {
   130  			headers.Add(eventSchemaHeader, strconv.Itoa(config.SchemaVersion))
   131  		}
   132  		payloadUUID, _ := uuid.NewRandom()
   133  		headers.Add(payloadIDHeader, payloadUUID.String())
   134  		// if NewRandom somehow failed, we'll just proceed with an empty string
   135  	case DiagnosticEventDataKind:
   136  		path = "/diagnostic"
   137  		description = "diagnostic event"
   138  	default:
   139  		return EventSenderResult{}
   140  	}
   141  
   142  	if overridePath != "" {
   143  		path = "/" + strings.TrimLeft(overridePath, "/")
   144  	}
   145  	baseURI := strings.TrimRight(config.BaseURI, "/")
   146  	if baseURI == "" {
   147  		baseURI = defaultEventsURI
   148  	}
   149  	uri = baseURI + path
   150  
   151  	config.Loggers.Debugf("Sending %s: %s", description, data)
   152  
   153  	var resp *http.Response
   154  	var respErr error
   155  	for attempt := 0; attempt < 2; attempt++ {
   156  		if attempt > 0 {
   157  			delay := config.RetryDelay
   158  			if delay == 0 {
   159  				delay = defaultRetryDelay // COVERAGE: unit tests always set a short delay
   160  			}
   161  			config.Loggers.Warnf("Will retry posting events after %f second", float64(delay/time.Second))
   162  			time.Sleep(delay)
   163  		}
   164  		req, reqErr := http.NewRequest("POST", uri, bytes.NewReader(data))
   165  		if reqErr != nil { // COVERAGE: no way to simulate this condition in unit tests
   166  			config.Loggers.Errorf("Unexpected error while creating event request: %+v", reqErr)
   167  			return EventSenderResult{}
   168  		}
   169  		req.Header = headers
   170  
   171  		client := config.Client
   172  		if client == nil {
   173  			client = http.DefaultClient
   174  		}
   175  		resp, respErr = client.Do(req)
   176  
   177  		if resp != nil && resp.Body != nil {
   178  			_, _ = ioutil.ReadAll(resp.Body)
   179  			_ = resp.Body.Close()
   180  		}
   181  
   182  		if respErr != nil {
   183  			config.Loggers.Warnf("Unexpected error while sending events: %+v", respErr)
   184  			continue
   185  		}
   186  		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
   187  			result := EventSenderResult{Success: true}
   188  			t, err := http.ParseTime(resp.Header.Get("Date"))
   189  			if err == nil {
   190  				result.TimeFromServer = ldtime.UnixMillisFromTime(t)
   191  			}
   192  			return result
   193  		}
   194  		if isHTTPErrorRecoverable(resp.StatusCode) {
   195  			maybeRetry := "will retry"
   196  			if attempt == 1 {
   197  				maybeRetry = "some events were dropped"
   198  			}
   199  			config.Loggers.Warnf(httpErrorMessage(resp.StatusCode, "sending events", maybeRetry))
   200  		} else {
   201  			config.Loggers.Warnf(httpErrorMessage(resp.StatusCode, "sending events", ""))
   202  			return EventSenderResult{MustShutDown: true}
   203  		}
   204  	}
   205  	return EventSenderResult{}
   206  }
   207  

View as plain text