...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "encoding/binary"
19 "errors"
20 "io"
21
22 "go.etcd.io/etcd/pkg/v3/pbutil"
23 "go.etcd.io/etcd/raft/v3/raftpb"
24 )
25
26
27
// messageEncoder serializes raft messages onto an underlying stream.
type messageEncoder struct {
	// w is the destination every encoded frame is written to.
	w io.Writer
}
31
32 func (enc *messageEncoder) encode(m *raftpb.Message) error {
33 if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
34 return err
35 }
36 _, err := enc.w.Write(pbutil.MustMarshal(m))
37 return err
38 }
39
40
// messageDecoder parses raft messages from an underlying stream.
type messageDecoder struct {
	// r is the source that frames are read from.
	r io.Reader
}
44
var (
	// readBytesLimit is the payload size cap, in bytes, that decode applies
	// to a single incoming message (512 MiB).
	readBytesLimit uint64 = 512 << 20

	// ErrExceedSizeLimit is returned by the decoder when a frame declares a
	// payload longer than the permitted limit.
	ErrExceedSizeLimit = errors.New("rafthttp: error limit exceeded")
)
49
50 func (dec *messageDecoder) decode() (raftpb.Message, error) {
51 return dec.decodeLimit(readBytesLimit)
52 }
53
54 func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
55 var m raftpb.Message
56 var l uint64
57 if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
58 return m, err
59 }
60 if l > numBytes {
61 return m, ErrExceedSizeLimit
62 }
63 buf := make([]byte, int(l))
64 if _, err := io.ReadFull(dec.r, buf); err != nil {
65 return m, err
66 }
67 return m, m.Unmarshal(buf)
68 }
69
View as plain text