...

Source file src/github.com/docker/go-events/queue.go

Documentation: github.com/docker/go-events

     1  package events
     2  
     3  import (
     4  	"container/list"
     5  	"sync"
     6  
     7  	"github.com/sirupsen/logrus"
     8  )
     9  
    10  // Queue accepts all messages into a queue for asynchronous consumption
    11  // by a sink. It is unbounded and thread safe but the sink must be reliable or
    12  // events will be dropped.
    13  type Queue struct {
    14  	dst    Sink
    15  	events *list.List
    16  	cond   *sync.Cond
    17  	mu     sync.Mutex
    18  	closed bool
    19  }
    20  
    21  // NewQueue returns a queue to the provided Sink dst.
    22  func NewQueue(dst Sink) *Queue {
    23  	eq := Queue{
    24  		dst:    dst,
    25  		events: list.New(),
    26  	}
    27  
    28  	eq.cond = sync.NewCond(&eq.mu)
    29  	go eq.run()
    30  	return &eq
    31  }
    32  
    33  // Write accepts the events into the queue, only failing if the queue has
    34  // been closed.
    35  func (eq *Queue) Write(event Event) error {
    36  	eq.mu.Lock()
    37  	defer eq.mu.Unlock()
    38  
    39  	if eq.closed {
    40  		return ErrSinkClosed
    41  	}
    42  
    43  	eq.events.PushBack(event)
    44  	eq.cond.Signal() // signal waiters
    45  
    46  	return nil
    47  }
    48  
    49  // Close shutsdown the event queue, flushing
    50  func (eq *Queue) Close() error {
    51  	eq.mu.Lock()
    52  	defer eq.mu.Unlock()
    53  
    54  	if eq.closed {
    55  		return nil
    56  	}
    57  
    58  	// set closed flag
    59  	eq.closed = true
    60  	eq.cond.Signal() // signal flushes queue
    61  	eq.cond.Wait()   // wait for signal from last flush
    62  	return eq.dst.Close()
    63  }
    64  
    65  // run is the main goroutine to flush events to the target sink.
    66  func (eq *Queue) run() {
    67  	for {
    68  		event := eq.next()
    69  
    70  		if event == nil {
    71  			return // nil block means event queue is closed.
    72  		}
    73  
    74  		if err := eq.dst.Write(event); err != nil {
    75  			// TODO(aaronl): Dropping events could be bad depending
    76  			// on the application. We should have a way of
    77  			// communicating this condition. However, logging
    78  			// at a log level above debug may not be appropriate.
    79  			// Eventually, go-events should not use logrus at all,
    80  			// and should bubble up conditions like this through
    81  			// error values.
    82  			logrus.WithFields(logrus.Fields{
    83  				"event": event,
    84  				"sink":  eq.dst,
    85  			}).WithError(err).Debug("eventqueue: dropped event")
    86  		}
    87  	}
    88  }
    89  
    90  // next encompasses the critical section of the run loop. When the queue is
    91  // empty, it will block on the condition. If new data arrives, it will wake
    92  // and return a block. When closed, a nil slice will be returned.
    93  func (eq *Queue) next() Event {
    94  	eq.mu.Lock()
    95  	defer eq.mu.Unlock()
    96  
    97  	for eq.events.Len() < 1 {
    98  		if eq.closed {
    99  			eq.cond.Broadcast()
   100  			return nil
   101  		}
   102  
   103  		eq.cond.Wait()
   104  	}
   105  
   106  	front := eq.events.Front()
   107  	block := front.Value.(Event)
   108  	eq.events.Remove(front)
   109  
   110  	return block
   111  }
   112  

View as plain text