var ( // DefaultExponentialBackoffConfig provides a default configuration for // exponential backoff. DefaultExponentialBackoffConfig = ExponentialBackoffConfig{ Base: time.Second, Factor: time.Second, Max: 20 * time.Second, } )
var ( // ErrSinkClosed is returned if a write is issued to a sink that has been // closed. If encountered, the error should be considered terminal and // retries will not be successful. ErrSinkClosed = fmt.Errorf("events: sink closed") )
Breaker implements a circuit breaker retry strategy.
The current implementation never drops events.
type Breaker struct {
// contains filtered or unexported fields
}
func NewBreaker(threshold int, backoff time.Duration) *Breaker
NewBreaker returns a breaker that will backoff after the threshold has been tripped. A Breaker is thread safe and may be shared by many goroutines.
func (b *Breaker) Failure(event Event, err error) bool
Failure records the failure and latest failure time.
func (b *Breaker) Proceed(event Event) time.Duration
Proceed checks the failures against the threshold.
func (b *Breaker) Success(event Event)
Success resets the breaker.
Broadcaster sends events to multiple, reliable Sinks. The goal of this component is to dispatch events to configured endpoints. Reliability can be provided by wrapping incoming sinks.
type Broadcaster struct {
// contains filtered or unexported fields
}
func NewBroadcaster(sinks ...Sink) *Broadcaster
NewBroadcaster appends one or more sinks to the list of sinks. The broadcaster behavior will be affected by the properties of the sink. Generally, the sink should accept all messages and deal with reliability on its own. Use of EventQueue and RetryingSink should be used here.
func (b *Broadcaster) Add(sink Sink) error
Add the sink to the broadcaster.
The provided sink must be comparable with equality. Typically, this just works with a regular pointer type.
func (b *Broadcaster) Close() error
Close the broadcaster, ensuring that all messages are flushed to the underlying sink before returning.
func (b *Broadcaster) Remove(sink Sink) error
Remove the provided sink.
func (b *Broadcaster) String() string
func (b *Broadcaster) Write(event Event) error
Write accepts an event to be dispatched to all sinks. This method will never fail and should never block (hopefully!). The caller cedes the memory to the broadcaster and should not modify it after calling write.
Channel provides a sink that can be listened on. The writer and channel listener must operate in separate goroutines.
Consumers should listen on Channel.C until Closed is closed.
type Channel struct { C chan Event // contains filtered or unexported fields }
func NewChannel(buffer int) *Channel
NewChannel returns a channel. If buffer is zero, the channel is unbuffered.
func (ch *Channel) Close() error
Close the channel sink.
func (ch *Channel) Done() chan struct{}
Done returns a channel that will always proceed once the sink is closed.
func (ch *Channel) String() string
func (ch *Channel) Write(event Event) error
Write the event to the channel. Must be called in a separate goroutine from the listener.
Event marks items that can be sent as events.
type Event interface{}
ExponentialBackoff implements random backoff with exponentially increasing bounds as the number consecutive failures increase.
type ExponentialBackoff struct {
// contains filtered or unexported fields
}
func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff
NewExponentialBackoff returns an exponential backoff strategy with the desired config. If config is nil, the default is returned.
func (b *ExponentialBackoff) Failure(event Event, err error) bool
Failure increments the failure counter.
func (b *ExponentialBackoff) Proceed(event Event) time.Duration
Proceed returns the next randomly bound exponential backoff time.
func (b *ExponentialBackoff) Success(event Event)
Success resets the failures counter.
ExponentialBackoffConfig configures backoff parameters.
Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.
type ExponentialBackoffConfig struct { // Base is the minimum bound for backing off after failure. Base time.Duration // Factor sets the amount of time by which the backoff grows with each // failure. Factor time.Duration // Max is the absolute maxiumum bound for a single backoff. Max time.Duration }
Filter provides an event sink that sends only events that are accepted by a Matcher. No methods on filter are goroutine safe.
type Filter struct {
// contains filtered or unexported fields
}
func (f *Filter) Close() error
Close the filter and allow no more events to pass through.
func (f *Filter) Write(event Event) error
Write an event to the filter.
Matcher matches events.
type Matcher interface { Match(event Event) bool }
MatcherFunc implements matcher with just a function.
type MatcherFunc func(event Event) bool
func (fn MatcherFunc) Match(event Event) bool
Match calls the wrapped function.
Queue accepts all messages into a queue for asynchronous consumption by a sink. It is unbounded and thread safe but the sink must be reliable or events will be dropped.
type Queue struct {
// contains filtered or unexported fields
}
func NewQueue(dst Sink) *Queue
NewQueue returns a queue to the provided Sink dst.
func (eq *Queue) Close() error
Close shutsdown the event queue, flushing
func (eq *Queue) Write(event Event) error
Write accepts the events into the queue, only failing if the queue has been closed.
RetryStrategy defines a strategy for retrying event sink writes.
All methods should be goroutine safe.
type RetryStrategy interface { // Proceed is called before every event send. If proceed returns a // positive, non-zero integer, the retryer will back off by the provided // duration. // // An event is provided, by may be ignored. Proceed(event Event) time.Duration // Failure reports a failure to the strategy. If this method returns true, // the event should be dropped. Failure(event Event, err error) bool // Success should be called when an event is sent successfully. Success(event Event) }
RetryingSink retries the write until success or an ErrSinkClosed is returned. Underlying sink must have p > 0 of succeeding or the sink will block. Retry is configured with a RetryStrategy. Concurrent calls to a retrying sink are serialized through the sink, meaning that if one is in-flight, another will not proceed.
type RetryingSink struct {
// contains filtered or unexported fields
}
func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink
NewRetryingSink returns a sink that will retry writes to a sink, backing off on failure. Parameters threshold and backoff adjust the behavior of the circuit breaker.
func (rs *RetryingSink) Close() error
Close closes the sink and the underlying sink.
func (rs *RetryingSink) String() string
func (rs *RetryingSink) Write(event Event) error
Write attempts to flush the events to the downstream sink until it succeeds or the sink is closed.
Sink accepts and sends events.
type Sink interface { // Write an event to the Sink. If no error is returned, the caller will // assume that all events have been committed to the sink. If an error is // received, the caller may retry sending the event. Write(event Event) error // Close the sink, possibly waiting for pending events to flush. Close() error }
func NewFilter(dst Sink, matcher Matcher) Sink
NewFilter returns a new filter that will send to events to dst that return true for Matcher.