...

Source file src/github.com/launchdarkly/go-server-sdk/v6/internal/datastore/data_store_status_poller.go

Documentation: github.com/launchdarkly/go-server-sdk/v6/internal/datastore

     1  package datastore
     2  
     3  import (
     4  	"sync"
     5  	"time"
     6  
     7  	"github.com/launchdarkly/go-server-sdk/v6/interfaces"
     8  
     9  	"github.com/launchdarkly/go-sdk-common/v3/ldlog"
    10  )
    11  
// dataStoreStatusPoller maintains the "last known available" state for a persistent data store and
// can poll the store for recovery. This is used only by persistentDataStoreWrapper.
//
// Availability changes are reported through UpdateAvailability. When the store becomes unavailable,
// a background goroutine is started that calls pollFn every statusPollInterval until it returns true.
type dataStoreStatusPoller struct {
	// statusUpdater is called with the new status each time the availability actually changes.
	statusUpdater     func(interfaces.DataStoreStatus)
	// lock is held for the whole of UpdateAvailability, so state changes and the resulting
	// statusUpdater calls are serialized.
	lock              sync.Mutex
	// lastAvailable is the most recently recorded availability; it starts as the availableNow
	// value given to newDataStoreStatusPoller.
	lastAvailable     bool
	// pollFn is called by the polling goroutine; it should return true if the store is available.
	pollFn            func() bool
	// refreshOnRecovery is copied into DataStoreStatus.NeedsRefresh when recovery is reported.
	refreshOnRecovery bool
	// pollCloser is the stop channel returned by the most recent startStatusPoller call. It is nil
	// until a poller has been started, and is reset to nil by Close.
	pollCloser        chan struct{}
	// closeOnce ensures that the body of Close runs at most once.
	closeOnce         sync.Once
	// loggers receives the warnings logged when availability changes.
	loggers           ldlog.Loggers
}
    24  
    25  const statusPollInterval = time.Millisecond * 500
    26  
    27  // newDataStoreStatusPoller creates a new dataStoreStatusPoller. The pollFn should return
    28  // true if the store is available, false if not.
    29  func newDataStoreStatusPoller(
    30  	availableNow bool,
    31  	pollFn func() bool,
    32  	statusUpdater func(interfaces.DataStoreStatus),
    33  	refreshOnRecovery bool,
    34  	loggers ldlog.Loggers,
    35  ) *dataStoreStatusPoller {
    36  	return &dataStoreStatusPoller{
    37  		lastAvailable:     availableNow,
    38  		pollFn:            pollFn,
    39  		statusUpdater:     statusUpdater,
    40  		refreshOnRecovery: refreshOnRecovery,
    41  		loggers:           loggers,
    42  	}
    43  }
    44  
    45  // UpdateAvailability signals that the store is now available or unavailable. If that is a change,
    46  // an update will be sent (and, if the new status is unavailable, it will start polling for recovery).
    47  func (m *dataStoreStatusPoller) UpdateAvailability(available bool) {
    48  	m.lock.Lock()
    49  	defer m.lock.Unlock()
    50  	if available == m.lastAvailable {
    51  		return
    52  	}
    53  	m.lastAvailable = available
    54  	newStatus := interfaces.DataStoreStatus{Available: available}
    55  	if available {
    56  		m.loggers.Warn("Persistent store is available again")
    57  		newStatus.NeedsRefresh = m.refreshOnRecovery
    58  	}
    59  	m.statusUpdater(newStatus)
    60  
    61  	// If the store has just become unavailable, start a poller to detect when it comes back.
    62  	if !available {
    63  		m.loggers.Warn("Detected persistent store unavailability; updates will be cached until it recovers")
    64  		// Start a goroutine to poll until the store starts working again or we shut down.
    65  		m.pollCloser = m.startStatusPoller()
    66  	}
    67  }
    68  
    69  // Close shuts down all channels and goroutines used by the manager.
    70  func (m *dataStoreStatusPoller) Close() {
    71  	m.closeOnce.Do(func() {
    72  		if m.pollCloser != nil {
    73  			close(m.pollCloser)
    74  			m.pollCloser = nil
    75  		}
    76  	})
    77  }
    78  
    79  func (m *dataStoreStatusPoller) startStatusPoller() chan struct{} {
    80  	closer := make(chan struct{})
    81  	go func() {
    82  		ticker := time.NewTicker(statusPollInterval)
    83  		defer ticker.Stop()
    84  		for {
    85  			select {
    86  			case <-ticker.C:
    87  				if m.pollFn() {
    88  					m.UpdateAvailability(true)
    89  					return
    90  				}
    91  			case <-closer:
    92  				return
    93  			}
    94  		}
    95  	}()
    96  	return closer
    97  }
    98  

View as plain text