...

Package eventsource

import "github.com/donovanhide/eventsource"
Overview
Index
Examples

Overview ▾

Package eventsource implements a client and server to allow streaming data one-way over a HTTP connection using the Server-Sent Events API http://dev.w3.org/html5/eventsource/

The client and server respect the Last-Event-ID header. If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection.

type Decoder

A Decoder is capable of reading Events from a stream.

type Decoder struct {
    *bufio.Reader
}

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new Decoder instance that reads events with the given io.Reader.

func (*Decoder) Decode

func (dec *Decoder) Decode() (Event, error)

Decode reads the next Event from a stream (and will block until one comes in). Graceful disconnects (between events) are indicated by an io.EOF error. Any error occuring mid-event is considered non-graceful and will show up as some other error (most likely io.ErrUnexpectedEOF).

type Encoder

An Encoder is capable of writing Events to a stream. Optionally Events can be gzip compressed in this process.

type Encoder struct {
    // contains filtered or unexported fields
}

func NewEncoder

func NewEncoder(w io.Writer, compressed bool) *Encoder

NewEncoder returns an Encoder for a given io.Writer. When compressed is set to true, a gzip writer will be created.

func (*Encoder) Encode

func (enc *Encoder) Encode(ev Event) error

Encode writes an event in the format specified by the server-sent events protocol.

type Event

Any event received by the client or sent by the server will implement this interface

type Event interface {
    // Id is an identifier that can be used to allow a client to replay
    // missed Events by returning the Last-Event-Id header.
    // Return empty string if not required.
    Id() string
    // The name of the event. Return empty string if not required.
    Event() string
    // The payload of the event.
    Data() string
}

Example

Code:

package eventsource_test

import (
    "fmt"
    "net"
    "net/http"
    "time"

    "github.com/donovanhide/eventsource"
)

type TimeEvent time.Time

func (t TimeEvent) Id() string    { return fmt.Sprint(time.Time(t).UnixNano()) }
func (t TimeEvent) Event() string { return "Tick" }
func (t TimeEvent) Data() string  { return time.Time(t).String() }

const (
    TICK_COUNT = 5
)

func TimePublisher(srv *eventsource.Server) {
    start := time.Date(2013, time.January, 1, 0, 0, 0, 0, time.UTC)
    ticker := time.NewTicker(time.Second)
    for i := 0; i < TICK_COUNT; i++ {
        <-ticker.C
        srv.Publish([]string{"time"}, TimeEvent(start))
        start = start.Add(time.Second)
    }
}

func ExampleEvent() {
    srv := eventsource.NewServer()
    srv.Gzip = true
    defer srv.Close()
    l, err := net.Listen("tcp", ":8080")
    if err != nil {
        return
    }
    defer l.Close()
    http.HandleFunc("/time", srv.Handler("time"))
    go http.Serve(l, nil)
    go TimePublisher(srv)
    stream, err := eventsource.Subscribe("http://127.0.0.1:8080/time", "")
    if err != nil {
        return
    }
    for i := 0; i < TICK_COUNT; i++ {
        ev := <-stream.Events
        fmt.Println(ev.Id(), ev.Event(), ev.Data())
    }

    // Output:
    // 1356998400000000000 Tick 2013-01-01 00:00:00 +0000 UTC
    // 1356998401000000000 Tick 2013-01-01 00:00:01 +0000 UTC
    // 1356998402000000000 Tick 2013-01-01 00:00:02 +0000 UTC
    // 1356998403000000000 Tick 2013-01-01 00:00:03 +0000 UTC
    // 1356998404000000000 Tick 2013-01-01 00:00:04 +0000 UTC
}

type Repository

If history is required, this interface will allow clients to reply previous events through the server. Both methods can be called from different goroutines concurrently, so you must make sure they are go-routine safe.

type Repository interface {
    // Gets the Events which should follow on from the specified channel and event id.
    Replay(channel, id string) chan Event
}

Example

Code:

package eventsource_test

import (
    "encoding/json"
    "fmt"
    "github.com/donovanhide/eventsource"
    "net"
    "net/http"
)

type NewsArticle struct {
    id             string
    Title, Content string
}

func (a *NewsArticle) Id() string    { return a.id }
func (a *NewsArticle) Event() string { return "News Article" }
func (a *NewsArticle) Data() string  { b, _ := json.Marshal(a); return string(b) }

