...

Source file src/github.com/docker/distribution/notifications/sinks.go

Documentation: github.com/docker/distribution/notifications

     1  package notifications
     2  
     3  import (
     4  	"container/list"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/sirupsen/logrus"
    10  )
    11  
    12  // NOTE(stevvooe): This file contains definitions for several utility sinks.
    13  // Typically, the broadcaster is the only sink that should be required
    14  // externally, but others are suitable for export if the need arises. Albeit,
    15  // the tight integration with endpoint metrics should be removed.
    16  
    17  // Broadcaster sends events to multiple, reliable Sinks. The goal of this
    18  // component is to dispatch events to configured endpoints. Reliability can be
    19  // provided by wrapping incoming sinks.
    20  type Broadcaster struct {
    21  	sinks  []Sink
    22  	events chan []Event
    23  	closed chan chan struct{}
    24  }
    25  
    26  // NewBroadcaster ...
    27  // Add appends one or more sinks to the list of sinks. The broadcaster
    28  // behavior will be affected by the properties of the sink. Generally, the
    29  // sink should accept all messages and deal with reliability on its own. Use
    30  // of EventQueue and RetryingSink should be used here.
    31  func NewBroadcaster(sinks ...Sink) *Broadcaster {
    32  	b := Broadcaster{
    33  		sinks:  sinks,
    34  		events: make(chan []Event),
    35  		closed: make(chan chan struct{}),
    36  	}
    37  
    38  	// Start the broadcaster
    39  	go b.run()
    40  
    41  	return &b
    42  }
    43  
    44  // Write accepts a block of events to be dispatched to all sinks. This method
    45  // will never fail and should never block (hopefully!). The caller cedes the
    46  // slice memory to the broadcaster and should not modify it after calling
    47  // write.
    48  func (b *Broadcaster) Write(events ...Event) error {
    49  	select {
    50  	case b.events <- events:
    51  	case <-b.closed:
    52  		return ErrSinkClosed
    53  	}
    54  	return nil
    55  }
    56  
    57  // Close the broadcaster, ensuring that all messages are flushed to the
    58  // underlying sink before returning.
    59  func (b *Broadcaster) Close() error {
    60  	logrus.Infof("broadcaster: closing")
    61  	select {
    62  	case <-b.closed:
    63  		// already closed
    64  		return fmt.Errorf("broadcaster: already closed")
    65  	default:
    66  		// do a little chan handoff dance to synchronize closing
    67  		closed := make(chan struct{})
    68  		b.closed <- closed
    69  		close(b.closed)
    70  		<-closed
    71  		return nil
    72  	}
    73  }
    74  
    75  // run is the main broadcast loop, started when the broadcaster is created.
    76  // Under normal conditions, it waits for events on the event channel. After
    77  // Close is called, this goroutine will exit.
    78  func (b *Broadcaster) run() {
    79  	for {
    80  		select {
    81  		case block := <-b.events:
    82  			for _, sink := range b.sinks {
    83  				if err := sink.Write(block...); err != nil {
    84  					logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
    85  				}
    86  			}
    87  		case closing := <-b.closed:
    88  
    89  			// close all the underlying sinks
    90  			for _, sink := range b.sinks {
    91  				if err := sink.Close(); err != nil {
    92  					logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
    93  				}
    94  			}
    95  			closing <- struct{}{}
    96  
    97  			logrus.Debugf("broadcaster: closed")
    98  			return
    99  		}
   100  	}
   101  }
   102  
   103  // eventQueue accepts all messages into a queue for asynchronous consumption
   104  // by a sink. It is unbounded and thread safe but the sink must be reliable or
   105  // events will be dropped.
   106  type eventQueue struct {
   107  	sink      Sink
   108  	events    *list.List
   109  	listeners []eventQueueListener
   110  	cond      *sync.Cond
   111  	mu        sync.Mutex
   112  	closed    bool
   113  }
   114  
   115  // eventQueueListener is called when various events happen on the queue.
   116  type eventQueueListener interface {
   117  	ingress(events ...Event)
   118  	egress(events ...Event)
   119  }
   120  
   121  // newEventQueue returns a queue to the provided sink. If the updater is non-
   122  // nil, it will be called to update pending metrics on ingress and egress.
   123  func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
   124  	eq := eventQueue{
   125  		sink:      sink,
   126  		events:    list.New(),
   127  		listeners: listeners,
   128  	}
   129  
   130  	eq.cond = sync.NewCond(&eq.mu)
   131  	go eq.run()
   132  	return &eq
   133  }
   134  
   135  // Write accepts the events into the queue, only failing if the queue has
   136  // beend closed.
   137  func (eq *eventQueue) Write(events ...Event) error {
   138  	eq.mu.Lock()
   139  	defer eq.mu.Unlock()
   140  
   141  	if eq.closed {
   142  		return ErrSinkClosed
   143  	}
   144  
   145  	for _, listener := range eq.listeners {
   146  		listener.ingress(events...)
   147  	}
   148  	eq.events.PushBack(events)
   149  	eq.cond.Signal() // signal waiters
   150  
   151  	return nil
   152  }
   153  
   154  // Close shuts down the event queue, flushing
   155  func (eq *eventQueue) Close() error {
   156  	eq.mu.Lock()
   157  	defer eq.mu.Unlock()
   158  
   159  	if eq.closed {
   160  		return fmt.Errorf("eventqueue: already closed")
   161  	}
   162  
   163  	// set closed flag
   164  	eq.closed = true
   165  	eq.cond.Signal() // signal flushes queue
   166  	eq.cond.Wait()   // wait for signal from last flush
   167  
   168  	return eq.sink.Close()
   169  }
   170  
   171  // run is the main goroutine to flush events to the target sink.
   172  func (eq *eventQueue) run() {
   173  	for {
   174  		block := eq.next()
   175  
   176  		if block == nil {
   177  			return // nil block means event queue is closed.
   178  		}
   179  
   180  		if err := eq.sink.Write(block...); err != nil {
   181  			logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
   182  		}
   183  
   184  		for _, listener := range eq.listeners {
   185  			listener.egress(block...)
   186  		}
   187  	}
   188  }
   189  
   190  // next encompasses the critical section of the run loop. When the queue is
   191  // empty, it will block on the condition. If new data arrives, it will wake
   192  // and return a block. When closed, a nil slice will be returned.
   193  func (eq *eventQueue) next() []Event {
   194  	eq.mu.Lock()
   195  	defer eq.mu.Unlock()
   196  
   197  	for eq.events.Len() < 1 {
   198  		if eq.closed {
   199  			eq.cond.Broadcast()
   200  			return nil
   201  		}
   202  
   203  		eq.cond.Wait()
   204  	}
   205  
   206  	front := eq.events.Front()
   207  	block := front.Value.([]Event)
   208  	eq.events.Remove(front)
   209  
   210  	return block
   211  }
   212  
   213  // ignoredSink discards events with ignored target media types and actions.
   214  // passes the rest along.
   215  type ignoredSink struct {
   216  	Sink
   217  	ignoreMediaTypes map[string]bool
   218  	ignoreActions    map[string]bool
   219  }
   220  
   221  func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink {
   222  	if len(ignored) == 0 {
   223  		return sink
   224  	}
   225  
   226  	ignoredMap := make(map[string]bool)
   227  	for _, mediaType := range ignored {
   228  		ignoredMap[mediaType] = true
   229  	}
   230  
   231  	ignoredActionsMap := make(map[string]bool)
   232  	for _, action := range ignoreActions {
   233  		ignoredActionsMap[action] = true
   234  	}
   235  
   236  	return &ignoredSink{
   237  		Sink:             sink,
   238  		ignoreMediaTypes: ignoredMap,
   239  		ignoreActions:    ignoredActionsMap,
   240  	}
   241  }
   242  
   243  // Write discards events with ignored target media types and passes the rest
   244  // along.
   245  func (imts *ignoredSink) Write(events ...Event) error {
   246  	var kept []Event
   247  	for _, e := range events {
   248  		if !imts.ignoreMediaTypes[e.Target.MediaType] {
   249  			kept = append(kept, e)
   250  		}
   251  	}
   252  	if len(kept) == 0 {
   253  		return nil
   254  	}
   255  
   256  	var results []Event
   257  	for _, e := range kept {
   258  		if !imts.ignoreActions[e.Action] {
   259  			results = append(results, e)
   260  		}
   261  	}
   262  	if len(results) == 0 {
   263  		return nil
   264  	}
   265  	return imts.Sink.Write(results...)
   266  }
   267  
   268  // retryingSink retries the write until success or an ErrSinkClosed is
   269  // returned. Underlying sink must have p > 0 of succeeding or the sink will
   270  // block. Internally, it is a circuit breaker retries to manage reset.
   271  // Concurrent calls to a retrying sink are serialized through the sink,
   272  // meaning that if one is in-flight, another will not proceed.
   273  type retryingSink struct {
   274  	mu     sync.Mutex
   275  	sink   Sink
   276  	closed bool
   277  
   278  	// circuit breaker heuristics
   279  	failures struct {
   280  		threshold int
   281  		recent    int
   282  		last      time.Time
   283  		backoff   time.Duration // time after which we retry after failure.
   284  	}
   285  }
   286  
   287  // TODO(stevvooe): We are using circuit break here, which actually doesn't
   288  // make a whole lot of sense for this use case, since we always retry. Move
   289  // this to use bounded exponential backoff.
   290  
   291  // newRetryingSink returns a sink that will retry writes to a sink, backing
   292  // off on failure. Parameters threshold and backoff adjust the behavior of the
   293  // circuit breaker.
   294  func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
   295  	rs := &retryingSink{
   296  		sink: sink,
   297  	}
   298  	rs.failures.threshold = threshold
   299  	rs.failures.backoff = backoff
   300  
   301  	return rs
   302  }
   303  
   304  // Write attempts to flush the events to the downstream sink until it succeeds
   305  // or the sink is closed.
   306  func (rs *retryingSink) Write(events ...Event) error {
   307  	rs.mu.Lock()
   308  	defer rs.mu.Unlock()
   309  
   310  retry:
   311  
   312  	if rs.closed {
   313  		return ErrSinkClosed
   314  	}
   315  
   316  	if !rs.proceed() {
   317  		logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
   318  		rs.wait(rs.failures.backoff)
   319  		goto retry
   320  	}
   321  
   322  	if err := rs.write(events...); err != nil {
   323  		if err == ErrSinkClosed {
   324  			// terminal!
   325  			return err
   326  		}
   327  
   328  		logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
   329  		goto retry
   330  	}
   331  
   332  	return nil
   333  }
   334  
   335  // Close closes the sink and the underlying sink.
   336  func (rs *retryingSink) Close() error {
   337  	rs.mu.Lock()
   338  	defer rs.mu.Unlock()
   339  
   340  	if rs.closed {
   341  		return fmt.Errorf("retryingsink: already closed")
   342  	}
   343  
   344  	rs.closed = true
   345  	return rs.sink.Close()
   346  }
   347  
   348  // write provides a helper that dispatches failure and success properly. Used
   349  // by write as the single-flight write call.
   350  func (rs *retryingSink) write(events ...Event) error {
   351  	if err := rs.sink.Write(events...); err != nil {
   352  		rs.failure()
   353  		return err
   354  	}
   355  
   356  	rs.reset()
   357  	return nil
   358  }
   359  
   360  // wait backoff time against the sink, unlocking so others can proceed. Should
   361  // only be called by methods that currently have the mutex.
   362  func (rs *retryingSink) wait(backoff time.Duration) {
   363  	rs.mu.Unlock()
   364  	defer rs.mu.Lock()
   365  
   366  	// backoff here
   367  	time.Sleep(backoff)
   368  }
   369  
   370  // reset marks a successful call.
   371  func (rs *retryingSink) reset() {
   372  	rs.failures.recent = 0
   373  	rs.failures.last = time.Time{}
   374  }
   375  
   376  // failure records a failure.
   377  func (rs *retryingSink) failure() {
   378  	rs.failures.recent++
   379  	rs.failures.last = time.Now().UTC()
   380  }
   381  
   382  // proceed returns true if the call should proceed based on circuit breaker
   383  // heuristics.
   384  func (rs *retryingSink) proceed() bool {
   385  	return rs.failures.recent < rs.failures.threshold ||
   386  		time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
   387  }
   388  

View as plain text