A Decoder is capable of reading Events from a stream.
type Decoder struct { *bufio.Reader }
func NewDecoder(r io.Reader) *Decoder
NewDecoder returns a new Decoder instance that reads events with the given io.Reader.
func (dec *Decoder) Decode() (Event, error)
Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occuring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).
An Encoder is capable of writing Events to a stream. Optionally Events can be gzip compressed in this process.
type Encoder struct {
// contains filtered or unexported fields
}
func NewEncoder(w io.Writer, compressed bool) *Encoder
NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.
func (enc *Encoder) Encode(ev Event) error
Encode writes an event in the format specified by the server-sent events protocol.
Any event received by the client or sent by the server will implement this interface
type Event interface { // Id is an identifier that can be used to allow a client to replay // missed Events by returning the Last-Event-Id header. // Return empty string if not required. Id() string // The name of the event. Return empty string if not required. Event() string // The payload of the event. Data() string }
▹ Example
If history is required, this interface will allow clients to reply previous events through the server. Both methods can be called from different goroutines concurrently, so you must make sure they are go-routine safe.
type Repository interface { // Gets the Events which should follow on from the specified channel and event id. Replay(channel, id string) chan Event }
▹ Example
type Server struct { AllowCORS bool // Enable all handlers to be accessible from any origin ReplayAll bool // Replay repository even if there's no Last-Event-Id specified BufferSize int // How many messages do we let the client get behind before disconnecting Gzip bool // Enable compression if client can accept it Logger *log.Logger // Logger is a logger that, when set, will be used for logging debug messages // contains filtered or unexported fields }
func NewServer() *Server
Create a new Server ready for handler creation and publishing events
func (srv *Server) Close()
Stop handling publishing
func (srv *Server) Handler(channel string) http.HandlerFunc
Create a new handler for serving a specified channel
func (srv *Server) Publish(channels []string, ev Event)
Publish an event with the specified id to one or more channels
func (srv *Server) Register(channel string, repo Repository)
Register the repository to be used for the specified channel
Example repository that uses a slice as storage for past events.
type SliceRepository struct {
// contains filtered or unexported fields
}
func NewSliceRepository() *SliceRepository
func (repo *SliceRepository) Add(channel string, event Event)
func (repo SliceRepository) Replay(channel, id string) (out chan Event)
Stream handles a connection for receiving Server Sent Events. It will try and reconnect if the connection is lost, respecting both received retry delays and event id's.
type Stream struct { // Events emits the events received by the stream Events chan Event // Errors emits any errors encountered while reading events from the stream. // It's mainly for informative purposes - the client isn't required to take any // action when an error is encountered. The stream will always attempt to continue, // even if that involves reconnecting to the server. Errors chan error // Logger is a logger that, when set, will be used for logging debug messages Logger *log.Logger // contains filtered or unexported fields }
func Subscribe(url, lastEventId string) (*Stream, error)
Subscribe to the Events emitted from the specified url. If lastEventId is non-empty it will be sent to the server in case it can replay missed events.
func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error)
SubscribeWith takes a http client and request providing customization over both headers and control over the http client settings (timeouts, tls, etc)
func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error)
SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers to be specified, authentication to be configured, etc.
func (stream *Stream) Close()
Close will close the stream. It is safe for concurrent access and can be called multiple times.
type SubscriptionError struct { Code int Message string }
func (e SubscriptionError) Error() string