...

Source file src/github.com/launchdarkly/eventsource/server.go

Documentation: github.com/launchdarkly/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"net/http"
     5  	"strings"
     6  	"sync"
     7  	"time"
     8  )
     9  
    10  type subscription struct {
    11  	channel     string
    12  	lastEventID string
    13  	out         chan<- eventOrComment
    14  }
    15  
    16  type eventOrComment interface{}
    17  
    18  type outbound struct {
    19  	channels       []string
    20  	eventOrComment eventOrComment
    21  	ackCh          chan<- struct{}
    22  }
    23  
    24  type registration struct {
    25  	channel    string
    26  	repository Repository
    27  }
    28  
    29  type unregistration struct {
    30  	channel         string
    31  	forceDisconnect bool
    32  }
    33  
    34  type comment struct {
    35  	value string
    36  }
    37  
    38  type eventBatch struct {
    39  	events <-chan Event
    40  }
    41  
    42  // Server manages any number of event-publishing channels and allows subscribers to consume them.
    43  // To use it within an HTTP server, create a handler for each channel with Handler().
    44  type Server struct {
    45  	AllowCORS       bool          // Enable all handlers to be accessible from any origin
    46  	ReplayAll       bool          // Replay repository even if there's no Last-Event-Id specified
    47  	BufferSize      int           // How many messages do we let the client get behind before disconnecting
    48  	Gzip            bool          // Enable compression if client can accept it
    49  	MaxConnTime     time.Duration // If non-zero, HTTP connections will be automatically closed after this time
    50  	Logger          Logger        // Logger is a logger that, when set, will be used for logging debug messages
    51  	registrations   chan *registration
    52  	unregistrations chan *unregistration
    53  	pub             chan *outbound
    54  	subs            chan *subscription
    55  	unsubs          chan *subscription
    56  	quit            chan bool
    57  	isClosed        bool
    58  	isClosedMutex   sync.RWMutex
    59  }
    60  
    61  // NewServer creates a new Server instance.
    62  func NewServer() *Server {
    63  	srv := &Server{
    64  		registrations:   make(chan *registration),
    65  		unregistrations: make(chan *unregistration),
    66  		pub:             make(chan *outbound),
    67  		subs:            make(chan *subscription),
    68  		unsubs:          make(chan *subscription, 2),
    69  		quit:            make(chan bool),
    70  		BufferSize:      128,
    71  	}
    72  	go srv.run()
    73  	return srv
    74  }
    75  
    76  // Close permanently shuts down the Server. It will no longer allow new subscriptions.
    77  func (srv *Server) Close() {
    78  	srv.quit <- true
    79  	srv.markServerClosed()
    80  }
    81  
    82  // Handler creates a new HTTP handler for serving a specified channel.
    83  //
    84  // The channel does not have to have been previously registered with Register, but if it has been, the
    85  // handler may replay events from the registered Repository depending on the setting of server.ReplayAll
    86  // and the Last-Event-Id header of the request.
    87  func (srv *Server) Handler(channel string) http.HandlerFunc {
    88  	return func(w http.ResponseWriter, req *http.Request) {
    89  		h := w.Header()
    90  		h.Set("Content-Type", "text/event-stream; charset=utf-8")
    91  		h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
    92  		h.Set("Connection", "keep-alive")
    93  		if srv.AllowCORS {
    94  			h.Set("Access-Control-Allow-Origin", "*")
    95  		}
    96  		useGzip := srv.Gzip && strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
    97  		if useGzip {
    98  			h.Set("Content-Encoding", "gzip")
    99  		}
   100  		w.WriteHeader(http.StatusOK)
   101  
   102  		// If the Handler is still active even though the server is closed, stop here.
   103  		// Otherwise the Handler will block while publishing to srv.subs indefinitely.
   104  		if srv.isServerClosed() {
   105  			return
   106  		}
   107  
   108  		var maxConnTimeCh <-chan time.Time
   109  		if srv.MaxConnTime > 0 {
   110  			t := time.NewTimer(srv.MaxConnTime)
   111  			defer t.Stop()
   112  			maxConnTimeCh = t.C
   113  		}
   114  
   115  		eventCh := make(chan eventOrComment, srv.BufferSize)
   116  		sub := &subscription{
   117  			channel:     channel,
   118  			lastEventID: req.Header.Get("Last-Event-ID"),
   119  			out:         eventCh,
   120  		}
   121  		srv.subs <- sub
   122  		flusher := w.(http.Flusher)
   123  		flusher.Flush()
   124  		enc := NewEncoder(w, useGzip)
   125  
   126  		writeEventOrComment := func(ec eventOrComment) bool {
   127  			if err := enc.Encode(ec); err != nil {
   128  				srv.unsubs <- sub
   129  				if srv.Logger != nil {
   130  					srv.Logger.Println(err)
   131  				}
   132  				return false // if this happens, we'll end the handler early because something's clearly broken
   133  			}
   134  			flusher.Flush()
   135  			return true
   136  		}
   137  
   138  		// The logic below works as follows:
   139  		// - Normally, the handler is reading from eventCh. Server.run() accesses this channel through sub.out
   140  		//   and sends published events to it.
   141  		// - However, if a Repository is being used, the Server might get a whole batch of events that the
   142  		//   Repository provides through its Replay method. The Repository provides these in the form of a
   143  		//   channel that it writes to. Since we don't know how many events there will be or how long it will
   144  		//   take to write them, we do not want to block Server.run() for this.
   145  		// - Previous implementations of sending events from Replay used a separate goroutine. That was unsafe,
   146  		//   due to a race condition where Server.run() might close the channel while the Replay goroutine is
   147  		//   still writing to it.
   148  		// - So, instead, Server.run() now takes the channel from Replay and wraps it in an eventBatch. When
   149  		//   the handler sees an eventBatch, it switches over to reading events from that channel until the
   150  		//   channel is closed. Then it switches back to reading events from the regular channel.
   151  		// - The Server can close eventCh at any time to indicate that the stream is done. The handler exits.
   152  		// - If the client closes the connection, or if MaxConnTime elapses, the handler exits after telling
   153  		//   the Server to stop publishing events to it.
   154  
   155  		var readMainCh <-chan eventOrComment = eventCh
   156  		var readBatchCh <-chan Event
   157  		closedNormally := false
   158  		closeNotify := req.Context().Done()
   159  
   160  	ReadLoop:
   161  		for {
   162  			select {
   163  			case <-closeNotify:
   164  				break ReadLoop
   165  			case <-maxConnTimeCh: // if MaxConnTime was not set, this is a nil channel and has no effect on the select
   166  				break ReadLoop
   167  			case ev, ok := <-readMainCh:
   168  				if !ok {
   169  					closedNormally = true
   170  					break ReadLoop
   171  				}
   172  				if batch, ok := ev.(eventBatch); ok {
   173  					readBatchCh = batch.events
   174  					readMainCh = nil
   175  				} else if !writeEventOrComment(ev) {
   176  					break ReadLoop
   177  				}
   178  			case ev, ok := <-readBatchCh:
   179  				if !ok { // end of batch
   180  					readBatchCh = nil
   181  					readMainCh = eventCh
   182  				} else if !writeEventOrComment(ev) {
   183  					break ReadLoop
   184  				}
   185  			}
   186  		}
   187  		if !closedNormally {
   188  			srv.unsubs <- sub // the server didn't tell us to close, so we must tell it that we're closing
   189  		}
   190  	}
   191  }
   192  
   193  // Register registers a Repository to be used for the specified channel. The Repository will be used to
   194  // determine whether new subscribers should receive data that was generated before they subscribed.
   195  //
   196  // Channels do not have to be registered unless you want to specify a Repository. An unregistered channel can
   197  // still be subscribed to with Handler, and published to with Publish.
   198  func (srv *Server) Register(channel string, repo Repository) {
   199  	srv.registrations <- &registration{
   200  		channel:    channel,
   201  		repository: repo,
   202  	}
   203  }
   204  
   205  // Unregister removes a channel registration that was created by Register. If forceDisconnect is true, it also
   206  // causes all currently active handlers for that channel to close their connections. If forceDisconnect is false,
   207  // those connections will remain open until closed by their clients but will not receive any more events.
   208  //
   209  // This will not prevent creating new channel subscriptions for the same channel with Handler, or publishing
   210  // events to that channel with Publish. It is the caller's responsibility to avoid using channels that are no
   211  // longer supposed to be used.
   212  func (srv *Server) Unregister(channel string, forceDisconnect bool) {
   213  	srv.unregistrations <- &unregistration{
   214  		channel:         channel,
   215  		forceDisconnect: forceDisconnect,
   216  	}
   217  }
   218  
   219  // Publish publishes an event to one or more channels.
   220  func (srv *Server) Publish(channels []string, ev Event) {
   221  	srv.pub <- &outbound{
   222  		channels:       channels,
   223  		eventOrComment: ev,
   224  	}
   225  }
   226  
   227  // PublishWithAcknowledgment publishes an event to one or more channels, returning a channel that will receive
   228  // a value after the event has been processed by the server.
   229  //
   230  // This can be used to ensure a well-defined ordering of operations. Since each Server method is handled
   231  // asynchronously via a separate channel, if you call server.Publish and then immediately call server.Close,
   232  // there is no guarantee that the server execute the Close operation only after the event has been published.
   233  // If you instead call PublishWithAcknowledgement, and then read from the returned channel before calling
   234  // Close, you can be sure that the event was published before the server was closed.
   235  func (srv *Server) PublishWithAcknowledgment(channels []string, ev Event) <-chan struct{} {
   236  	ackCh := make(chan struct{}, 1)
   237  	srv.pub <- &outbound{
   238  		channels:       channels,
   239  		eventOrComment: ev,
   240  		ackCh:          ackCh,
   241  	}
   242  	return ackCh
   243  }
   244  
   245  // PublishComment publishes a comment to one or more channels.
   246  func (srv *Server) PublishComment(channels []string, text string) {
   247  	srv.pub <- &outbound{
   248  		channels:       channels,
   249  		eventOrComment: comment{value: text},
   250  	}
   251  }
   252  
   253  func (srv *Server) run() {
   254  	// All access to the subs and repos maps is done from the same goroutine, so modifications are safe.
   255  	subs := make(map[string]map[*subscription]struct{})
   256  	repos := make(map[string]Repository)
   257  	trySend := func(sub *subscription, ec eventOrComment) {
   258  		if !sub.send(ec) {
   259  			sub.close()
   260  			delete(subs[sub.channel], sub)
   261  		}
   262  	}
   263  	for {
   264  		select {
   265  		case reg := <-srv.registrations:
   266  			repos[reg.channel] = reg.repository
   267  		case unreg := <-srv.unregistrations:
   268  			delete(repos, unreg.channel)
   269  			previousSubs := subs[unreg.channel]
   270  			delete(subs, unreg.channel)
   271  			if unreg.forceDisconnect {
   272  				for s := range previousSubs {
   273  					s.close()
   274  				}
   275  			}
   276  		case sub := <-srv.unsubs:
   277  			delete(subs[sub.channel], sub)
   278  		case pub := <-srv.pub:
   279  			for _, c := range pub.channels {
   280  				for s := range subs[c] {
   281  					trySend(s, pub.eventOrComment)
   282  				}
   283  			}
   284  			if pub.ackCh != nil {
   285  				select {
   286  				// It shouldn't be possible for this channel to block since it is created for a single use, but
   287  				// we'll do a non-blocking push just to be safe
   288  				case pub.ackCh <- struct{}{}:
   289  				default:
   290  				}
   291  			}
   292  		case sub := <-srv.subs:
   293  			if _, ok := subs[sub.channel]; !ok {
   294  				subs[sub.channel] = make(map[*subscription]struct{})
   295  			}
   296  			subs[sub.channel][sub] = struct{}{}
   297  			if srv.ReplayAll || len(sub.lastEventID) > 0 {
   298  				repo, ok := repos[sub.channel]
   299  				if ok {
   300  					batchCh := repo.Replay(sub.channel, sub.lastEventID)
   301  					if batchCh != nil {
   302  						trySend(sub, eventBatch{events: batchCh})
   303  					}
   304  				}
   305  			}
   306  		case <-srv.quit:
   307  			for _, sub := range subs {
   308  				for s := range sub {
   309  					s.close()
   310  				}
   311  			}
   312  			return
   313  		}
   314  	}
   315  }
   316  
   317  func (srv *Server) isServerClosed() bool {
   318  	srv.isClosedMutex.RLock()
   319  	defer srv.isClosedMutex.RUnlock()
   320  	return srv.isClosed
   321  }
   322  
   323  func (srv *Server) markServerClosed() {
   324  	srv.isClosedMutex.Lock()
   325  	defer srv.isClosedMutex.Unlock()
   326  	srv.isClosed = true
   327  }
   328  
   329  // Attempts to send an event or comment to the subscription's channel.
   330  //
   331  // We do not want to block the main Server goroutine, so this is a non-blocking send. If it fails,
   332  // we return false to tell the Server that the subscriber has fallen behind and should be removed;
   333  // we also immediately close the channel in that case. If the send succeeds-- or if we didn't need
   334  // to attempt a send, because the channel was already closed-- we return true.
   335  //
   336  // This should be called only from the Server.run() goroutine.
   337  func (s *subscription) send(e eventOrComment) bool {
   338  	if s.out == nil {
   339  		return true
   340  	}
   341  	select {
   342  	case s.out <- e:
   343  		return true
   344  	default:
   345  		s.close()
   346  		return false
   347  	}
   348  }
   349  
   350  // Closes a subscription's channel and sets it to nil.
   351  //
   352  // This should be called only from the Server.run() goroutine.
   353  func (s *subscription) close() {
   354  	close(s.out)
   355  	s.out = nil
   356  }
   357  

View as plain text