...

Source file src/github.com/launchdarkly/eventsource/decoder.go

Documentation: github.com/launchdarkly/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"bufio"
     5  	"io"
     6  	"strconv"
     7  	"strings"
     8  	"time"
     9  )
    10  
    11  type publication struct {
    12  	id, event, data string
    13  	retry           int64
    14  }
    15  
    16  //nolint:golint,stylecheck // should be ID; retained for backward compatibility
    17  func (s *publication) Id() string    { return s.id }
    18  func (s *publication) Event() string { return s.event }
    19  func (s *publication) Data() string  { return s.data }
    20  func (s *publication) Retry() int64  { return s.retry }
    21  
    22  // A Decoder is capable of reading Events from a stream.
    23  type Decoder struct {
    24  	linesCh     <-chan string
    25  	errorCh     <-chan error
    26  	readTimeout time.Duration
    27  }
    28  
    29  // DecoderOption is a common interface for optional configuration parameters that can be
    30  // used in creating a Decoder.
    31  type DecoderOption interface {
    32  	apply(s *Decoder)
    33  }
    34  
    35  type readTimeoutDecoderOption time.Duration
    36  
    37  func (o readTimeoutDecoderOption) apply(d *Decoder) {
    38  	d.readTimeout = time.Duration(o)
    39  }
    40  
    41  // DecoderOptionReadTimeout returns an option that sets the read timeout interval for a
    42  // Decoder when the Decoder is created. If the Decoder does not receive new data within this
    43  // length of time, it will return an error. By default, there is no read timeout.
    44  func DecoderOptionReadTimeout(timeout time.Duration) DecoderOption {
    45  	return readTimeoutDecoderOption(timeout)
    46  }
    47  
    48  // NewDecoder returns a new Decoder instance that reads events with the given io.Reader.
    49  func NewDecoder(r io.Reader) *Decoder {
    50  	bufReader := bufio.NewReader(newNormaliser(r))
    51  	linesCh, errorCh := newLineStreamChannel(bufReader)
    52  	return &Decoder{
    53  		linesCh: linesCh,
    54  		errorCh: errorCh,
    55  	}
    56  }
    57  
    58  // NewDecoderWithOptions returns a new Decoder instance that reads events with the given
    59  // io.Reader, with optional configuration parameters.
    60  func NewDecoderWithOptions(r io.Reader, options ...DecoderOption) *Decoder {
    61  	d := NewDecoder(r)
    62  	for _, o := range options {
    63  		o.apply(d)
    64  	}
    65  	return d
    66  }
    67  
    68  // Decode reads the next Event from a stream (and will block until one
    69  // comes in).
    70  // Graceful disconnects (between events) are indicated by an io.EOF error.
    71  // Any error occurring mid-event is considered non-graceful and will
    72  // show up as some other error (most likely io.ErrUnexpectedEOF).
    73  func (dec *Decoder) Decode() (Event, error) {
    74  	pub := new(publication)
    75  	inDecoding := false
    76  	var timeoutTimer *time.Timer
    77  	var timeoutCh <-chan time.Time
    78  	if dec.readTimeout > 0 {
    79  		timeoutTimer = time.NewTimer(dec.readTimeout)
    80  		defer timeoutTimer.Stop()
    81  		timeoutCh = timeoutTimer.C
    82  	}
    83  ReadLoop:
    84  	for {
    85  		select {
    86  		case line := <-dec.linesCh:
    87  			if timeoutTimer != nil {
    88  				if !timeoutTimer.Stop() {
    89  					<-timeoutCh
    90  				}
    91  				timeoutTimer.Reset(dec.readTimeout)
    92  			}
    93  			if line == "\n" && inDecoding {
    94  				// the empty line signals the end of an event
    95  				break ReadLoop
    96  			} else if line == "\n" && !inDecoding {
    97  				// only a newline was sent, so we don't want to publish an empty event but try to read again
    98  				continue ReadLoop
    99  			}
   100  			line = strings.TrimSuffix(line, "\n")
   101  			if strings.HasPrefix(line, ":") {
   102  				continue ReadLoop
   103  			}
   104  			sections := strings.SplitN(line, ":", 2)
   105  			field, value := sections[0], ""
   106  			if len(sections) == 2 {
   107  				value = strings.TrimPrefix(sections[1], " ")
   108  			}
   109  			inDecoding = true
   110  			switch field {
   111  			case "event":
   112  				pub.event = value
   113  			case "data":
   114  				pub.data += value + "\n"
   115  			case "id":
   116  				pub.id = value
   117  			case "retry":
   118  				pub.retry, _ = strconv.ParseInt(value, 10, 64)
   119  			}
   120  		case err := <-dec.errorCh:
   121  			if err == io.ErrUnexpectedEOF && !inDecoding {
   122  				// if we're not in the middle of an event then just return EOF
   123  				err = io.EOF
   124  			} else if err == io.EOF && inDecoding {
   125  				// if we are in the middle of an event then EOF is unexpected
   126  				err = io.ErrUnexpectedEOF
   127  			}
   128  			return nil, err
   129  		case <-timeoutCh:
   130  			return nil, ErrReadTimeout
   131  		}
   132  	}
   133  	pub.data = strings.TrimSuffix(pub.data, "\n")
   134  	return pub, nil
   135  }
   136  
   137  /**
   138   * Returns a channel that will receive lines of text as they are read. On any error
   139   * from the underlying reader, it stops and posts the error to a second channel.
   140   */
   141  func newLineStreamChannel(r *bufio.Reader) (<-chan string, <-chan error) {
   142  	linesCh := make(chan string)
   143  	errorCh := make(chan error)
   144  	go func() {
   145  		defer close(linesCh)
   146  		defer close(errorCh)
   147  		for {
   148  			line, err := r.ReadString('\n')
   149  			if err != nil {
   150  				errorCh <- err
   151  				return
   152  			}
   153  			linesCh <- line
   154  		}
   155  	}()
   156  	return linesCh, errorCh
   157  }
   158  

View as plain text