
Source file src/github.com/docker/go-events/retry.go

Documentation: github.com/docker/go-events

     1  package events
     2  
     3  import (
     4  	"fmt"
     5  	"math/rand"
     6  	"sync"
     7  	"sync/atomic"
     8  	"time"
     9  
    10  	"github.com/sirupsen/logrus"
    11  )
    12  
    13  // RetryingSink retries the write until success or an ErrSinkClosed is
    14  // returned. The underlying sink must have a non-zero probability (p > 0) of
    15  // succeeding, or the sink will block. Retry behavior is configured with a
    16  // RetryStrategy. Concurrent calls to a retrying sink are serialized through
    17  // the sink, meaning that if one is in-flight, another will not proceed.
    18  type RetryingSink struct {
    19  	sink     Sink
    20  	strategy RetryStrategy
    21  	closed   chan struct{}
    22  	once     sync.Once
    23  }
    24  
    25  // NewRetryingSink returns a sink that will retry writes to the provided
    26  // sink, backing off on failure. The supplied RetryStrategy controls when and
    27  // how long to back off between attempts.
    28  func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
    29  	rs := &RetryingSink{
    30  		sink:     sink,
    31  		strategy: strategy,
    32  		closed:   make(chan struct{}),
    33  	}
    34  
    35  	return rs
    36  }
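
A minimal usage sketch: wrap an existing sink so that failed writes are retried
with a circuit-breaker backoff. The wrapWithRetry helper and the downstream
parameter are illustrative only; the package is assumed to be imported as
events.

	import (
		"time"

		events "github.com/docker/go-events"
	)

	// wrapWithRetry wraps an existing sink so that failed writes are retried,
	// backing off via a circuit breaker that trips after five consecutive
	// failures and waits 10s before probing the sink again.
	func wrapWithRetry(downstream events.Sink) *events.RetryingSink {
		return events.NewRetryingSink(downstream, events.NewBreaker(5, 10*time.Second))
	}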
    37  
    38  // Write attempts to flush the event to the downstream sink until it succeeds
    39  // or the sink is closed.
    40  func (rs *RetryingSink) Write(event Event) error {
    41  	logger := logrus.WithField("event", event)
    42  
    43  retry:
    44  	select {
    45  	case <-rs.closed:
    46  		return ErrSinkClosed
    47  	default:
    48  	}
    49  
    50  	if backoff := rs.strategy.Proceed(event); backoff > 0 {
    51  		select {
    52  		case <-time.After(backoff):
    53  			// TODO(stevvooe): This branch holds up the next try. Before, we
    54  			// would simply break to the "retry" label and then possibly wait
    55  			// again. However, this requires all retry strategies to have a
    56  			// large probability of probing the sink for success, rather than
    57  			// just backing off and sending the request.
    58  		case <-rs.closed:
    59  			return ErrSinkClosed
    60  		}
    61  	}
    62  
    63  	if err := rs.sink.Write(event); err != nil {
    64  		if err == ErrSinkClosed {
    65  			// terminal!
    66  			return err
    67  		}
    68  
    69  		logger := logger.WithError(err) // shadow the outer logger to include the error
    70  
    71  		if rs.strategy.Failure(event, err) {
    72  			logger.Errorf("retryingsink: dropped event")
    73  			return nil
    74  		}
    75  
    76  		logger.Errorf("retryingsink: error writing event, retrying")
    77  		goto retry
    78  	}
    79  
    80  	rs.strategy.Success(event)
    81  	return nil
    82  }
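
As a sketch of the retry loop above, the program below uses flakySink, a
made-up Sink that rejects its first few writes; a single Write call blocks
through the failed attempts, backing off between them, until the event is
finally accepted.

	package main

	import (
		"errors"
		"fmt"

		events "github.com/docker/go-events"
	)

	// flakySink is a hypothetical Sink that rejects its first few writes.
	type flakySink struct{ failuresLeft int }

	func (s *flakySink) Write(event events.Event) error {
		if s.failuresLeft > 0 {
			s.failuresLeft--
			return errors.New("transient failure")
		}
		fmt.Println("delivered:", event)
		return nil
	}

	func (s *flakySink) Close() error { return nil }

	func main() {
		rs := events.NewRetryingSink(
			&flakySink{failuresLeft: 3},
			events.NewExponentialBackoff(events.DefaultExponentialBackoffConfig),
		)
		defer rs.Close()

		// Blocks through three failed attempts, each followed by a randomized
		// backoff, and returns nil once the fourth attempt succeeds.
		if err := rs.Write("hello"); err != nil {
			fmt.Println("write failed:", err)
		}
	}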
    83  
    84  // Close closes the sink. Pending retries and later writes return ErrSinkClosed.
    85  func (rs *RetryingSink) Close() error {
    86  	rs.once.Do(func() {
    87  		close(rs.closed)
    88  	})
    89  
    90  	return nil
    91  }
    92  
    93  func (rs *RetryingSink) String() string {
    94  	// Serialize a copy of the RetryingSink without the sync.Once, to avoid
    95  	// a data race.
    96  	rs2 := map[string]interface{}{
    97  		"sink":     rs.sink,
    98  		"strategy": rs.strategy,
    99  		"closed":   rs.closed,
   100  	}
   101  	return fmt.Sprint(rs2)
   102  }
   103  
   104  // RetryStrategy defines a strategy for retrying event sink writes.
   105  //
   106  // All methods should be goroutine safe.
   107  type RetryStrategy interface {
   108  	// Proceed is called before every event send. If Proceed returns a
   109  	// positive, non-zero duration, the retryer will back off by that
   110  	// duration before attempting the write.
   111  	//
   112  	// An event is provided, but may be ignored.
   113  	Proceed(event Event) time.Duration
   114  
   115  	// Failure reports a failure to the strategy. If this method returns true,
   116  	// the event should be dropped.
   117  	Failure(event Event, err error) bool
   118  
   119  	// Success should be called when an event is sent successfully.
   120  	Success(event Event)
   121  }
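
A sketch of implementing this interface outside the package: the fixedRetry
type and its fields are illustrative, not part of go-events. It waits a
constant delay between attempts and asks the retryer to drop the event after a
fixed number of consecutive failures.

	import (
		"sync"
		"time"

		events "github.com/docker/go-events"
	)

	// fixedRetry is an illustrative RetryStrategy: a constant delay between
	// attempts, with the event dropped once maxTries consecutive failures
	// have been recorded. The mutex keeps the methods goroutine safe.
	type fixedRetry struct {
		mu       sync.Mutex
		failures int
		delay    time.Duration
		maxTries int
	}

	var _ events.RetryStrategy = &fixedRetry{}

	func (f *fixedRetry) Proceed(event events.Event) time.Duration {
		f.mu.Lock()
		defer f.mu.Unlock()
		if f.failures == 0 {
			return 0 // first attempt proceeds immediately.
		}
		return f.delay
	}

	func (f *fixedRetry) Failure(event events.Event, err error) bool {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.failures++
		return f.failures >= f.maxTries // true asks the retryer to drop the event.
	}

	func (f *fixedRetry) Success(event events.Event) {
		f.mu.Lock()
		defer f.mu.Unlock()
		f.failures = 0
	}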
   122  
   123  // Breaker implements a circuit breaker retry strategy.
   124  //
   125  // The current implementation never drops events.
   126  type Breaker struct {
   127  	threshold int
   128  	recent    int
   129  	last      time.Time
   130  	backoff   time.Duration // time after which we retry after failure.
   131  	mu        sync.Mutex
   132  }
   133  
   134  var _ RetryStrategy = &Breaker{}
   135  
   136  // NewBreaker returns a breaker that will back off after the threshold has been
   137  // tripped. A Breaker is thread safe and may be shared by many goroutines.
   138  func NewBreaker(threshold int, backoff time.Duration) *Breaker {
   139  	return &Breaker{
   140  		threshold: threshold,
   141  		backoff:   backoff,
   142  	}
   143  }
   144  
   145  // Proceed checks the failures against the threshold and returns any remaining backoff.
   146  func (b *Breaker) Proceed(event Event) time.Duration {
   147  	b.mu.Lock()
   148  	defer b.mu.Unlock()
   149  
   150  	if b.recent < b.threshold {
   151  		return 0
   152  	}
   153  
   154  	return b.last.Add(b.backoff).Sub(time.Now())
   155  }
   156  
   157  // Success resets the breaker.
   158  func (b *Breaker) Success(event Event) {
   159  	b.mu.Lock()
   160  	defer b.mu.Unlock()
   161  
   162  	b.recent = 0
   163  	b.last = time.Time{}
   164  }
   165  
   166  // Failure records the failure and latest failure time.
   167  func (b *Breaker) Failure(event Event, err error) bool {
   168  	b.mu.Lock()
   169  	defer b.mu.Unlock()
   170  
   171  	b.recent++
   172  	b.last = time.Now().UTC()
   173  	return false // never drop events.
   174  }
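
A short sketch of the breaker in isolation; the nil events and the error value
are arbitrary, since Breaker ignores both, and breakerDemo is illustrative
only. Below the threshold Proceed reports no delay; once tripped it reports
what remains of the backoff window, and a success resets it.

	import (
		"errors"
		"fmt"
		"time"

		events "github.com/docker/go-events"
	)

	func breakerDemo() {
		b := events.NewBreaker(3, 5*time.Second)

		fmt.Println(b.Proceed(nil)) // 0s: below the threshold, no delay.

		// Three consecutive failures reach the threshold and trip the breaker.
		for i := 0; i < 3; i++ {
			b.Failure(nil, errors.New("downstream unavailable"))
		}

		// Tripped: roughly how much of the 5s window remains since the last
		// failure, shrinking toward zero (and negative once the window expires).
		fmt.Println(b.Proceed(nil))

		// A success resets the breaker, so Proceed reports no delay again.
		b.Success(nil)
		fmt.Println(b.Proceed(nil)) // 0s
	}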
   175  
   176  var (
   177  	// DefaultExponentialBackoffConfig provides a default configuration for
   178  	// exponential backoff.
   179  	DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
   180  		Base:   time.Second,
   181  		Factor: time.Second,
   182  		Max:    20 * time.Second,
   183  	}
   184  )
   185  
   186  // ExponentialBackoffConfig configures backoff parameters.
   187  //
   188  // Note that these parameters define the upper bound from which a random value
   189  // is chosen. For example, if the computed bound is 1s, a random value in
   190  // [0,1s) will be chosen for the backoff value.
   191  type ExponentialBackoffConfig struct {
   192  	// Base is the minimum bound for backing off after failure.
   193  	Base time.Duration
   194  
   195  	// Factor sets the amount of time by which the backoff grows with each
   196  	// failure.
   197  	Factor time.Duration
   198  
   199  	// Max is the absolute maximum bound for a single backoff.
   200  	Max time.Duration
   201  }
   202  
   203  // ExponentialBackoff implements random backoff with exponentially increasing
   204  // bounds as the number of consecutive failures increases.
   205  type ExponentialBackoff struct {
   206  	failures uint64 // consecutive failure counter (needs to be 64-bit aligned)
   207  	config   ExponentialBackoffConfig
   208  }
   209  
   210  // NewExponentialBackoff returns an exponential backoff strategy with the
   211  // desired config. Zero values for Factor and Max fall back to the defaults.
   212  func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
   213  	return &ExponentialBackoff{
   214  		config: config,
   215  	}
   216  }
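
A sketch of attaching the strategy to a retrying sink with a custom
configuration; downstream stands for any existing Sink and the helper name is
illustrative.

	import (
		"time"

		events "github.com/docker/go-events"
	)

	// newRetryingSinkWithBackoff builds a sink whose backoff bound after n
	// consecutive failures is roughly 500ms + 100ms*2^(n-1), capped at 10s;
	// the actual wait is a random duration below that bound.
	func newRetryingSinkWithBackoff(downstream events.Sink) *events.RetryingSink {
		strategy := events.NewExponentialBackoff(events.ExponentialBackoffConfig{
			Base:   500 * time.Millisecond,
			Factor: 100 * time.Millisecond,
			Max:    10 * time.Second,
		})
		return events.NewRetryingSink(downstream, strategy)
	}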
   217  
   218  // Proceed returns the next randomly bound exponential backoff time.
   219  func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
   220  	return b.backoff(atomic.LoadUint64(&b.failures))
   221  }
   222  
   223  // Success resets the failures counter.
   224  func (b *ExponentialBackoff) Success(event Event) {
   225  	atomic.StoreUint64(&b.failures, 0)
   226  }
   227  
   228  // Failure increments the failure counter.
   229  func (b *ExponentialBackoff) Failure(event Event, err error) bool {
   230  	atomic.AddUint64(&b.failures, 1)
   231  	return false
   232  }
   233  
   234  // backoff calculates the amount of time to wait based on the number of
   235  // consecutive failures.
   236  func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
   237  	if failures <= 0 {
   238  		// proceed normally when there are no failures.
   239  		return 0
   240  	}
   241  
   242  	factor := b.config.Factor
   243  	if factor <= 0 {
   244  		factor = DefaultExponentialBackoffConfig.Factor
   245  	}
   246  
   247  	backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
   248  
   249  	max := b.config.Max
   250  	if max <= 0 {
   251  		max = DefaultExponentialBackoffConfig.Max
   252  	}
   253  
   254  	if backoff > max || backoff < 0 {
   255  		backoff = max
   256  	}
   257  
   258  	// Choose a uniformly distributed value from [0, backoff).
   259  	return time.Duration(rand.Int63n(int64(backoff)))
   260  }
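
To make the growth of the bound concrete, the standalone sketch below repeats
the formula above with the values from DefaultExponentialBackoffConfig
(Base=1s, Factor=1s, Max=20s); the wait actually used is a uniformly random
duration below each bound.

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		base, factor, maxBound := time.Second, time.Second, 20*time.Second

		// Mirrors backoff(): bound = Base + Factor*2^(failures-1), capped at Max.
		// failures=1 -> 2s, 2 -> 3s, 3 -> 5s, 4 -> 9s, 5 -> 17s, 6+ -> capped at 20s.
		for failures := uint64(1); failures <= 6; failures++ {
			bound := base + factor*time.Duration(1<<(failures-1))
			if bound > maxBound || bound < 0 {
				bound = maxBound
			}
			fmt.Printf("failures=%d  bound=%v\n", failures, bound)
		}
	}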
   261  
