...

Package events

import "github.com/docker/go-events"

Index

Variables
type Breaker
    func NewBreaker(threshold int, backoff time.Duration) *Breaker
    func (b *Breaker) Failure(event Event, err error) bool
    func (b *Breaker) Proceed(event Event) time.Duration
    func (b *Breaker) Success(event Event)
type Broadcaster
    func NewBroadcaster(sinks ...Sink) *Broadcaster
    func (b *Broadcaster) Add(sink Sink) error
    func (b *Broadcaster) Close() error
    func (b *Broadcaster) Remove(sink Sink) error
    func (b *Broadcaster) String() string
    func (b *Broadcaster) Write(event Event) error
type Channel
    func NewChannel(buffer int) *Channel
    func (ch *Channel) Close() error
    func (ch *Channel) Done() chan struct{}
    func (ch *Channel) String() string
    func (ch *Channel) Write(event Event) error
type Event
type ExponentialBackoff
    func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff
    func (b *ExponentialBackoff) Failure(event Event, err error) bool
    func (b *ExponentialBackoff) Proceed(event Event) time.Duration
    func (b *ExponentialBackoff) Success(event Event)
type ExponentialBackoffConfig
type Filter
    func (f *Filter) Close() error
    func (f *Filter) Write(event Event) error
type Matcher
type MatcherFunc
    func (fn MatcherFunc) Match(event Event) bool
type Queue
    func NewQueue(dst Sink) *Queue
    func (eq *Queue) Close() error
    func (eq *Queue) Write(event Event) error
type RetryStrategy
type RetryingSink
    func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink
    func (rs *RetryingSink) Close() error
    func (rs *RetryingSink) String() string
    func (rs *RetryingSink) Write(event Event) error
type Sink
    func NewFilter(dst Sink, matcher Matcher) Sink

Package files

broadcast.go channel.go errors.go event.go filter.go queue.go retry.go

Variables

var (
    // DefaultExponentialBackoffConfig provides a default configuration for
    // exponential backoff.
    DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
        Base:   time.Second,
        Factor: time.Second,
        Max:    20 * time.Second,
    }
)
var (
    // ErrSinkClosed is returned if a write is issued to a sink that has been
    // closed. If encountered, the error should be considered terminal and
    // retries will not be successful.
    ErrSinkClosed = fmt.Errorf("events: sink closed")
)

type Breaker

Breaker implements a circuit breaker retry strategy.

The current implementation never drops events.

type Breaker struct {
    // contains filtered or unexported fields
}

func NewBreaker

func NewBreaker(threshold int, backoff time.Duration) *Breaker

NewBreaker returns a breaker that will back off after the threshold has been tripped. A Breaker is thread safe and may be shared by many goroutines.

func (*Breaker) Failure

func (b *Breaker) Failure(event Event, err error) bool

Failure records the failure and latest failure time.

func (*Breaker) Proceed

func (b *Breaker) Proceed(event Event) time.Duration

Proceed checks the failures against the threshold.

func (*Breaker) Success

func (b *Breaker) Success(event Event)

Success resets the breaker.
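
The Breaker's fields are unexported, so the following is only a minimal sketch of how a circuit breaker strategy with a threshold and a backoff could behave. The type breaker and its fields are hypothetical, and the event and error arguments are omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// breaker is a hypothetical stand-in for Breaker; the real type's fields are
// unexported and may differ.
type breaker struct {
	mu        sync.Mutex
	threshold int
	backoff   time.Duration
	recent    int       // consecutive failures since the last success
	last      time.Time // time of the most recent failure
}

func newBreaker(threshold int, backoff time.Duration) *breaker {
	return &breaker{threshold: threshold, backoff: backoff}
}

// Proceed returns zero while the failure count is under the threshold.
// Otherwise it returns the time left until backoff has elapsed since the
// last failure.
func (b *breaker) Proceed() time.Duration {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.recent < b.threshold {
		return 0
	}
	return b.last.Add(b.backoff).Sub(time.Now())
}

// Failure records the failure and its time. It returns false because this
// strategy never asks for the event to be dropped.
func (b *breaker) Failure() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.recent++
	b.last = time.Now()
	return false
}

// Success resets the failure count.
func (b *breaker) Success() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.recent = 0
}

func main() {
	b := newBreaker(2, time.Second)
	b.Failure()
	fmt.Println("wait after 1 failure:", b.Proceed() > 0)
	b.Failure()
	fmt.Println("wait after 2 failures:", b.Proceed() > 0)
	b.Success()
	fmt.Println("wait after success:", b.Proceed() > 0)
}
```

In this sketch a non-positive result from Proceed means the caller may send immediately.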

type Broadcaster

Broadcaster sends events to multiple, reliable Sinks. The goal of this component is to dispatch events to configured endpoints. Reliability can be provided by wrapping incoming sinks.

type Broadcaster struct {
    // contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster(sinks ...Sink) *Broadcaster

NewBroadcaster returns a Broadcaster that dispatches to the provided sinks. The broadcaster's behavior is affected by the properties of each sink. Generally, a sink should accept all messages and deal with reliability on its own; wrapping sinks with Queue and RetryingSink is recommended here.

func (*Broadcaster) Add

func (b *Broadcaster) Add(sink Sink) error

Add the sink to the broadcaster.

The provided sink must be comparable with equality. Typically, this just works with a regular pointer type.

func (*Broadcaster) Close

func (b *Broadcaster) Close() error

Close the broadcaster, ensuring that all messages are flushed to the underlying sink before returning.

func (*Broadcaster) Remove

func (b *Broadcaster) Remove(sink Sink) error

Remove the provided sink.

func (*Broadcaster) String

func (b *Broadcaster) String() string

func (*Broadcaster) Write

func (b *Broadcaster) Write(event Event) error

Write accepts an event to be dispatched to all sinks. This method is intended never to fail and never to block. The caller cedes the memory to the broadcaster and should not modify the event after calling Write.
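
To illustrate why Add and Remove need sinks that are comparable with equality, here is a simplified, synchronous fan-out sketch. The types fanout and recorder are hypothetical and are not the package's implementation; Event and Sink are redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

// recorder is a hypothetical sink that keeps events in memory. It is used
// through a pointer, so two values compare equal only if they are the same
// sink.
type recorder struct{ events []Event }

func (r *recorder) Write(event Event) error {
	r.events = append(r.events, event)
	return nil
}

func (r *recorder) Close() error { return nil }

// fanout is a simplified stand-in for Broadcaster.
type fanout struct{ sinks []Sink }

// Add registers sink unless an equal sink is already present.
func (f *fanout) Add(sink Sink) {
	for _, s := range f.sinks {
		if s == sink {
			return
		}
	}
	f.sinks = append(f.sinks, sink)
}

// Remove drops the sink equal to the argument, if any.
func (f *fanout) Remove(sink Sink) {
	for i, s := range f.sinks {
		if s == sink {
			f.sinks = append(f.sinks[:i], f.sinks[i+1:]...)
			return
		}
	}
}

// Write offers the event to every sink. Errors are ignored here because each
// sink is expected to handle reliability on its own.
func (f *fanout) Write(event Event) error {
	for _, s := range f.sinks {
		_ = s.Write(event)
	}
	return nil
}

func main() {
	a, b := &recorder{}, &recorder{}
	f := &fanout{}
	f.Add(a)
	f.Add(a) // ignored: already present
	f.Add(b)
	f.Write("first")
	f.Remove(a)
	f.Write("second")
	fmt.Println(len(a.events), len(b.events))
}
```

Comparing two interface values with == panics at run time when both hold the same non-comparable dynamic type (for example, a struct containing a slice), which is one reason pointer types are the safe choice.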

type Channel

Channel provides a sink that can be listened on. The writer and channel listener must operate in separate goroutines.

Consumers should listen on Channel.C until the channel returned by Done is closed.

type Channel struct {
    C chan Event
    // contains filtered or unexported fields
}

func NewChannel

func NewChannel(buffer int) *Channel

NewChannel returns a channel. If buffer is zero, the channel is unbuffered.

func (*Channel) Close

func (ch *Channel) Close() error

Close the channel sink.

func (*Channel) Done

func (ch *Channel) Done() chan struct{}

Done returns a channel that will always proceed once the sink is closed.

func (*Channel) String

func (ch *Channel) String() string

func (*Channel) Write

func (ch *Channel) Write(event Event) error

Write the event to the channel. Must be called in a separate goroutine from the listener.
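
The writer/listener split can be sketched as follows. The type channelSink and the error errSinkClosed are hypothetical stand-ins for Channel and ErrSinkClosed, written under the assumption that Write selects between delivering the event and observing closure.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Event interface{}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// channelSink is a hypothetical stand-in for Channel.
type channelSink struct {
	C      chan Event
	closed chan struct{}
	once   sync.Once
}

func newChannelSink(buffer int) *channelSink {
	return &channelSink{C: make(chan Event, buffer), closed: make(chan struct{})}
}

// Done returns a channel that is closed once the sink is closed.
func (ch *channelSink) Done() chan struct{} { return ch.closed }

// Write blocks until the event is accepted on C or the sink is closed.
func (ch *channelSink) Write(event Event) error {
	select {
	case ch.C <- event:
		return nil
	case <-ch.closed:
		return errSinkClosed
	}
}

// Close marks the sink closed; it is safe to call more than once.
func (ch *channelSink) Close() error {
	ch.once.Do(func() { close(ch.closed) })
	return nil
}

// consume reads events until the sink is closed and returns what it saw.
func consume(ch *channelSink) []Event {
	var got []Event
	for {
		select {
		case event := <-ch.C:
			got = append(got, event)
		case <-ch.Done():
			return got
		}
	}
}

func main() {
	ch := newChannelSink(0)
	// The writer runs in its own goroutine, separate from the listener.
	go func() {
		for i := 0; i < 3; i++ {
			ch.Write(i)
		}
		ch.Close()
	}()
	fmt.Println(consume(ch))
}
```

With an unbuffered channel every Write returns only after the listener has received the event, so nothing is left behind when Close is called. With a buffer, a listener that stops at Done may leave buffered events unread.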

type Event

Event marks items that can be sent as events.

type Event interface{}

type ExponentialBackoff

ExponentialBackoff implements random backoff with exponentially increasing bounds as the number of consecutive failures increases.

type ExponentialBackoff struct {
    // contains filtered or unexported fields
}

func NewExponentialBackoff

func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff

NewExponentialBackoff returns an exponential backoff strategy with the desired config. Because config is passed by value, it cannot be nil; pass DefaultExponentialBackoffConfig to use the default settings.

func (*ExponentialBackoff) Failure

func (b *ExponentialBackoff) Failure(event Event, err error) bool

Failure increments the failure counter.

func (*ExponentialBackoff) Proceed

func (b *ExponentialBackoff) Proceed(event Event) time.Duration

Proceed returns the next randomly bound exponential backoff time.

func (*ExponentialBackoff) Success

func (b *ExponentialBackoff) Success(event Event)

Success resets the failures counter.

type ExponentialBackoffConfig

ExponentialBackoffConfig configures backoff parameters.

Note that these parameters operate on the upper bound for choosing a random value. For example, at Base=1s, a random value in [0,1s) will be chosen for the backoff value.

type ExponentialBackoffConfig struct {
    // Base is the minimum bound for backing off after failure.
    Base time.Duration

    // Factor sets the amount of time by which the backoff grows with each
    // failure.
    Factor time.Duration

    // Max is the absolute maximum bound for a single backoff.
    Max time.Duration
}
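
The documentation does not spell out how Base, Factor and Max combine, so the following sketch assumes one plausible formula: the upper bound after n consecutive failures is Base + Factor*2^(n-1), capped at Max, and the actual delay is drawn uniformly from [0, bound). The names config, upperBound and next are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// config mirrors the three documented fields of ExponentialBackoffConfig.
type config struct {
	Base, Factor, Max time.Duration
}

// upperBound returns the assumed bound Base + Factor*2^(failures-1), capped
// at Max. The formula is an assumption, not taken from the package.
func upperBound(c config, failures uint) time.Duration {
	if failures == 0 {
		return 0 // no failures: proceed immediately
	}
	if failures > 32 {
		return c.Max // avoid overflowing the shift for long failure runs
	}
	bound := c.Base + c.Factor*time.Duration(int64(1)<<(failures-1))
	if bound > c.Max || bound < 0 {
		bound = c.Max
	}
	return bound
}

// next picks a uniformly random delay in [0, upperBound).
func next(c config, failures uint) time.Duration {
	bound := upperBound(c, failures)
	if bound <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(bound)))
}

func main() {
	// Same values as DefaultExponentialBackoffConfig.
	c := config{Base: time.Second, Factor: time.Second, Max: 20 * time.Second}
	for failures := uint(1); failures <= 6; failures++ {
		fmt.Printf("failures=%d bound=%v sample=%v\n",
			failures, upperBound(c, failures), next(c, failures))
	}
}
```

Under this assumption the default config yields bounds of 2s, 3s, 5s, 9s and 17s for the first five failures, and 20s from the sixth onward.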

type Filter

Filter provides an event sink that sends only events that are accepted by a Matcher. No methods on filter are goroutine safe.

type Filter struct {
    // contains filtered or unexported fields
}

func (*Filter) Close

func (f *Filter) Close() error

Close the filter and allow no more events to pass through.

func (*Filter) Write

func (f *Filter) Write(event Event) error

Write an event to the filter.

type Matcher

Matcher matches events.

type Matcher interface {
    Match(event Event) bool
}

type MatcherFunc

MatcherFunc implements matcher with just a function.

type MatcherFunc func(event Event) bool

func (MatcherFunc) Match

func (fn MatcherFunc) Match(event Event) bool

Match calls the wrapped function.
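
MatcherFunc follows the same adapter pattern as http.HandlerFunc: an ordinary function becomes a Matcher through a type conversion. The example below redeclares Event, Matcher and MatcherFunc as documented above so that it compiles on its own; the helper hasPrefix is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

type Event interface{}

type Matcher interface {
	Match(event Event) bool
}

type MatcherFunc func(event Event) bool

// Match calls the wrapped function.
func (fn MatcherFunc) Match(event Event) bool { return fn(event) }

// hasPrefix returns a Matcher that accepts string events starting with
// prefix and rejects everything else.
func hasPrefix(prefix string) Matcher {
	return MatcherFunc(func(event Event) bool {
		s, ok := event.(string)
		return ok && strings.HasPrefix(s, prefix)
	})
}

func main() {
	m := hasPrefix("container.")
	fmt.Println(m.Match("container.start"))
	fmt.Println(m.Match("image.pull"))
	fmt.Println(m.Match(42))
}
```

Because Event is an empty interface, a matcher usually begins with a type assertion or type switch, as hasPrefix does here.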

type Queue

Queue accepts all messages into a queue for asynchronous consumption by a sink. It is unbounded and thread safe, but the sink must be reliable or events will be dropped.

type Queue struct {
    // contains filtered or unexported fields
}

func NewQueue

func NewQueue(dst Sink) *Queue

NewQueue returns a queue to the provided Sink dst.

func (*Queue) Close

func (eq *Queue) Close() error

Close shuts down the event queue, flushing any pending events.

func (*Queue) Write

func (eq *Queue) Write(event Event) error

Write accepts the events into the queue, only failing if the queue has been closed.
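
One way to build an unbounded, asynchronous queue in front of a sink is a linked list guarded by a mutex and a condition variable, drained by a single goroutine. The sketch below takes that approach. The types queue and recorder and the error errSinkClosed are hypothetical, and the package's own implementation may differ.

```go
package main

import (
	"container/list"
	"errors"
	"fmt"
	"sync"
)

type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// queue is a hypothetical stand-in for Queue.
type queue struct {
	dst    Sink
	mu     sync.Mutex
	cond   *sync.Cond
	events *list.List
	closed bool
	done   chan struct{} // closed when the run loop exits
}

func newQueue(dst Sink) *queue {
	q := &queue{dst: dst, events: list.New(), done: make(chan struct{})}
	q.cond = sync.NewCond(&q.mu)
	go q.run()
	return q
}

// Write appends the event without waiting for dst; it fails only after Close.
func (q *queue) Write(event Event) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errSinkClosed
	}
	q.events.PushBack(event)
	q.cond.Signal()
	return nil
}

// Close stops accepting events, waits for the backlog to drain, then closes dst.
func (q *queue) Close() error {
	q.mu.Lock()
	if q.closed {
		q.mu.Unlock()
		return nil
	}
	q.closed = true
	q.cond.Signal()
	q.mu.Unlock()
	<-q.done
	return q.dst.Close()
}

// run forwards queued events to dst in order. A failed write is not retried,
// which is why dst must be reliable.
func (q *queue) run() {
	defer close(q.done)
	for {
		q.mu.Lock()
		for q.events.Len() == 0 && !q.closed {
			q.cond.Wait()
		}
		front := q.events.Front()
		if front == nil { // closed and fully drained
			q.mu.Unlock()
			return
		}
		event := q.events.Remove(front)
		q.mu.Unlock()
		_ = q.dst.Write(event)
	}
}

// recorder is a hypothetical in-memory sink used for the demonstration.
type recorder struct{ events []Event }

func (r *recorder) Write(event Event) error {
	r.events = append(r.events, event)
	return nil
}

func (r *recorder) Close() error { return nil }

func main() {
	dst := &recorder{}
	q := newQueue(dst)
	for i := 0; i < 5; i++ {
		q.Write(i)
	}
	q.Close()
	fmt.Println(dst.events, q.Write(5))
}
```

The lock is released before calling dst.Write, so a slow destination never blocks producers; it only lets the backlog grow.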

type RetryStrategy

RetryStrategy defines a strategy for retrying event sink writes.

All methods should be goroutine safe.

type RetryStrategy interface {
    // Proceed is called before every event send. If Proceed returns a
    // positive, non-zero duration, the retryer will back off by the provided
    // duration.
    //
    // An event is provided, but may be ignored.
    Proceed(event Event) time.Duration

    // Failure reports a failure to the strategy. If this method returns true,
    // the event should be dropped.
    Failure(event Event, err error) bool

    // Success should be called when an event is sent successfully.
    Success(event Event)
}
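
As an illustration of the contract, here is a sketch of a custom strategy that waits a fixed delay after each failure and asks for the event to be dropped after a set number of consecutive failures. The type limited is hypothetical; RetryStrategy is redeclared locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type Event interface{}

type RetryStrategy interface {
	Proceed(event Event) time.Duration
	Failure(event Event, err error) bool
	Success(event Event)
}

// limited is a hypothetical strategy: fixed delay between attempts, and the
// event is dropped after max consecutive failures.
type limited struct {
	mu       sync.Mutex
	delay    time.Duration
	max      int
	failures int
}

// Compile-time check that *limited satisfies the interface.
var _ RetryStrategy = (*limited)(nil)

// Proceed returns zero when there are no outstanding failures, otherwise the
// fixed delay.
func (l *limited) Proceed(Event) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.failures == 0 {
		return 0
	}
	return l.delay
}

// Failure counts the failure and returns true once max is reached, resetting
// the counter so the next event starts fresh.
func (l *limited) Failure(Event, error) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.failures++
	if l.failures >= l.max {
		l.failures = 0
		return true
	}
	return false
}

// Success resets the failure counter.
func (l *limited) Success(Event) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.failures = 0
}

func main() {
	var s RetryStrategy = &limited{delay: 100 * time.Millisecond, max: 3}
	err := errors.New("write failed")
	for attempt := 1; attempt <= 3; attempt++ {
		drop := s.Failure("event", err)
		fmt.Println(attempt, drop, s.Proceed("event"))
	}
}
```

The mutex is there because the interface asks for all methods to be goroutine safe.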

type RetryingSink

RetryingSink retries the write until it succeeds or ErrSinkClosed is returned. The underlying sink must have a nonzero probability of succeeding, or writes will block indefinitely. Retry behavior is configured with a RetryStrategy. Concurrent calls to a retrying sink are serialized through the sink, meaning that while one write is in flight, another will not proceed.

type RetryingSink struct {
    // contains filtered or unexported fields
}

func NewRetryingSink

func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink

NewRetryingSink returns a sink that will retry writes to the wrapped sink, backing off on failure. The strategy parameter controls the retry behavior; for example, a Breaker or an ExponentialBackoff may be supplied.

func (*RetryingSink) Close

func (rs *RetryingSink) Close() error

Close closes the sink and the underlying sink.

func (*RetryingSink) String

func (rs *RetryingSink) String() string

func (*RetryingSink) Write

func (rs *RetryingSink) Write(event Event) error

Write attempts to flush the events to the downstream sink until it succeeds or the sink is closed.
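
The interplay between a sink and a RetryStrategy can be sketched as a loop. The helper writeWithRetry and the types flaky and noWait are hypothetical. Returning nil when the strategy asks for a drop is an assumption of this sketch, and closure of the retrying sink itself is left out.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

type RetryStrategy interface {
	Proceed(event Event) time.Duration
	Failure(event Event, err error) bool
	Success(event Event)
}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// writeWithRetry keeps writing event to sink until it succeeds, the sink
// reports closure, or the strategy asks for the event to be dropped.
func writeWithRetry(sink Sink, strategy RetryStrategy, event Event) error {
	for {
		if wait := strategy.Proceed(event); wait > 0 {
			time.Sleep(wait)
		}
		err := sink.Write(event)
		if err == nil {
			strategy.Success(event)
			return nil
		}
		if err == errSinkClosed {
			return err // terminal: retrying cannot help
		}
		if strategy.Failure(event, err) {
			return nil // assumption: a dropped event is not an error
		}
	}
}

// flaky fails the first `failures` writes and then succeeds.
type flaky struct {
	failures, writes int
	got              []Event
}

func (f *flaky) Write(event Event) error {
	f.writes++
	if f.writes <= f.failures {
		return errors.New("temporary failure")
	}
	f.got = append(f.got, event)
	return nil
}

func (f *flaky) Close() error { return nil }

// noWait retries immediately and never drops.
type noWait struct{ failed int }

func (s *noWait) Proceed(Event) time.Duration { return 0 }
func (s *noWait) Failure(Event, error) bool   { s.failed++; return false }
func (s *noWait) Success(Event)               {}

func main() {
	sink := &flaky{failures: 2}
	strategy := &noWait{}
	err := writeWithRetry(sink, strategy, "event")
	fmt.Println(err, sink.writes, strategy.failed, sink.got)
}
```

A strategy that never drops, paired with a sink that never succeeds, makes this loop spin forever, which matches the warning that the underlying sink must be able to succeed.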

type Sink

Sink accepts and sends events.

type Sink interface {
    // Write an event to the Sink. If no error is returned, the caller will
    // assume that all events have been committed to the sink. If an error is
    // received, the caller may retry sending the event.
    Write(event Event) error

    // Close the sink, possibly waiting for pending events to flush.
    Close() error
}
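
A custom sink only needs the two methods above. The following sketch shows an in-memory sink that rejects writes after Close. The type memorySink and the error errSinkClosed are hypothetical stand-ins, and Event and Sink are redeclared locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// memorySink is a hypothetical Sink that stores events in a slice.
type memorySink struct {
	mu     sync.Mutex
	events []Event
	closed bool
}

// Compile-time check that *memorySink satisfies Sink.
var _ Sink = (*memorySink)(nil)

// Write stores the event, or reports closure if Close was already called.
func (s *memorySink) Write(event Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errSinkClosed
	}
	s.events = append(s.events, event)
	return nil
}

// Close marks the sink closed. Nothing is buffered here, so there is
// nothing to flush.
func (s *memorySink) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}

func main() {
	s := &memorySink{}
	fmt.Println(s.Write("hello"))
	s.Close()
	fmt.Println(s.Write("late") == errSinkClosed)
}
```

Returning a dedicated closed error lets callers tell a terminal condition apart from a failure that is worth retrying.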

func NewFilter

func NewFilter(dst Sink, matcher Matcher) Sink

NewFilter returns a new filter that forwards to dst only those events for which matcher returns true.
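
A filtering sink can be sketched in a few lines. The types filter and recorder, the constructor newFilter and the error errSinkClosed are hypothetical stand-ins. Silently discarding non-matching events and closing dst from Close are assumptions of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

type Event interface{}

type Sink interface {
	Write(event Event) error
	Close() error
}

type Matcher interface {
	Match(event Event) bool
}

type MatcherFunc func(event Event) bool

func (fn MatcherFunc) Match(event Event) bool { return fn(event) }

// errSinkClosed stands in for the package's ErrSinkClosed.
var errSinkClosed = errors.New("sink closed")

// filter is a hypothetical stand-in for Filter. Like the documented type, it
// is not goroutine safe.
type filter struct {
	dst     Sink
	matcher Matcher
	closed  bool
}

func newFilter(dst Sink, matcher Matcher) Sink {
	return &filter{dst: dst, matcher: matcher}
}

// Write forwards the event to dst if the matcher accepts it; other events
// are silently discarded.
func (f *filter) Write(event Event) error {
	if f.closed {
		return errSinkClosed
	}
	if f.matcher.Match(event) {
		return f.dst.Write(event)
	}
	return nil
}

// Close stops the filter and closes dst (an assumption of this sketch).
func (f *filter) Close() error {
	if f.closed {
		return nil
	}
	f.closed = true
	return f.dst.Close()
}

// recorder is a hypothetical in-memory sink used for the demonstration.
type recorder struct{ events []Event }

func (r *recorder) Write(event Event) error {
	r.events = append(r.events, event)
	return nil
}

func (r *recorder) Close() error { return nil }

func main() {
	dst := &recorder{}
	onlyInts := MatcherFunc(func(event Event) bool {
		_, ok := event.(int)
		return ok
	})
	f := newFilter(dst, onlyInts)
	for _, event := range []Event{1, "two", 3} {
		f.Write(event)
	}
	fmt.Println(dst.events)
}
```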