...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package io
30
31 import (
32 "bufio"
33 "encoding/binary"
34 "errors"
35 "github.com/gogo/protobuf/proto"
36 "io"
37 )
38
39 var (
40 errSmallBuffer = errors.New("Buffer Too Small")
41 errLargeValue = errors.New("Value is Larger than 64 bits")
42 )
43
44 func NewDelimitedWriter(w io.Writer) WriteCloser {
45 return &varintWriter{w, make([]byte, binary.MaxVarintLen64), nil}
46 }
47
48 type varintWriter struct {
49 w io.Writer
50 lenBuf []byte
51 buffer []byte
52 }
53
54 func (this *varintWriter) WriteMsg(msg proto.Message) (err error) {
55 var data []byte
56 if m, ok := msg.(marshaler); ok {
57 n, ok := getSize(m)
58 if ok {
59 if n+binary.MaxVarintLen64 >= len(this.buffer) {
60 this.buffer = make([]byte, n+binary.MaxVarintLen64)
61 }
62 lenOff := binary.PutUvarint(this.buffer, uint64(n))
63 _, err = m.MarshalTo(this.buffer[lenOff:])
64 if err != nil {
65 return err
66 }
67 _, err = this.w.Write(this.buffer[:lenOff+n])
68 return err
69 }
70 }
71
72
73 data, err = proto.Marshal(msg)
74 if err != nil {
75 return err
76 }
77 length := uint64(len(data))
78 n := binary.PutUvarint(this.lenBuf, length)
79 _, err = this.w.Write(this.lenBuf[:n])
80 if err != nil {
81 return err
82 }
83 _, err = this.w.Write(data)
84 return err
85 }
86
87 func (this *varintWriter) Close() error {
88 if closer, ok := this.w.(io.Closer); ok {
89 return closer.Close()
90 }
91 return nil
92 }
93
94 func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
95 var closer io.Closer
96 if c, ok := r.(io.Closer); ok {
97 closer = c
98 }
99 return &varintReader{bufio.NewReader(r), nil, maxSize, closer}
100 }
101
102 type varintReader struct {
103 r *bufio.Reader
104 buf []byte
105 maxSize int
106 closer io.Closer
107 }
108
109 func (this *varintReader) ReadMsg(msg proto.Message) error {
110 length64, err := binary.ReadUvarint(this.r)
111 if err != nil {
112 return err
113 }
114 length := int(length64)
115 if length < 0 || length > this.maxSize {
116 return io.ErrShortBuffer
117 }
118 if len(this.buf) < length {
119 this.buf = make([]byte, length)
120 }
121 buf := this.buf[:length]
122 if _, err := io.ReadFull(this.r, buf); err != nil {
123 return err
124 }
125 return proto.Unmarshal(buf, msg)
126 }
127
128 func (this *varintReader) Close() error {
129 if this.closer != nil {
130 return this.closer.Close()
131 }
132 return nil
133 }
134
View as plain text