1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "fmt"
19 "log"
20
21 pb "go.etcd.io/etcd/raft/v3/raftpb"
22 )
23
24 type raftLog struct {
25
26 storage Storage
27
28
29
30 unstable unstable
31
32
33
34 committed uint64
35
36
37
38 applied uint64
39
40 logger Logger
41
42
43
44 maxNextEntsSize uint64
45 }
46
47
48
49
50 func newLog(storage Storage, logger Logger) *raftLog {
51 return newLogWithSize(storage, logger, noLimit)
52 }
53
54
55
56 func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
57 if storage == nil {
58 log.Panic("storage must not be nil")
59 }
60 log := &raftLog{
61 storage: storage,
62 logger: logger,
63 maxNextEntsSize: maxNextEntsSize,
64 }
65 firstIndex, err := storage.FirstIndex()
66 if err != nil {
67 panic(err)
68 }
69 lastIndex, err := storage.LastIndex()
70 if err != nil {
71 panic(err)
72 }
73 log.unstable.offset = lastIndex + 1
74 log.unstable.logger = logger
75
76 log.committed = firstIndex - 1
77 log.applied = firstIndex - 1
78
79 return log
80 }
81
82 func (l *raftLog) String() string {
83 return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
84 }
85
86
87
88 func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
89 if l.matchTerm(index, logTerm) {
90 lastnewi = index + uint64(len(ents))
91 ci := l.findConflict(ents)
92 switch {
93 case ci == 0:
94 case ci <= l.committed:
95 l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
96 default:
97 offset := index + 1
98 l.append(ents[ci-offset:]...)
99 }
100 l.commitTo(min(committed, lastnewi))
101 return lastnewi, true
102 }
103 return 0, false
104 }
105
106 func (l *raftLog) append(ents ...pb.Entry) uint64 {
107 if len(ents) == 0 {
108 return l.lastIndex()
109 }
110 if after := ents[0].Index - 1; after < l.committed {
111 l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
112 }
113 l.unstable.truncateAndAppend(ents)
114 return l.lastIndex()
115 }
116
117
118
119
120
121
122
123
124
125
126
127 func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
128 for _, ne := range ents {
129 if !l.matchTerm(ne.Index, ne.Term) {
130 if ne.Index <= l.lastIndex() {
131 l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
132 ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
133 }
134 return ne.Index
135 }
136 }
137 return 0
138 }
139
140
141
142
143
144
145
146
147 func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
148 if li := l.lastIndex(); index > li {
149
150
151
152
153
154
155
156 l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
157 index, li)
158 return index
159 }
160 for {
161 logTerm, err := l.term(index)
162 if logTerm <= term || err != nil {
163 break
164 }
165 index--
166 }
167 return index
168 }
169
170 func (l *raftLog) unstableEntries() []pb.Entry {
171 if len(l.unstable.entries) == 0 {
172 return nil
173 }
174 return l.unstable.entries
175 }
176
177
178
179
180 func (l *raftLog) nextEnts() (ents []pb.Entry) {
181 off := max(l.applied+1, l.firstIndex())
182 if l.committed+1 > off {
183 ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
184 if err != nil {
185 l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
186 }
187 return ents
188 }
189 return nil
190 }
191
192
193
194 func (l *raftLog) hasNextEnts() bool {
195 off := max(l.applied+1, l.firstIndex())
196 return l.committed+1 > off
197 }
198
199
200 func (l *raftLog) hasPendingSnapshot() bool {
201 return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
202 }
203
204 func (l *raftLog) snapshot() (pb.Snapshot, error) {
205 if l.unstable.snapshot != nil {
206 return *l.unstable.snapshot, nil
207 }
208 return l.storage.Snapshot()
209 }
210
211 func (l *raftLog) firstIndex() uint64 {
212 if i, ok := l.unstable.maybeFirstIndex(); ok {
213 return i
214 }
215 index, err := l.storage.FirstIndex()
216 if err != nil {
217 panic(err)
218 }
219 return index
220 }
221
222 func (l *raftLog) lastIndex() uint64 {
223 if i, ok := l.unstable.maybeLastIndex(); ok {
224 return i
225 }
226 i, err := l.storage.LastIndex()
227 if err != nil {
228 panic(err)
229 }
230 return i
231 }
232
233 func (l *raftLog) commitTo(tocommit uint64) {
234
235 if l.committed < tocommit {
236 if l.lastIndex() < tocommit {
237 l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
238 }
239 l.committed = tocommit
240 }
241 }
242
243 func (l *raftLog) appliedTo(i uint64) {
244 if i == 0 {
245 return
246 }
247 if l.committed < i || i < l.applied {
248 l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
249 }
250 l.applied = i
251 }
252
253 func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
254
255 func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
256
257 func (l *raftLog) lastTerm() uint64 {
258 t, err := l.term(l.lastIndex())
259 if err != nil {
260 l.logger.Panicf("unexpected error when getting the last term (%v)", err)
261 }
262 return t
263 }
264
265 func (l *raftLog) term(i uint64) (uint64, error) {
266
267 dummyIndex := l.firstIndex() - 1
268 if i < dummyIndex || i > l.lastIndex() {
269
270 return 0, nil
271 }
272
273 if t, ok := l.unstable.maybeTerm(i); ok {
274 return t, nil
275 }
276
277 t, err := l.storage.Term(i)
278 if err == nil {
279 return t, nil
280 }
281 if err == ErrCompacted || err == ErrUnavailable {
282 return 0, err
283 }
284 panic(err)
285 }
286
287 func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
288 if i > l.lastIndex() {
289 return nil, nil
290 }
291 return l.slice(i, l.lastIndex()+1, maxsize)
292 }
293
294
295 func (l *raftLog) allEntries() []pb.Entry {
296 ents, err := l.entries(l.firstIndex(), noLimit)
297 if err == nil {
298 return ents
299 }
300 if err == ErrCompacted {
301 return l.allEntries()
302 }
303
304 panic(err)
305 }
306
307
308
309
310
311
312
313 func (l *raftLog) isUpToDate(lasti, term uint64) bool {
314 return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
315 }
316
317 func (l *raftLog) matchTerm(i, term uint64) bool {
318 t, err := l.term(i)
319 if err != nil {
320 return false
321 }
322 return t == term
323 }
324
325 func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
326 if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
327 l.commitTo(maxIndex)
328 return true
329 }
330 return false
331 }
332
333 func (l *raftLog) restore(s pb.Snapshot) {
334 l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
335 l.committed = s.Metadata.Index
336 l.unstable.restore(s)
337 }
338
339
340 func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
341 err := l.mustCheckOutOfBounds(lo, hi)
342 if err != nil {
343 return nil, err
344 }
345 if lo == hi {
346 return nil, nil
347 }
348 var ents []pb.Entry
349 if lo < l.unstable.offset {
350 storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
351 if err == ErrCompacted {
352 return nil, err
353 } else if err == ErrUnavailable {
354 l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
355 } else if err != nil {
356 panic(err)
357 }
358
359
360 if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
361 return storedEnts, nil
362 }
363
364 ents = storedEnts
365 }
366 if hi > l.unstable.offset {
367 unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
368 if len(ents) > 0 {
369 combined := make([]pb.Entry, len(ents)+len(unstable))
370 n := copy(combined, ents)
371 copy(combined[n:], unstable)
372 ents = combined
373 } else {
374 ents = unstable
375 }
376 }
377 return limitSize(ents, maxSize), nil
378 }
379
380
381 func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
382 if lo > hi {
383 l.logger.Panicf("invalid slice %d > %d", lo, hi)
384 }
385 fi := l.firstIndex()
386 if lo < fi {
387 return ErrCompacted
388 }
389
390 length := l.lastIndex() + 1 - fi
391 if hi > fi+length {
392 l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
393 }
394 return nil
395 }
396
397 func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
398 if err == nil {
399 return t
400 }
401 if err == ErrCompacted {
402 return 0
403 }
404 l.logger.Panicf("unexpected error (%v)", err)
405 return 0
406 }
407
View as plain text