var articles = []NewsArticle{
    {"2", "Governments struggle to control global price of gas", "Hot air...."},
    {"1", "Tomorrow is another day", "And so is the day after."},
    {"3", "News for news' sake", "Nothing has happened."},
}

func buildRepo(srv *eventsource.Server) {
    repo := eventsource.NewSliceRepository()
    srv.Register("articles", repo)
    for i := range articles {
        repo.Add("articles", &articles[i])
        srv.Publish([]string{"articles"}, &articles[i])
    }
}

func ExampleRepository() {
    srv := eventsource.NewServer()
    defer srv.Close()
    http.HandleFunc("/articles", srv.Handler("articles"))
    l, err := net.Listen("tcp", ":8080")
    if err != nil {
        return
    }
    defer l.Close()
    go http.Serve(l, nil)
    stream, err := eventsource.Subscribe("http://127.0.0.1:8080/articles", "")
    if err != nil {
        return
    }
    go buildRepo(srv)
    // This will receive events in the order that they come
    for i := 0; i < 3; i++ {
        ev := <-stream.Events
        fmt.Println(ev.Id(), ev.Event(), ev.Data())
    }
    stream, err = eventsource.Subscribe("http://127.0.0.1:8080/articles", "1")
    if err != nil {
        fmt.Println(err)
        return
    }
    // This will replay the events in order of id
    for i := 0; i < 3; i++ {
        ev := <-stream.Events
        fmt.Println(ev.Id(), ev.Event(), ev.Data())
    }
    // Output:
    // 2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
    // 1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
    // 3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
    // 1 News Article {"Title":"Tomorrow is another day","Content":"And so is the day after."}
    // 2 News Article {"Title":"Governments struggle to control global price of gas","Content":"Hot air...."}
    // 3 News Article {"Title":"News for news' sake","Content":"Nothing has happened."}
}

type Server

type Server struct {
    AllowCORS  bool        // Enable all handlers to be accessible from any origin
    ReplayAll  bool        // Replay repository even if there's no Last-Event-Id specified
    BufferSize int         // How many messages do we let the client get behind before disconnecting
    Gzip       bool        // Enable compression if client can accept it
    Logger     *log.Logger // Logger is a logger that, when set, will be used for logging debug messages
    // contains filtered or unexported fields
}

func NewServer

func NewServer() *Server

Create a new Server ready for handler creation and publishing events

func (*Server) Close

func (srv *Server) Close()

Stop handling publishing

func (*Server) Handler

func (srv *Server) Handler(channel string) http.HandlerFunc

Create a new handler for serving a specified channel

func (*Server) Publish

func (srv *Server) Publish(channels []string, ev Event)

Publish an event with the specified id to one or more channels

func (*Server) Register

func (srv *Server) Register(channel string, repo Repository)

Register the repository to be used for the specified channel

type SliceRepository

Example repository that uses a slice as storage for past events.

type SliceRepository struct {
    // contains filtered or unexported fields
}

func NewSliceRepository

func NewSliceRepository() *SliceRepository

func (*SliceRepository) Add

func (repo *SliceRepository) Add(channel string, event Event)

func (SliceRepository) Replay

func (repo SliceRepository) Replay(channel, id string) (out chan Event)

type Stream

Stream handles a connection for receiving Server Sent Events. It will try and reconnect if the connection is lost, respecting both received retry delays and event id's.

type Stream struct {

    // Events emits the events received by the stream
    Events chan Event
    // Errors emits any errors encountered while reading events from the stream.
    // It's mainly for informative purposes - the client isn't required to take any
    // action when an error is encountered. The stream will always attempt to continue,
    // even if that involves reconnecting to the server.
    Errors chan error
    // Logger is a logger that, when set, will be used for logging debug messages
    Logger *log.Logger
    // contains filtered or unexported fields
}

func Subscribe

func Subscribe(url, lastEventId string) (*Stream, error)

Subscribe to the Events emitted from the specified url. If lastEventId is non-empty it will be sent to the server in case it can replay missed events.

func SubscribeWith

func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error)

SubscribeWith takes a http client and request providing customization over both headers and control over the http client settings (timeouts, tls, etc)

func SubscribeWithRequest

func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error)

SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers to be specified, authentication to be configured, etc.

func (*Stream) Close

func (stream *Stream) Close()

Close will close the stream. It is safe for concurrent access and can be called multiple times.

type SubscriptionError

type SubscriptionError struct {
    Code    int
    Message string
}

func (SubscriptionError) Error

func (e SubscriptionError) Error() string