...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package io
30
import (
	"encoding/binary"
	"fmt"
	"io"
	"math"

	"github.com/gogo/protobuf/proto"
)
37
// uint32BinaryLen is the number of bytes occupied by the uint32 length prefix
// that precedes every message in the delimited stream.
const uint32BinaryLen = 4
39
40 func NewUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder) WriteCloser {
41 return &uint32Writer{w, byteOrder, nil, make([]byte, uint32BinaryLen)}
42 }
43
44 func NewSizeUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder, size int) WriteCloser {
45 return &uint32Writer{w, byteOrder, make([]byte, size), make([]byte, uint32BinaryLen)}
46 }
47
// uint32Writer writes protobuf messages to an io.Writer, framing each one
// with a uint32 length prefix.
//
// NOTE: the constructors in this file initialize this struct positionally, so
// the field order must not change without updating them.
type uint32Writer struct {
	// w is the destination stream.
	w io.Writer
	// byteOrder selects the encoding of the length prefix.
	byteOrder binary.ByteOrder
	// buffer is scratch space holding prefix+payload for messages that can
	// marshal themselves in place; it is replaced by a larger slice on demand.
	buffer []byte
	// lenBuf is scratch space for the length prefix on the fallback path.
	lenBuf []byte
}
54
55 func (this *uint32Writer) writeFallback(msg proto.Message) error {
56 data, err := proto.Marshal(msg)
57 if err != nil {
58 return err
59 }
60
61 length := uint32(len(data))
62 this.byteOrder.PutUint32(this.lenBuf, length)
63 if _, err = this.w.Write(this.lenBuf); err != nil {
64 return err
65 }
66 _, err = this.w.Write(data)
67 return err
68 }
69
70 func (this *uint32Writer) WriteMsg(msg proto.Message) error {
71 m, ok := msg.(marshaler)
72 if !ok {
73 return this.writeFallback(msg)
74 }
75
76 n, ok := getSize(m)
77 if !ok {
78 return this.writeFallback(msg)
79 }
80
81 size := n + uint32BinaryLen
82 if size > len(this.buffer) {
83 this.buffer = make([]byte, size)
84 }
85
86 this.byteOrder.PutUint32(this.buffer, uint32(n))
87 if _, err := m.MarshalTo(this.buffer[uint32BinaryLen:]); err != nil {
88 return err
89 }
90
91 _, err := this.w.Write(this.buffer[:size])
92 return err
93 }
94
95 func (this *uint32Writer) Close() error {
96 if closer, ok := this.w.(io.Closer); ok {
97 return closer.Close()
98 }
99 return nil
100 }
101
// uint32Reader reads protobuf messages from an io.Reader in which every
// message is preceded by a uint32 length prefix.
//
// NOTE: the constructor in this file initializes this struct positionally, so
// the field order must not change without updating it.
type uint32Reader struct {
	// r is the source stream.
	r io.Reader
	// byteOrder selects the decoding of the length prefix.
	byteOrder binary.ByteOrder
	// lenBuf is scratch space the length prefix is read into.
	lenBuf []byte
	// buf is scratch space for the message payload; it is replaced by a larger
	// slice when a message does not fit.
	buf []byte
	// maxSize is the largest payload length, in bytes, that ReadMsg accepts.
	maxSize int
}
109
110 func NewUint32DelimitedReader(r io.Reader, byteOrder binary.ByteOrder, maxSize int) ReadCloser {
111 return &uint32Reader{r, byteOrder, make([]byte, 4), nil, maxSize}
112 }
113
114 func (this *uint32Reader) ReadMsg(msg proto.Message) error {
115 if _, err := io.ReadFull(this.r, this.lenBuf); err != nil {
116 return err
117 }
118 length32 := this.byteOrder.Uint32(this.lenBuf)
119 length := int(length32)
120 if length < 0 || length > this.maxSize {
121 return io.ErrShortBuffer
122 }
123 if length > len(this.buf) {
124 this.buf = make([]byte, length)
125 }
126 _, err := io.ReadFull(this.r, this.buf[:length])
127 if err != nil {
128 return err
129 }
130 return proto.Unmarshal(this.buf[:length], msg)
131 }
132
133 func (this *uint32Reader) Close() error {
134 if closer, ok := this.r.(io.Closer); ok {
135 return closer.Close()
136 }
137 return nil
138 }
139
View as plain text