1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "encoding/binary"
19 "fmt"
20 "io"
21 "time"
22
23 "go.etcd.io/etcd/client/pkg/v3/types"
24 "go.etcd.io/etcd/pkg/v3/pbutil"
25 "go.etcd.io/etcd/raft/v3/raftpb"
26 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
27 )
28
const (
	// Frame type tags: the first byte of every frame on a msgappv2 stream.

	// msgTypeLinkHeartbeat marks a frame that consists of the tag byte only.
	msgTypeLinkHeartbeat uint8 = 0
	// msgTypeAppEntries marks a frame carrying an entry count, the
	// length-prefixed entries and a commit index.
	msgTypeAppEntries uint8 = 1
	// msgTypeApp marks a frame carrying a length-prefixed, fully marshaled
	// message.
	msgTypeApp uint8 = 2

	// msgAppV2BufSize is the size in bytes (1 MiB) of the scratch buffer the
	// encoder and decoder reuse for entries smaller than this value.
	msgAppV2BufSize = 1 << 20
)
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// msgAppV2Encoder writes raft messages to w in the msgappv2 framing and
// remembers the term and index of what it has already sent so that encode can
// pick the compact entries-only frame for messages that continue from there.
type msgAppV2Encoder struct {
	// w is the destination of every encoded frame.
	w io.Writer
	// fs receives a Succ call with the elapsed encoding time after each
	// non-heartbeat message has been written.
	fs *stats.FollowerStats

	// term is the Term of the last message sent as a msgTypeApp frame.
	term uint64
	// index is the index of the last entry sent so far; it is reset by a
	// msgTypeApp frame and advanced by one per entry in msgTypeAppEntries frames.
	index uint64
	// buf is scratch space for marshaling entries smaller than msgAppV2BufSize.
	buf []byte
	// uint64buf and uint8buf are scratch space for fixed-width fields.
	uint64buf []byte
	uint8buf  []byte
}
74
75 func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
76 return &msgAppV2Encoder{
77 w: w,
78 fs: fs,
79 buf: make([]byte, msgAppV2BufSize),
80 uint64buf: make([]byte, 8),
81 uint8buf: make([]byte, 1),
82 }
83 }
84
85 func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
86 start := time.Now()
87 switch {
88 case isLinkHeartbeatMessage(m):
89 enc.uint8buf[0] = msgTypeLinkHeartbeat
90 if _, err := enc.w.Write(enc.uint8buf); err != nil {
91 return err
92 }
93 case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
94 enc.uint8buf[0] = msgTypeAppEntries
95 if _, err := enc.w.Write(enc.uint8buf); err != nil {
96 return err
97 }
98
99 binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
100 if _, err := enc.w.Write(enc.uint64buf); err != nil {
101 return err
102 }
103 for i := 0; i < len(m.Entries); i++ {
104
105 binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
106 if _, err := enc.w.Write(enc.uint64buf); err != nil {
107 return err
108 }
109 if n := m.Entries[i].Size(); n < msgAppV2BufSize {
110 if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
111 return err
112 }
113 if _, err := enc.w.Write(enc.buf[:n]); err != nil {
114 return err
115 }
116 } else {
117 if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
118 return err
119 }
120 }
121 enc.index++
122 }
123
124 binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
125 if _, err := enc.w.Write(enc.uint64buf); err != nil {
126 return err
127 }
128 enc.fs.Succ(time.Since(start))
129 default:
130 if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
131 return err
132 }
133
134 if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
135 return err
136 }
137
138 if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
139 return err
140 }
141
142 enc.term = m.Term
143 enc.index = m.Index
144 if l := len(m.Entries); l > 0 {
145 enc.index = m.Entries[l-1].Index
146 }
147 enc.fs.Succ(time.Since(start))
148 }
149 return nil
150 }
151
// msgAppV2Decoder reads msgappv2 frames from r and turns them back into raft
// messages. It remembers the term and index of what it has already received
// so that entries-only frames can be expanded into complete messages.
type msgAppV2Decoder struct {
	// r is the source of the encoded frames.
	r io.Reader
	// local and remote fill the To and From fields of messages rebuilt from
	// msgTypeAppEntries frames.
	local, remote types.ID

	// term is the Term of the last message received as a msgTypeApp frame.
	term uint64
	// index is the index of the last entry received so far; it is reset by a
	// msgTypeApp frame and advanced by one per entry in msgTypeAppEntries frames.
	index uint64
	// buf is scratch space for reading entries smaller than msgAppV2BufSize.
	buf []byte
	// uint64buf and uint8buf are scratch space for fixed-width fields.
	uint64buf []byte
	uint8buf  []byte
}
162
163 func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
164 return &msgAppV2Decoder{
165 r: r,
166 local: local,
167 remote: remote,
168 buf: make([]byte, msgAppV2BufSize),
169 uint64buf: make([]byte, 8),
170 uint8buf: make([]byte, 1),
171 }
172 }
173
174 func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
175 var (
176 m raftpb.Message
177 typ uint8
178 )
179 if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
180 return m, err
181 }
182 typ = dec.uint8buf[0]
183 switch typ {
184 case msgTypeLinkHeartbeat:
185 return linkHeartbeatMessage, nil
186 case msgTypeAppEntries:
187 m = raftpb.Message{
188 Type: raftpb.MsgApp,
189 From: uint64(dec.remote),
190 To: uint64(dec.local),
191 Term: dec.term,
192 LogTerm: dec.term,
193 Index: dec.index,
194 }
195
196
197 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
198 return m, err
199 }
200 l := binary.BigEndian.Uint64(dec.uint64buf)
201 m.Entries = make([]raftpb.Entry, int(l))
202 for i := 0; i < int(l); i++ {
203 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
204 return m, err
205 }
206 size := binary.BigEndian.Uint64(dec.uint64buf)
207 var buf []byte
208 if size < msgAppV2BufSize {
209 buf = dec.buf[:size]
210 if _, err := io.ReadFull(dec.r, buf); err != nil {
211 return m, err
212 }
213 } else {
214 buf = make([]byte, int(size))
215 if _, err := io.ReadFull(dec.r, buf); err != nil {
216 return m, err
217 }
218 }
219 dec.index++
220
221 pbutil.MustUnmarshal(&m.Entries[i], buf)
222 }
223
224 if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
225 return m, err
226 }
227 m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
228 case msgTypeApp:
229 var size uint64
230 if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
231 return m, err
232 }
233 buf := make([]byte, int(size))
234 if _, err := io.ReadFull(dec.r, buf); err != nil {
235 return m, err
236 }
237 pbutil.MustUnmarshal(&m, buf)
238
239 dec.term = m.Term
240 dec.index = m.Index
241 if l := len(m.Entries); l > 0 {
242 dec.index = m.Entries[l-1].Index
243 }
244 default:
245 return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
246 }
247 return m, nil
248 }
249
View as plain text