...
1 package eventsource
2
3 import (
4 "bufio"
5 "io"
6 "strconv"
7 "strings"
8 "time"
9 )
10
11 type publication struct {
12 id, event, data string
13 retry int64
14 }
15
16
17 func (s *publication) Id() string { return s.id }
18 func (s *publication) Event() string { return s.event }
19 func (s *publication) Data() string { return s.data }
20 func (s *publication) Retry() int64 { return s.retry }
21
22
23 type Decoder struct {
24 linesCh <-chan string
25 errorCh <-chan error
26 readTimeout time.Duration
27 }
28
29
30
31 type DecoderOption interface {
32 apply(s *Decoder)
33 }
34
35 type readTimeoutDecoderOption time.Duration
36
37 func (o readTimeoutDecoderOption) apply(d *Decoder) {
38 d.readTimeout = time.Duration(o)
39 }
40
41
42
43
44 func DecoderOptionReadTimeout(timeout time.Duration) DecoderOption {
45 return readTimeoutDecoderOption(timeout)
46 }
47
48
49 func NewDecoder(r io.Reader) *Decoder {
50 bufReader := bufio.NewReader(newNormaliser(r))
51 linesCh, errorCh := newLineStreamChannel(bufReader)
52 return &Decoder{
53 linesCh: linesCh,
54 errorCh: errorCh,
55 }
56 }
57
58
59
60 func NewDecoderWithOptions(r io.Reader, options ...DecoderOption) *Decoder {
61 d := NewDecoder(r)
62 for _, o := range options {
63 o.apply(d)
64 }
65 return d
66 }
67
68
69
70
71
72
73 func (dec *Decoder) Decode() (Event, error) {
74 pub := new(publication)
75 inDecoding := false
76 var timeoutTimer *time.Timer
77 var timeoutCh <-chan time.Time
78 if dec.readTimeout > 0 {
79 timeoutTimer = time.NewTimer(dec.readTimeout)
80 defer timeoutTimer.Stop()
81 timeoutCh = timeoutTimer.C
82 }
83 ReadLoop:
84 for {
85 select {
86 case line := <-dec.linesCh:
87 if timeoutTimer != nil {
88 if !timeoutTimer.Stop() {
89 <-timeoutCh
90 }
91 timeoutTimer.Reset(dec.readTimeout)
92 }
93 if line == "\n" && inDecoding {
94
95 break ReadLoop
96 } else if line == "\n" && !inDecoding {
97
98 continue ReadLoop
99 }
100 line = strings.TrimSuffix(line, "\n")
101 if strings.HasPrefix(line, ":") {
102 continue ReadLoop
103 }
104 sections := strings.SplitN(line, ":", 2)
105 field, value := sections[0], ""
106 if len(sections) == 2 {
107 value = strings.TrimPrefix(sections[1], " ")
108 }
109 inDecoding = true
110 switch field {
111 case "event":
112 pub.event = value
113 case "data":
114 pub.data += value + "\n"
115 case "id":
116 pub.id = value
117 case "retry":
118 pub.retry, _ = strconv.ParseInt(value, 10, 64)
119 }
120 case err := <-dec.errorCh:
121 if err == io.ErrUnexpectedEOF && !inDecoding {
122
123 err = io.EOF
124 } else if err == io.EOF && inDecoding {
125
126 err = io.ErrUnexpectedEOF
127 }
128 return nil, err
129 case <-timeoutCh:
130 return nil, ErrReadTimeout
131 }
132 }
133 pub.data = strings.TrimSuffix(pub.data, "\n")
134 return pub, nil
135 }
136
137
141 func newLineStreamChannel(r *bufio.Reader) (<-chan string, <-chan error) {
142 linesCh := make(chan string)
143 errorCh := make(chan error)
144 go func() {
145 defer close(linesCh)
146 defer close(errorCh)
147 for {
148 line, err := r.ReadString('\n')
149 if err != nil {
150 errorCh <- err
151 return
152 }
153 linesCh <- line
154 }
155 }()
156 return linesCh, errorCh
157 }
158
View as plain text