...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datasource/polling_data_source.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datasource

     1  package datasource
     2  
     3  import (
     4  	"sync"
     5  	"time"
     6  
     7  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
     8  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
     9  	"github.com/launchdarkly/go-server-sdk/v6/internal"
    10  	"github.com/launchdarkly/go-server-sdk/v6/subsystems"
    11  )
    12  
    13  const (
    14  	pollingErrorContext     = "on polling request"
    15  	pollingWillRetryMessage = "will retry at next scheduled poll interval"
    16  )
    17  
    18  // PollingProcessor is the internal implementation of the polling data source.
    19  //
    20  // This type is exported from internal so that the PollingDataSourceBuilder tests can verify its
    21  // configuration. All other code outside of this package should interact with it only via the
    22  // DataSource interface.
    23  type PollingProcessor struct {
    24  	dataSourceUpdates  subsystems.DataSourceUpdateSink
    25  	requestor          requestor
    26  	pollInterval       time.Duration
    27  	loggers            ldlog.Loggers
    28  	setInitializedOnce sync.Once
    29  	isInitialized      internal.AtomicBoolean
    30  	quit               chan struct{}
    31  	closeOnce          sync.Once
    32  }
    33  
    34  // NewPollingProcessor creates the internal implementation of the polling data source.
    35  func NewPollingProcessor(
    36  	context subsystems.ClientContext,
    37  	dataSourceUpdates subsystems.DataSourceUpdateSink,
    38  	baseURI string,
    39  	pollInterval time.Duration,
    40  ) *PollingProcessor {
    41  	requestor := newRequestorImpl(context, context.GetHTTP().CreateHTTPClient(), baseURI)
    42  	return newPollingProcessor(context, dataSourceUpdates, requestor, pollInterval)
    43  }
    44  
    45  func newPollingProcessor(
    46  	context subsystems.ClientContext,
    47  	dataSourceUpdates subsystems.DataSourceUpdateSink,
    48  	requestor requestor,
    49  	pollInterval time.Duration,
    50  ) *PollingProcessor {
    51  	pp := &PollingProcessor{
    52  		dataSourceUpdates: dataSourceUpdates,
    53  		requestor:         requestor,
    54  		pollInterval:      pollInterval,
    55  		loggers:           context.GetLogging().Loggers,
    56  		quit:              make(chan struct{}),
    57  	}
    58  
    59  	return pp
    60  }
    61  
    62  //nolint:revive // no doc comment for standard method
    63  func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
    64  	pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)
    65  
    66  	ticker := newTickerWithInitialTick(pp.pollInterval)
    67  
    68  	go func() {
    69  		defer ticker.Stop()
    70  
    71  		var readyOnce sync.Once
    72  		notifyReady := func() {
    73  			readyOnce.Do(func() {
    74  				close(closeWhenReady)
    75  			})
    76  		}
    77  		// Ensure we stop waiting for initialization if we exit, even if initialization fails
    78  		defer notifyReady()
    79  
    80  		for {
    81  			select {
    82  			case <-pp.quit:
    83  				return
    84  			case <-ticker.C:
    85  				if err := pp.poll(); err != nil {
    86  					if hse, ok := err.(httpStatusError); ok {
    87  						errorInfo := interfaces.DataSourceErrorInfo{
    88  							Kind:       interfaces.DataSourceErrorKindErrorResponse,
    89  							StatusCode: hse.Code,
    90  							Time:       time.Now(),
    91  						}
    92  						recoverable := checkIfErrorIsRecoverableAndLog(
    93  							pp.loggers,
    94  							httpErrorDescription(hse.Code),
    95  							pollingErrorContext,
    96  							hse.Code,
    97  							pollingWillRetryMessage,
    98  						)
    99  						if recoverable {
   100  							pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
   101  						} else {
   102  							pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
   103  							notifyReady()
   104  							return
   105  						}
   106  					} else {
   107  						errorInfo := interfaces.DataSourceErrorInfo{
   108  							Kind:    interfaces.DataSourceErrorKindNetworkError,
   109  							Message: err.Error(),
   110  							Time:    time.Now(),
   111  						}
   112  						if _, ok := err.(malformedJSONError); ok {
   113  							errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
   114  						}
   115  						checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
   116  						pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
   117  					}
   118  					continue
   119  				}
   120  				pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
   121  				pp.setInitializedOnce.Do(func() {
   122  					pp.isInitialized.Set(true)
   123  					pp.loggers.Info("First polling request successful")
   124  					notifyReady()
   125  				})
   126  			}
   127  		}
   128  	}()
   129  }
   130  
   131  func (pp *PollingProcessor) poll() error {
   132  	allData, cached, err := pp.requestor.requestAll()
   133  
   134  	if err != nil {
   135  		return err
   136  	}
   137  
   138  	// We initialize the store only if the request wasn't cached
   139  	if !cached {
   140  		pp.dataSourceUpdates.Init(allData)
   141  	}
   142  	return nil
   143  }
   144  
   145  //nolint:revive // no doc comment for standard method
   146  func (pp *PollingProcessor) Close() error {
   147  	pp.closeOnce.Do(func() {
   148  		close(pp.quit)
   149  	})
   150  	return nil
   151  }
   152  
   153  //nolint:revive // no doc comment for standard method
   154  func (pp *PollingProcessor) IsInitialized() bool {
   155  	return pp.isInitialized.Get()
   156  }
   157  
   158  // GetBaseURI returns the configured polling base URI, for testing.
   159  func (pp *PollingProcessor) GetBaseURI() string {
   160  	return (pp.requestor.(*requestorImpl)).baseURI
   161  }
   162  
   163  // GetPollInterval returns the configured polling interval, for testing.
   164  func (pp *PollingProcessor) GetPollInterval() time.Duration {
   165  	return pp.pollInterval
   166  }
   167  
   168  type tickerWithInitialTick struct {
   169  	*time.Ticker
   170  	C <-chan time.Time
   171  }
   172  
   173  func newTickerWithInitialTick(interval time.Duration) *tickerWithInitialTick {
   174  	c := make(chan time.Time)
   175  	ticker := time.NewTicker(interval)
   176  	t := &tickerWithInitialTick{
   177  		C:      c,
   178  		Ticker: ticker,
   179  	}
   180  	go func() {
   181  		c <- time.Now() // Ensure we do an initial poll immediately
   182  		for tt := range ticker.C {
   183  			c <- tt
   184  		}
   185  	}()
   186  	return t
   187  }
   188  

View as plain text