...

Source file src/github.com/donovanhide/eventsource/server.go

Documentation: github.com/donovanhide/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"log"
     5  	"net/http"
     6  	"strings"
     7  	"sync"
     8  )
     9  
    10  type subscription struct {
    11  	channel     string
    12  	lastEventId string
    13  	out         chan Event
    14  }
    15  
    16  type outbound struct {
    17  	channels []string
    18  	event    Event
    19  }
    20  type registration struct {
    21  	channel    string
    22  	repository Repository
    23  }
    24  
    25  type Server struct {
    26  	AllowCORS     bool        // Enable all handlers to be accessible from any origin
    27  	ReplayAll     bool        // Replay repository even if there's no Last-Event-Id specified
    28  	BufferSize    int         // How many messages do we let the client get behind before disconnecting
    29  	Gzip          bool        // Enable compression if client can accept it
    30  	Logger        *log.Logger // Logger is a logger that, when set, will be used for logging debug messages
    31  	registrations chan *registration
    32  	pub           chan *outbound
    33  	subs          chan *subscription
    34  	unregister    chan *subscription
    35  	quit          chan bool
    36  	isClosed      bool
    37  	isClosedMutex sync.RWMutex
    38  }
    39  
    40  // Create a new Server ready for handler creation and publishing events
    41  func NewServer() *Server {
    42  	srv := &Server{
    43  		registrations: make(chan *registration),
    44  		pub:           make(chan *outbound),
    45  		subs:          make(chan *subscription),
    46  		unregister:    make(chan *subscription, 2),
    47  		quit:          make(chan bool),
    48  		BufferSize:    128,
    49  	}
    50  	go srv.run()
    51  	return srv
    52  }
    53  
    54  // Stop handling publishing
    55  func (srv *Server) Close() {
    56  	srv.quit <- true
    57  	srv.markServerClosed()
    58  }
    59  
    60  // Create a new handler for serving a specified channel
    61  func (srv *Server) Handler(channel string) http.HandlerFunc {
    62  	return func(w http.ResponseWriter, req *http.Request) {
    63  		h := w.Header()
    64  		h.Set("Content-Type", "text/event-stream; charset=utf-8")
    65  		h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
    66  		h.Set("Connection", "keep-alive")
    67  		if srv.AllowCORS {
    68  			h.Set("Access-Control-Allow-Origin", "*")
    69  		}
    70  		useGzip := srv.Gzip && strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
    71  		if useGzip {
    72  			h.Set("Content-Encoding", "gzip")
    73  		}
    74  		w.WriteHeader(http.StatusOK)
    75  
    76  		// If the Handler is still active even though the server is closed, stop here.
    77  		// Otherwise the Handler will block while publishing to srv.subs indefinitely.
    78  		if srv.isServerClosed() {
    79  			return
    80  		}
    81  
    82  		sub := &subscription{
    83  			channel:     channel,
    84  			lastEventId: req.Header.Get("Last-Event-ID"),
    85  			out:         make(chan Event, srv.BufferSize),
    86  		}
    87  		srv.subs <- sub
    88  		flusher := w.(http.Flusher)
    89  		notifier := w.(http.CloseNotifier)
    90  		flusher.Flush()
    91  		enc := NewEncoder(w, useGzip)
    92  		for {
    93  			select {
    94  			case <-notifier.CloseNotify():
    95  				srv.unregister <- sub
    96  				return
    97  			case ev, ok := <-sub.out:
    98  				if !ok {
    99  					return
   100  				}
   101  				if err := enc.Encode(ev); err != nil {
   102  					srv.unregister <- sub
   103  					if srv.Logger != nil {
   104  						srv.Logger.Println(err)
   105  					}
   106  					return
   107  				}
   108  				flusher.Flush()
   109  			}
   110  		}
   111  	}
   112  }
   113  
   114  // Register the repository to be used for the specified channel
   115  func (srv *Server) Register(channel string, repo Repository) {
   116  	srv.registrations <- &registration{
   117  		channel:    channel,
   118  		repository: repo,
   119  	}
   120  }
   121  
   122  // Publish an event with the specified id to one or more channels
   123  func (srv *Server) Publish(channels []string, ev Event) {
   124  	srv.pub <- &outbound{
   125  		channels: channels,
   126  		event:    ev,
   127  	}
   128  }
   129  
   130  func replay(repo Repository, sub *subscription) {
   131  	for ev := range repo.Replay(sub.channel, sub.lastEventId) {
   132  		sub.out <- ev
   133  	}
   134  }
   135  
   136  func (srv *Server) run() {
   137  	subs := make(map[string]map[*subscription]struct{})
   138  	repos := make(map[string]Repository)
   139  	for {
   140  		select {
   141  		case reg := <-srv.registrations:
   142  			repos[reg.channel] = reg.repository
   143  		case sub := <-srv.unregister:
   144  			delete(subs[sub.channel], sub)
   145  		case pub := <-srv.pub:
   146  			for _, c := range pub.channels {
   147  				for s := range subs[c] {
   148  					select {
   149  					case s.out <- pub.event:
   150  					default:
   151  						srv.unregister <- s
   152  						close(s.out)
   153  					}
   154  
   155  				}
   156  			}
   157  		case sub := <-srv.subs:
   158  			if _, ok := subs[sub.channel]; !ok {
   159  				subs[sub.channel] = make(map[*subscription]struct{})
   160  			}
   161  			subs[sub.channel][sub] = struct{}{}
   162  			if srv.ReplayAll || len(sub.lastEventId) > 0 {
   163  				repo, ok := repos[sub.channel]
   164  				if ok {
   165  					go replay(repo, sub)
   166  				}
   167  			}
   168  		case <-srv.quit:
   169  			for _, sub := range subs {
   170  				for s := range sub {
   171  					close(s.out)
   172  				}
   173  			}
   174  			return
   175  		}
   176  	}
   177  }
   178  
   179  func (srv *Server) isServerClosed() bool {
   180  	srv.isClosedMutex.RLock()
   181  	defer srv.isClosedMutex.RUnlock()
   182  	return srv.isClosed
   183  }
   184  
   185  func (srv *Server) markServerClosed() {
   186  	srv.isClosedMutex.Lock()
   187  	defer srv.isClosedMutex.Unlock()
   188  	srv.isClosed = true
   189  }
   190  

View as plain text