...
1
16
17
18 package framer
19
20 import (
21 "encoding/binary"
22 "encoding/json"
23 "io"
24 )
25
// lengthDelimitedFrameWriter wraps an io.Writer and emits every Write as one
// frame: a 4-byte big-endian length header followed by the payload.
type lengthDelimitedFrameWriter struct {
	// w is the destination that receives the header and the payload.
	w io.Writer
	// h is the header scratch space; it is overwritten on every Write.
	h [4]byte
}

// NewLengthDelimitedFrameWriter returns an io.Writer that writes each call to
// Write to w as a 4-byte big-endian length followed by the data itself.
func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
	return &lengthDelimitedFrameWriter{w: w}
}

// Write sends data as a single frame. The header is written first; if that
// write fails or is incomplete, Write returns 0 and the error (io.ErrShortWrite
// for an incomplete header) without attempting the payload. Otherwise the
// result of writing the payload is returned unchanged, so the count excludes
// the header bytes.
//
// The length is stored as a uint32, so the conversion keeps only the low
// 32 bits of len(data).
func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
	header := w.h[:]
	binary.BigEndian.PutUint32(header, uint32(len(data)))

	switch written, err := w.w.Write(header); {
	case err != nil:
		return 0, err
	case written != len(header):
		return 0, io.ErrShortWrite
	}

	return w.w.Write(data)
}
48
// lengthDelimitedFrameReader reads frames from r, where each frame is a
// 4-byte big-endian length header followed by that many payload bytes.
type lengthDelimitedFrameReader struct {
	// r is the underlying stream of headers and payloads.
	r io.ReadCloser
	// remaining is the number of payload bytes of the current frame that have
	// not been handed to the caller yet; <= 0 means the next Read starts by
	// reading a new header.
	remaining int
}

// NewLengthDelimitedFrameReader returns an io.ReadCloser that reads r one
// length-delimited frame at a time. See Read for the per-call contract.
func NewLengthDelimitedFrameReader(r io.ReadCloser) io.ReadCloser {
	return &lengthDelimitedFrameReader{r: r}
}

// Read copies payload bytes of the current frame into data and never crosses
// a frame boundary.
//
// Results:
//   - (n, nil): the frame (or the rest of it) has been fully returned.
//   - (n, io.ErrShortBuffer): data was filled but the frame has more bytes;
//     call Read again to receive them.
//   - (0, io.EOF): the stream ended cleanly before a new header.
//   - (n, io.ErrUnexpectedEOF): the stream ended inside a header or inside a
//     frame payload.
//   - (n, err): any other error from the underlying reader is returned as is.
//
// A failure of the underlying reader in the middle of a frame is reported
// with its own error rather than io.ErrShortBuffer, so that callers which
// retry on io.ErrShortBuffer do not spin on a truncated or broken stream.
func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
	if r.remaining <= 0 {
		var header [4]byte
		// io.ReadFull yields io.EOF when no header byte is available and
		// io.ErrUnexpectedEOF when the header is cut short.
		if _, err := io.ReadFull(r.r, header[:]); err != nil {
			return 0, err
		}
		// NOTE(review): where int is 32 bits wide, a header value >= 1<<31
		// converts to a negative count; confirm whether such frames need an
		// explicit rejection.
		r.remaining = int(binary.BigEndian.Uint32(header[:]))
	}

	// Never read past the end of the current frame or past the caller's buffer.
	limit := r.remaining
	if limit > len(data) {
		limit = len(data)
	}

	n, err := io.ReadFull(r.r, data[:limit])
	r.remaining -= n
	if err != nil {
		// ReadFull only fails when fewer than limit bytes arrived, so the
		// frame is incomplete here and a bare EOF is a truncation.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return n, err
	}
	if r.remaining > 0 {
		return n, io.ErrShortBuffer
	}
	return n, nil
}

// Close closes the underlying reader and returns its error.
func (r *lengthDelimitedFrameReader) Close() error {
	return r.r.Close()
}
110
// jsonFrameReader reads a stream of JSON values from r and returns each value
// as one frame.
type jsonFrameReader struct {
	// r is the underlying stream; it is only used directly by Close.
	r io.ReadCloser
	// decoder splits r into individual JSON values.
	decoder *json.Decoder
	// remaining holds the tail of a frame that did not fit in the buffer of an
	// earlier Read call. It is always memory owned by the reader.
	remaining []byte
}

// NewJSONFramedReader returns an io.ReadCloser that reads r as a sequence of
// JSON values, handing out the raw bytes of one value per frame. See Read for
// the per-call contract.
func NewJSONFramedReader(r io.ReadCloser) io.ReadCloser {
	return &jsonFrameReader{
		r:       r,
		decoder: json.NewDecoder(r),
	}
}

// Read copies the raw bytes of the next JSON value into data.
//
// If the value fits, Read returns its length and a nil error. If it does not
// fit, Read fills data, keeps the rest, and returns (len(data),
// io.ErrShortBuffer); subsequent calls return the kept bytes, again with
// io.ErrShortBuffer until the final piece is delivered with a nil error.
// Errors from the JSON decoder (including io.EOF at the end of the stream)
// are returned with a zero count.
//
// Read never writes past len(data), even when cap(data) is larger.
func (r *jsonFrameReader) Read(data []byte) (int, error) {
	// Hand out what is left of a frame started by an earlier call.
	if len(r.remaining) > 0 {
		n := copy(data, r.remaining)
		r.remaining = r.remaining[n:]
		if len(r.remaining) > 0 {
			return n, io.ErrShortBuffer
		}
		r.remaining = nil
		return n, nil
	}

	// RawMessage's unmarshal appends onto the slice it is given. Capping the
	// capacity at len(data) means a value that fits is written directly into
	// data, while a larger value forces a fresh allocation instead of spilling
	// into the caller's memory beyond len(data).
	m := json.RawMessage(data[:0:len(data)])
	if err := r.decoder.Decode(&m); err != nil {
		return 0, err
	}

	// The value was too large: m is a separate slice, so copy the head into
	// data and keep the tail for the following reads.
	if len(m) > len(data) {
		n := copy(data, m)
		r.remaining = m[n:]
		return n, io.ErrShortBuffer
	}
	return len(m), nil
}

// Close closes the underlying reader and returns its error.
func (r *jsonFrameReader) Close() error {
	return r.r.Close()
}
171
View as plain text