...

Source file src/github.com/donovanhide/eventsource/decoder.go

Documentation: github.com/donovanhide/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"bufio"
     5  	"io"
     6  	"strconv"
     7  	"strings"
     8  )
     9  
    10  type publication struct {
    11  	id, event, data string
    12  	retry           int64
    13  }
    14  
    15  func (s *publication) Id() string    { return s.id }
    16  func (s *publication) Event() string { return s.event }
    17  func (s *publication) Data() string  { return s.data }
    18  func (s *publication) Retry() int64  { return s.retry }
    19  
    20  // A Decoder is capable of reading Events from a stream.
    21  type Decoder struct {
    22  	*bufio.Reader
    23  }
    24  
    25  // NewDecoder returns a new Decoder instance that reads events
    26  // with the given io.Reader.
    27  func NewDecoder(r io.Reader) *Decoder {
    28  	dec := &Decoder{bufio.NewReader(newNormaliser(r))}
    29  	return dec
    30  }
    31  
    32  // Decode reads the next Event from a stream (and will block until one
    33  // comes in).
    34  // Graceful disconnects (between events) are indicated by an io.EOF error.
    35  // Any error occuring mid-event is considered non-graceful and will
    36  // show up as some other error (most likely io.ErrUnexpectedEOF).
    37  func (dec *Decoder) Decode() (Event, error) {
    38  	// peek ahead before we start a new event so we can return EOFs
    39  	_, err := dec.Peek(1)
    40  	if err == io.ErrUnexpectedEOF {
    41  		err = io.EOF
    42  	}
    43  	if err != nil {
    44  		return nil, err
    45  	}
    46  	pub := new(publication)
    47  	inDecoding := false
    48  	for {
    49  		line, err := dec.ReadString('\n')
    50  		if err != nil {
    51  			return nil, err
    52  		}
    53  		if line == "\n" && inDecoding {
    54  			// the empty line signals the end of an event
    55  			break
    56  		} else if line == "\n" && !inDecoding {
    57  			// only a newline was sent, so we don't want to publish an empty event but try to read again
    58  			continue
    59  		}
    60  		line = strings.TrimSuffix(line, "\n")
    61  		if strings.HasPrefix(line, ":") {
    62  			continue
    63  		}
    64  		sections := strings.SplitN(line, ":", 2)
    65  		field, value := sections[0], ""
    66  		if len(sections) == 2 {
    67  			value = strings.TrimPrefix(sections[1], " ")
    68  		}
    69  		inDecoding = true
    70  		switch field {
    71  		case "event":
    72  			pub.event = value
    73  		case "data":
    74  			pub.data += value + "\n"
    75  		case "id":
    76  			pub.id = value
    77  		case "retry":
    78  			pub.retry, _ = strconv.ParseInt(value, 10, 64)
    79  		}
    80  	}
    81  	pub.data = strings.TrimSuffix(pub.data, "\n")
    82  	return pub, nil
    83  }
    84  

View as plain text