...

Source file src/github.com/docker/go-events/broadcast.go

Documentation: github.com/docker/go-events

     1  package events
     2  
     3  import (
     4  	"fmt"
     5  	"sync"
     6  
     7  	"github.com/sirupsen/logrus"
     8  )
     9  
    10  // Broadcaster sends events to multiple, reliable Sinks. The goal of this
    11  // component is to dispatch events to configured endpoints. Reliability can be
    12  // provided by wrapping incoming sinks.
    13  type Broadcaster struct {
    14  	sinks   []Sink
    15  	events  chan Event
    16  	adds    chan configureRequest
    17  	removes chan configureRequest
    18  
    19  	shutdown chan struct{}
    20  	closed   chan struct{}
    21  	once     sync.Once
    22  }
    23  
    24  // NewBroadcaster appends one or more sinks to the list of sinks. The
    25  // broadcaster behavior will be affected by the properties of the sink.
    26  // Generally, the sink should accept all messages and deal with reliability on
    27  // its own. Use of EventQueue and RetryingSink should be used here.
    28  func NewBroadcaster(sinks ...Sink) *Broadcaster {
    29  	b := Broadcaster{
    30  		sinks:    sinks,
    31  		events:   make(chan Event),
    32  		adds:     make(chan configureRequest),
    33  		removes:  make(chan configureRequest),
    34  		shutdown: make(chan struct{}),
    35  		closed:   make(chan struct{}),
    36  	}
    37  
    38  	// Start the broadcaster
    39  	go b.run()
    40  
    41  	return &b
    42  }
    43  
    44  // Write accepts an event to be dispatched to all sinks. This method will never
    45  // fail and should never block (hopefully!). The caller cedes the memory to the
    46  // broadcaster and should not modify it after calling write.
    47  func (b *Broadcaster) Write(event Event) error {
    48  	select {
    49  	case b.events <- event:
    50  	case <-b.closed:
    51  		return ErrSinkClosed
    52  	}
    53  	return nil
    54  }
    55  
    56  // Add the sink to the broadcaster.
    57  //
    58  // The provided sink must be comparable with equality. Typically, this just
    59  // works with a regular pointer type.
    60  func (b *Broadcaster) Add(sink Sink) error {
    61  	return b.configure(b.adds, sink)
    62  }
    63  
    64  // Remove the provided sink.
    65  func (b *Broadcaster) Remove(sink Sink) error {
    66  	return b.configure(b.removes, sink)
    67  }
    68  
    69  type configureRequest struct {
    70  	sink     Sink
    71  	response chan error
    72  }
    73  
    74  func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
    75  	response := make(chan error, 1)
    76  
    77  	for {
    78  		select {
    79  		case ch <- configureRequest{
    80  			sink:     sink,
    81  			response: response}:
    82  			ch = nil
    83  		case err := <-response:
    84  			return err
    85  		case <-b.closed:
    86  			return ErrSinkClosed
    87  		}
    88  	}
    89  }
    90  
    91  // Close the broadcaster, ensuring that all messages are flushed to the
    92  // underlying sink before returning.
    93  func (b *Broadcaster) Close() error {
    94  	b.once.Do(func() {
    95  		close(b.shutdown)
    96  	})
    97  
    98  	<-b.closed
    99  	return nil
   100  }
   101  
   102  // run is the main broadcast loop, started when the broadcaster is created.
   103  // Under normal conditions, it waits for events on the event channel. After
   104  // Close is called, this goroutine will exit.
   105  func (b *Broadcaster) run() {
   106  	defer close(b.closed)
   107  	remove := func(target Sink) {
   108  		for i, sink := range b.sinks {
   109  			if sink == target {
   110  				b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
   111  				break
   112  			}
   113  		}
   114  	}
   115  
   116  	for {
   117  		select {
   118  		case event := <-b.events:
   119  			for _, sink := range b.sinks {
   120  				if err := sink.Write(event); err != nil {
   121  					if err == ErrSinkClosed {
   122  						// remove closed sinks
   123  						remove(sink)
   124  						continue
   125  					}
   126  					logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
   127  						Errorf("broadcaster: dropping event")
   128  				}
   129  			}
   130  		case request := <-b.adds:
   131  			// while we have to iterate for add/remove, common iteration for
   132  			// send is faster against slice.
   133  
   134  			var found bool
   135  			for _, sink := range b.sinks {
   136  				if request.sink == sink {
   137  					found = true
   138  					break
   139  				}
   140  			}
   141  
   142  			if !found {
   143  				b.sinks = append(b.sinks, request.sink)
   144  			}
   145  			// b.sinks[request.sink] = struct{}{}
   146  			request.response <- nil
   147  		case request := <-b.removes:
   148  			remove(request.sink)
   149  			request.response <- nil
   150  		case <-b.shutdown:
   151  			// close all the underlying sinks
   152  			for _, sink := range b.sinks {
   153  				if err := sink.Close(); err != nil && err != ErrSinkClosed {
   154  					logrus.WithField("events.sink", sink).WithError(err).
   155  						Errorf("broadcaster: closing sink failed")
   156  				}
   157  			}
   158  			return
   159  		}
   160  	}
   161  }
   162  
   163  func (b *Broadcaster) String() string {
   164  	// Serialize copy of this broadcaster without the sync.Once, to avoid
   165  	// a data race.
   166  
   167  	b2 := map[string]interface{}{
   168  		"sinks":   b.sinks,
   169  		"events":  b.events,
   170  		"adds":    b.adds,
   171  		"removes": b.removes,
   172  
   173  		"shutdown": b.shutdown,
   174  		"closed":   b.closed,
   175  	}
   176  
   177  	return fmt.Sprint(b2)
   178  }
   179  

View as plain text