1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package wal
16
17 import (
18 "bytes"
19 "fmt"
20 "github.com/stretchr/testify/require"
21 "io"
22 "io/ioutil"
23 "math"
24 "math/rand"
25 "os"
26 "path"
27 "path/filepath"
28 "reflect"
29 "regexp"
30 "testing"
31
32 "github.com/stretchr/testify/assert"
33 "go.etcd.io/etcd/client/pkg/v3/fileutil"
34 "go.etcd.io/etcd/pkg/v3/pbutil"
35 "go.etcd.io/etcd/raft/v3/raftpb"
36 "go.etcd.io/etcd/server/v3/wal/walpb"
37 "go.uber.org/zap/zaptest"
38
39 "go.uber.org/zap"
40 )
41
42 var (
43 confState = raftpb.ConfState{
44 Voters: []uint64{0x00ffca74},
45 AutoLeave: false,
46 }
47 )
48
49 func TestNew(t *testing.T) {
50 p, err := ioutil.TempDir(t.TempDir(), "waltest")
51 if err != nil {
52 t.Fatal(err)
53 }
54 defer os.RemoveAll(p)
55
56 w, err := Create(zap.NewExample(), p, []byte("somedata"))
57 if err != nil {
58 t.Fatalf("err = %v, want nil", err)
59 }
60 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
61 t.Errorf("name = %+v, want %+v", g, walName(0, 0))
62 }
63 defer w.Close()
64
65
66 off, err := w.tail().Seek(0, io.SeekCurrent)
67 if err != nil {
68 t.Fatal(err)
69 }
70 gd := make([]byte, off)
71 f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
72 if err != nil {
73 t.Fatal(err)
74 }
75 defer f.Close()
76 if _, err = io.ReadFull(f, gd); err != nil {
77 t.Fatalf("err = %v, want nil", err)
78 }
79
80 var wb bytes.Buffer
81 e := newEncoder(&wb, 0, 0)
82 err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
83 if err != nil {
84 t.Fatalf("err = %v, want nil", err)
85 }
86 err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
87 if err != nil {
88 t.Fatalf("err = %v, want nil", err)
89 }
90 r := &walpb.Record{
91 Type: snapshotType,
92 Data: pbutil.MustMarshal(&walpb.Snapshot{}),
93 }
94 if err = e.encode(r); err != nil {
95 t.Fatalf("err = %v, want nil", err)
96 }
97 e.flush()
98 if !bytes.Equal(gd, wb.Bytes()) {
99 t.Errorf("data = %v, want %v", gd, wb.Bytes())
100 }
101 }
102
103 func TestCreateFailFromPollutedDir(t *testing.T) {
104 p, err := ioutil.TempDir(t.TempDir(), "waltest")
105 if err != nil {
106 t.Fatal(err)
107 }
108 defer os.RemoveAll(p)
109 ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary)
110
111 _, err = Create(zap.NewExample(), p, []byte("data"))
112 if err != os.ErrExist {
113 t.Fatalf("expected %v, got %v", os.ErrExist, err)
114 }
115 }
116
117 func TestWalCleanup(t *testing.T) {
118 testRoot, err := ioutil.TempDir(t.TempDir(), "waltestroot")
119 if err != nil {
120 t.Fatal(err)
121 }
122 p, err := ioutil.TempDir(testRoot, "waltest")
123 if err != nil {
124 t.Fatal(err)
125 }
126 defer os.RemoveAll(testRoot)
127
128 logger := zap.NewExample()
129 w, err := Create(logger, p, []byte(""))
130 if err != nil {
131 t.Fatalf("err = %v, want nil", err)
132 }
133 w.cleanupWAL(logger)
134 fnames, err := fileutil.ReadDir(testRoot)
135 if err != nil {
136 t.Fatalf("err = %v, want nil", err)
137 }
138 if len(fnames) != 1 {
139 t.Fatalf("expected 1 file under %v, got %v", testRoot, len(fnames))
140 }
141 pattern := fmt.Sprintf(`%s.broken\.[\d]{8}\.[\d]{6}\.[\d]{1,6}?`, filepath.Base(p))
142 match, _ := regexp.MatchString(pattern, fnames[0])
143 if !match {
144 t.Errorf("match = false, expected true for %v with pattern %v", fnames[0], pattern)
145 }
146 }
147
148 func TestCreateFailFromNoSpaceLeft(t *testing.T) {
149 p, err := ioutil.TempDir(t.TempDir(), "waltest")
150 if err != nil {
151 t.Fatal(err)
152 }
153 defer os.RemoveAll(p)
154
155 oldSegmentSizeBytes := SegmentSizeBytes
156 defer func() {
157 SegmentSizeBytes = oldSegmentSizeBytes
158 }()
159 SegmentSizeBytes = math.MaxInt64
160
161 _, err = Create(zap.NewExample(), p, []byte("data"))
162 if err == nil {
163 t.Fatalf("expected error 'no space left on device', got nil")
164 }
165 }
166
167 func TestNewForInitedDir(t *testing.T) {
168 p, err := ioutil.TempDir(t.TempDir(), "waltest")
169 if err != nil {
170 t.Fatal(err)
171 }
172 defer os.RemoveAll(p)
173
174 os.Create(filepath.Join(p, walName(0, 0)))
175 if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist {
176 t.Errorf("err = %v, want %v", err, os.ErrExist)
177 }
178 }
179
180 func TestOpenAtIndex(t *testing.T) {
181 dir, err := ioutil.TempDir(t.TempDir(), "waltest")
182 if err != nil {
183 t.Fatal(err)
184 }
185 defer os.RemoveAll(dir)
186
187 f, err := os.Create(filepath.Join(dir, walName(0, 0)))
188 if err != nil {
189 t.Fatal(err)
190 }
191 f.Close()
192
193 w, err := Open(zap.NewExample(), dir, walpb.Snapshot{})
194 if err != nil {
195 t.Fatalf("err = %v, want nil", err)
196 }
197 if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
198 t.Errorf("name = %+v, want %+v", g, walName(0, 0))
199 }
200 if w.seq() != 0 {
201 t.Errorf("seq = %d, want %d", w.seq(), 0)
202 }
203 w.Close()
204
205 wname := walName(2, 10)
206 f, err = os.Create(filepath.Join(dir, wname))
207 if err != nil {
208 t.Fatal(err)
209 }
210 f.Close()
211
212 w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5})
213 if err != nil {
214 t.Fatalf("err = %v, want nil", err)
215 }
216 if g := filepath.Base(w.tail().Name()); g != wname {
217 t.Errorf("name = %+v, want %+v", g, wname)
218 }
219 if w.seq() != 2 {
220 t.Errorf("seq = %d, want %d", w.seq(), 2)
221 }
222 w.Close()
223
224 emptydir, err := ioutil.TempDir(t.TempDir(), "waltestempty")
225 if err != nil {
226 t.Fatal(err)
227 }
228 defer os.RemoveAll(emptydir)
229 if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
230 t.Errorf("err = %v, want %v", err, ErrFileNotFound)
231 }
232 }
233
234
235
236
237 func TestVerify(t *testing.T) {
238 lg := zaptest.NewLogger(t)
239 walDir, err := ioutil.TempDir(t.TempDir(), "waltest")
240 if err != nil {
241 t.Fatal(err)
242 }
243
244
245 w, err := Create(lg, walDir, nil)
246 if err != nil {
247 t.Fatal(err)
248 }
249 defer w.Close()
250
251
252 for i := 0; i < 5; i++ {
253 es := []raftpb.Entry{{Index: uint64(i), Data: []byte(fmt.Sprintf("waldata%d", i+1))}}
254 if err = w.Save(raftpb.HardState{}, es); err != nil {
255 t.Fatal(err)
256 }
257 if err = w.cut(); err != nil {
258 t.Fatal(err)
259 }
260 }
261
262 hs := raftpb.HardState{Term: 1, Vote: 3, Commit: 5}
263 assert.NoError(t, w.Save(hs, nil))
264
265
266 hardstate, err := Verify(lg, walDir, walpb.Snapshot{})
267 if err != nil {
268 t.Errorf("expected a nil error, got %v", err)
269 }
270 assert.Equal(t, hs, *hardstate)
271
272 walFiles, err := ioutil.ReadDir(walDir)
273 if err != nil {
274 t.Fatal(err)
275 }
276
277
278 err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
279 if err != nil {
280 t.Fatal(err)
281 }
282
283 _, err = Verify(lg, walDir, walpb.Snapshot{})
284 if err == nil {
285 t.Error("expected a non-nil error, got nil")
286 }
287 }
288
289
290
291 func TestCut(t *testing.T) {
292 p, err := ioutil.TempDir(t.TempDir(), "waltest")
293 if err != nil {
294 t.Fatal(err)
295 }
296 defer os.RemoveAll(p)
297
298 w, err := Create(zap.NewExample(), p, nil)
299 if err != nil {
300 t.Fatal(err)
301 }
302 defer w.Close()
303
304 state := raftpb.HardState{Term: 1}
305 if err = w.Save(state, nil); err != nil {
306 t.Fatal(err)
307 }
308 if err = w.cut(); err != nil {
309 t.Fatal(err)
310 }
311 wname := walName(1, 1)
312 if g := filepath.Base(w.tail().Name()); g != wname {
313 t.Errorf("name = %s, want %s", g, wname)
314 }
315
316 es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}}
317 if err = w.Save(raftpb.HardState{}, es); err != nil {
318 t.Fatal(err)
319 }
320 if err = w.cut(); err != nil {
321 t.Fatal(err)
322 }
323 snap := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
324 if err = w.SaveSnapshot(snap); err != nil {
325 t.Fatal(err)
326 }
327 wname = walName(2, 2)
328 if g := filepath.Base(w.tail().Name()); g != wname {
329 t.Errorf("name = %s, want %s", g, wname)
330 }
331
332
333
334
335 f, err := os.Open(filepath.Join(p, wname))
336 if err != nil {
337 t.Fatal(err)
338 }
339 defer f.Close()
340 nw := &WAL{
341 decoder: newDecoder(fileutil.NewFileReader(f)),
342 start: snap,
343 }
344 _, gst, _, err := nw.ReadAll()
345 if err != nil {
346 t.Fatal(err)
347 }
348 if !reflect.DeepEqual(gst, state) {
349 t.Errorf("state = %+v, want %+v", gst, state)
350 }
351 }
352
353 func TestSaveWithCut(t *testing.T) {
354 p, err := ioutil.TempDir(t.TempDir(), "waltest")
355 if err != nil {
356 t.Fatal(err)
357 }
358 defer os.RemoveAll(p)
359
360 w, err := Create(zap.NewExample(), p, []byte("metadata"))
361 if err != nil {
362 t.Fatal(err)
363 }
364
365 state := raftpb.HardState{Term: 1}
366 if err = w.Save(state, nil); err != nil {
367 t.Fatal(err)
368 }
369 bigData := make([]byte, 500)
370 strdata := "Hello World!!"
371 copy(bigData, strdata)
372
373 restoreLater := SegmentSizeBytes
374 const EntrySize int = 500
375 SegmentSizeBytes = 2 * 1024
376 defer func() { SegmentSizeBytes = restoreLater }()
377 index := uint64(0)
378 for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize {
379 ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}}
380 if err = w.Save(state, ents); err != nil {
381 t.Fatal(err)
382 }
383 index++
384 }
385
386 w.Close()
387
388 neww, err := Open(zap.NewExample(), p, walpb.Snapshot{})
389 if err != nil {
390 t.Fatalf("err = %v, want nil", err)
391 }
392 defer neww.Close()
393 wname := walName(1, index)
394 if g := filepath.Base(neww.tail().Name()); g != wname {
395 t.Errorf("name = %s, want %s", g, wname)
396 }
397
398 _, newhardstate, entries, err := neww.ReadAll()
399 if err != nil {
400 t.Fatal(err)
401 }
402
403 if !reflect.DeepEqual(newhardstate, state) {
404 t.Errorf("Hard State = %+v, want %+v", newhardstate, state)
405 }
406 if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) {
407 t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize)))
408 }
409 for _, oneent := range entries {
410 if !bytes.Equal(oneent.Data, bigData) {
411 t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData)
412 }
413 }
414 }
415
416 func TestRecover(t *testing.T) {
417 cases := []struct {
418 name string
419 size int
420 }{
421 {
422 name: "10MB",
423 size: 10 * 1024 * 1024,
424 },
425 {
426 name: "20MB",
427 size: 20 * 1024 * 1024,
428 },
429 {
430 name: "40MB",
431 size: 40 * 1024 * 1024,
432 },
433 }
434
435 for _, tc := range cases {
436 t.Run(tc.name, func(t *testing.T) {
437 p := t.TempDir()
438
439 w, err := Create(zap.NewExample(), p, []byte("metadata"))
440 if err != nil {
441 t.Fatal(err)
442 }
443 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
444 t.Fatal(err)
445 }
446
447 data := make([]byte, tc.size)
448 n, err := rand.Read(data)
449 assert.Equal(t, tc.size, n)
450 if err != nil {
451 t.Errorf("Unexpected error: %v", err)
452 }
453 ents := []raftpb.Entry{{Index: 1, Term: 1, Data: data}, {Index: 2, Term: 2, Data: data}}
454 if err = w.Save(raftpb.HardState{}, ents); err != nil {
455 t.Fatal(err)
456 }
457 sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
458 for _, s := range sts {
459 if err = w.Save(s, nil); err != nil {
460 t.Fatal(err)
461 }
462 }
463 w.Close()
464
465 if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
466 t.Fatal(err)
467 }
468 metadata, state, entries, err := w.ReadAll()
469 if err != nil {
470 t.Fatal(err)
471 }
472
473 if !bytes.Equal(metadata, []byte("metadata")) {
474 t.Errorf("metadata = %s, want %s", metadata, "metadata")
475 }
476 if !reflect.DeepEqual(entries, ents) {
477 t.Errorf("ents = %+v, want %+v", entries, ents)
478 }
479
480 s := sts[len(sts)-1]
481 if !reflect.DeepEqual(state, s) {
482 t.Errorf("state = %+v, want %+v", state, s)
483 }
484 w.Close()
485 })
486 }
487 }
488
489 func TestSearchIndex(t *testing.T) {
490 tests := []struct {
491 names []string
492 index uint64
493 widx int
494 wok bool
495 }{
496 {
497 []string{
498 "0000000000000000-0000000000000000.wal",
499 "0000000000000001-0000000000001000.wal",
500 "0000000000000002-0000000000002000.wal",
501 },
502 0x1000, 1, true,
503 },
504 {
505 []string{
506 "0000000000000001-0000000000004000.wal",
507 "0000000000000002-0000000000003000.wal",
508 "0000000000000003-0000000000005000.wal",
509 },
510 0x4000, 1, true,
511 },
512 {
513 []string{
514 "0000000000000001-0000000000002000.wal",
515 "0000000000000002-0000000000003000.wal",
516 "0000000000000003-0000000000005000.wal",
517 },
518 0x1000, -1, false,
519 },
520 }
521 for i, tt := range tests {
522 idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index)
523 if idx != tt.widx {
524 t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
525 }
526 if ok != tt.wok {
527 t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
528 }
529 }
530 }
531
532 func TestScanWalName(t *testing.T) {
533 tests := []struct {
534 str string
535 wseq, windex uint64
536 wok bool
537 }{
538 {"0000000000000000-0000000000000000.wal", 0, 0, true},
539 {"0000000000000000.wal", 0, 0, false},
540 {"0000000000000000-0000000000000000.snap", 0, 0, false},
541 }
542 for i, tt := range tests {
543 s, index, err := parseWALName(tt.str)
544 if g := err == nil; g != tt.wok {
545 t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
546 }
547 if s != tt.wseq {
548 t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq)
549 }
550 if index != tt.windex {
551 t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
552 }
553 }
554 }
555
556 func TestRecoverAfterCut(t *testing.T) {
557 p, err := ioutil.TempDir(t.TempDir(), "waltest")
558 if err != nil {
559 t.Fatal(err)
560 }
561 defer os.RemoveAll(p)
562
563 md, err := Create(zap.NewExample(), p, []byte("metadata"))
564 if err != nil {
565 t.Fatal(err)
566 }
567 for i := 0; i < 10; i++ {
568 if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i), Term: 1, ConfState: &confState}); err != nil {
569 t.Fatal(err)
570 }
571 es := []raftpb.Entry{{Index: uint64(i)}}
572 if err = md.Save(raftpb.HardState{}, es); err != nil {
573 t.Fatal(err)
574 }
575 if err = md.cut(); err != nil {
576 t.Fatal(err)
577 }
578 }
579 md.Close()
580
581 if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
582 t.Fatal(err)
583 }
584
585 for i := 0; i < 10; i++ {
586 w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i), Term: 1})
587 if err != nil {
588 if i <= 4 {
589 if err != ErrFileNotFound {
590 t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound)
591 }
592 } else {
593 t.Errorf("#%d: err = %v, want nil", i, err)
594 }
595 continue
596 }
597 metadata, _, entries, err := w.ReadAll()
598 if err != nil {
599 t.Errorf("#%d: err = %v, want nil", i, err)
600 continue
601 }
602 if !bytes.Equal(metadata, []byte("metadata")) {
603 t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
604 }
605 for j, e := range entries {
606 if e.Index != uint64(j+i+1) {
607 t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
608 }
609 }
610 w.Close()
611 }
612 }
613
614 func TestOpenAtUncommittedIndex(t *testing.T) {
615 p, err := ioutil.TempDir(t.TempDir(), "waltest")
616 if err != nil {
617 t.Fatal(err)
618 }
619 defer os.RemoveAll(p)
620
621 w, err := Create(zap.NewExample(), p, nil)
622 if err != nil {
623 t.Fatal(err)
624 }
625 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
626 t.Fatal(err)
627 }
628 if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil {
629 t.Fatal(err)
630 }
631 w.Close()
632
633 w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
634 if err != nil {
635 t.Fatal(err)
636 }
637
638 if _, _, _, err = w.ReadAll(); err != nil {
639 t.Errorf("err = %v, want nil", err)
640 }
641 w.Close()
642 }
643
644
645
646
647
648 func TestOpenForRead(t *testing.T) {
649 p, err := ioutil.TempDir(t.TempDir(), "waltest")
650 if err != nil {
651 t.Fatal(err)
652 }
653 defer os.RemoveAll(p)
654
655 w, err := Create(zap.NewExample(), p, nil)
656 if err != nil {
657 t.Fatal(err)
658 }
659 defer w.Close()
660
661 for i := 0; i < 10; i++ {
662 es := []raftpb.Entry{{Index: uint64(i)}}
663 if err = w.Save(raftpb.HardState{}, es); err != nil {
664 t.Fatal(err)
665 }
666 if err = w.cut(); err != nil {
667 t.Fatal(err)
668 }
669 }
670
671 unlockIndex := uint64(5)
672 w.ReleaseLockTo(unlockIndex)
673
674
675 w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
676 if err != nil {
677 t.Fatal(err)
678 }
679 defer w2.Close()
680 _, _, ents, err := w2.ReadAll()
681 if err != nil {
682 t.Fatalf("err = %v, want nil", err)
683 }
684 if g := ents[len(ents)-1].Index; g != 9 {
685 t.Errorf("last index read = %d, want %d", g, 9)
686 }
687 }
688
689 func TestOpenWithMaxIndex(t *testing.T) {
690 p, err := ioutil.TempDir(t.TempDir(), "waltest")
691 if err != nil {
692 t.Fatal(err)
693 }
694 defer os.RemoveAll(p)
695
696 w1, err := Create(zap.NewExample(), p, nil)
697 if err != nil {
698 t.Fatal(err)
699 }
700 defer func() {
701 if w1 != nil {
702 w1.Close()
703 }
704 }()
705
706 es := []raftpb.Entry{{Index: uint64(math.MaxInt64)}}
707 if err = w1.Save(raftpb.HardState{}, es); err != nil {
708 t.Fatal(err)
709 }
710 w1.Close()
711 w1 = nil
712
713 w2, err := Open(zap.NewExample(), p, walpb.Snapshot{})
714 if err != nil {
715 t.Fatal(err)
716 }
717 defer w2.Close()
718
719 _, _, _, err = w2.ReadAll()
720 if err != ErrSliceOutOfRange {
721 t.Fatalf("err = %v, want ErrSliceOutOfRange", err)
722 }
723 }
724
725 func TestSaveEmpty(t *testing.T) {
726 var buf bytes.Buffer
727 var est raftpb.HardState
728 w := WAL{
729 encoder: newEncoder(&buf, 0, 0),
730 }
731 if err := w.saveState(&est); err != nil {
732 t.Errorf("err = %v, want nil", err)
733 }
734 if len(buf.Bytes()) != 0 {
735 t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
736 }
737 }
738
739 func TestReleaseLockTo(t *testing.T) {
740 p, err := ioutil.TempDir(t.TempDir(), "waltest")
741 if err != nil {
742 t.Fatal(err)
743 }
744 defer os.RemoveAll(p)
745
746 w, err := Create(zap.NewExample(), p, nil)
747 defer func() {
748 if err = w.Close(); err != nil {
749 t.Fatal(err)
750 }
751 }()
752 if err != nil {
753 t.Fatal(err)
754 }
755
756
757 err = w.ReleaseLockTo(10)
758 if err != nil {
759 t.Errorf("err = %v, want nil", err)
760 }
761
762
763 for i := 0; i < 10; i++ {
764 es := []raftpb.Entry{{Index: uint64(i)}}
765 if err = w.Save(raftpb.HardState{}, es); err != nil {
766 t.Fatal(err)
767 }
768 if err = w.cut(); err != nil {
769 t.Fatal(err)
770 }
771 }
772
773 unlockIndex := uint64(5)
774 w.ReleaseLockTo(unlockIndex)
775
776
777 if len(w.locks) != 7 {
778 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
779 }
780 for i, l := range w.locks {
781 var lockIndex uint64
782 _, lockIndex, err = parseWALName(filepath.Base(l.Name()))
783 if err != nil {
784 t.Fatal(err)
785 }
786
787 if lockIndex != uint64(i+4) {
788 t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
789 }
790 }
791
792
793 unlockIndex = uint64(15)
794 w.ReleaseLockTo(unlockIndex)
795
796
797 if len(w.locks) != 1 {
798 t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
799 }
800 _, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name()))
801 if err != nil {
802 t.Fatal(err)
803 }
804
805 if lockIndex != uint64(10) {
806 t.Errorf("lockindex = %d, want %d", lockIndex, 10)
807 }
808 }
809
810
811 func TestTailWriteNoSlackSpace(t *testing.T) {
812 p, err := ioutil.TempDir(t.TempDir(), "waltest")
813 if err != nil {
814 t.Fatal(err)
815 }
816 defer os.RemoveAll(p)
817
818
819 w, err := Create(zap.NewExample(), p, []byte("metadata"))
820 if err != nil {
821 t.Fatal(err)
822 }
823
824 for i := 1; i <= 5; i++ {
825 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
826 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
827 t.Fatal(err)
828 }
829 }
830
831 off, serr := w.tail().Seek(0, io.SeekCurrent)
832 if serr != nil {
833 t.Fatal(serr)
834 }
835 if terr := w.tail().Truncate(off); terr != nil {
836 t.Fatal(terr)
837 }
838 w.Close()
839
840
841 w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
842 if err != nil {
843 t.Fatal(err)
844 }
845 _, _, ents, rerr := w.ReadAll()
846 if rerr != nil {
847 t.Fatal(rerr)
848 }
849 if len(ents) != 5 {
850 t.Fatalf("got entries %+v, expected 5 entries", ents)
851 }
852
853 for i := 6; i <= 10; i++ {
854 es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
855 if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
856 t.Fatal(err)
857 }
858 }
859 w.Close()
860
861
862 w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
863 if err != nil {
864 t.Fatal(err)
865 }
866 _, _, ents, rerr = w.ReadAll()
867 if rerr != nil {
868 t.Fatal(rerr)
869 }
870 if len(ents) != 10 {
871 t.Fatalf("got entries %+v, expected 10 entries", ents)
872 }
873 w.Close()
874 }
875
876
877 func TestRestartCreateWal(t *testing.T) {
878 p, err := ioutil.TempDir(t.TempDir(), "waltest")
879 if err != nil {
880 t.Fatal(err)
881 }
882 defer os.RemoveAll(p)
883
884
885 tmpdir := filepath.Clean(p) + ".tmp"
886 if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
887 t.Fatal(err)
888 }
889 if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
890 t.Fatal(err)
891 }
892
893 w, werr := Create(zap.NewExample(), p, []byte("abc"))
894 if werr != nil {
895 t.Fatal(werr)
896 }
897 w.Close()
898 if Exist(tmpdir) {
899 t.Fatalf("got %q exists, expected it to not exist", tmpdir)
900 }
901
902 if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
903 t.Fatal(err)
904 }
905 defer w.Close()
906
907 if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" {
908 t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
909 }
910 }
911
912
913 func TestOpenOnTornWrite(t *testing.T) {
914 maxEntries := 40
915 clobberIdx := 20
916 overwriteEntries := 5
917
918 p, err := ioutil.TempDir(t.TempDir(), "waltest")
919 if err != nil {
920 t.Fatal(err)
921 }
922 defer os.RemoveAll(p)
923 w, err := Create(zap.NewExample(), p, nil)
924 defer func() {
925 if err = w.Close(); err != nil && err != os.ErrInvalid {
926 t.Fatal(err)
927 }
928 }()
929 if err != nil {
930 t.Fatal(err)
931 }
932
933
934 offsets := make([]int64, maxEntries)
935 for i := range offsets {
936 es := []raftpb.Entry{{Index: uint64(i)}}
937 if err = w.Save(raftpb.HardState{}, es); err != nil {
938 t.Fatal(err)
939 }
940 if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil {
941 t.Fatal(err)
942 }
943 }
944
945 fn := filepath.Join(p, filepath.Base(w.tail().Name()))
946 w.Close()
947
948
949 f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
950 if ferr != nil {
951 t.Fatal(ferr)
952 }
953 defer f.Close()
954 _, err = f.Seek(offsets[clobberIdx], io.SeekStart)
955 if err != nil {
956 t.Fatal(err)
957 }
958 zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
959 _, err = f.Write(zeros)
960 if err != nil {
961 t.Fatal(err)
962 }
963 f.Close()
964
965 w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
966 if err != nil {
967 t.Fatal(err)
968 }
969
970 _, _, _, err = w.ReadAll()
971 if err != nil {
972 t.Fatal(err)
973 }
974
975
976 for i := 0; i < overwriteEntries; i++ {
977
978 es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
979 if err = w.Save(raftpb.HardState{}, es); err != nil {
980 t.Fatal(err)
981 }
982 }
983 w.Close()
984
985
986 w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
987 if err != nil {
988 t.Fatal(err)
989 }
990
991 _, _, ents, rerr := w.ReadAll()
992 if rerr != nil {
993
994 t.Fatal(rerr)
995 }
996 wEntries := (clobberIdx - 1) + overwriteEntries
997 if len(ents) != wEntries {
998 t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
999 }
1000 }
1001
1002 func TestRenameFail(t *testing.T) {
1003 p, err := ioutil.TempDir(t.TempDir(), "waltest")
1004 if err != nil {
1005 t.Fatal(err)
1006 }
1007 defer os.RemoveAll(p)
1008
1009 oldSegmentSizeBytes := SegmentSizeBytes
1010 defer func() {
1011 SegmentSizeBytes = oldSegmentSizeBytes
1012 }()
1013 SegmentSizeBytes = math.MaxInt64
1014
1015 tp, terr := ioutil.TempDir(t.TempDir(), "waltest")
1016 if terr != nil {
1017 t.Fatal(terr)
1018 }
1019 os.RemoveAll(tp)
1020
1021 w := &WAL{
1022 lg: zap.NewExample(),
1023 dir: p,
1024 }
1025 w2, werr := w.renameWAL(tp)
1026 if w2 != nil || werr == nil {
1027 t.Fatalf("expected error, got %v", werr)
1028 }
1029 }
1030
1031
1032 func TestReadAllFail(t *testing.T) {
1033 dir, err := ioutil.TempDir(t.TempDir(), "waltest")
1034 if err != nil {
1035 t.Fatal(err)
1036 }
1037 defer os.RemoveAll(dir)
1038
1039
1040 f, err := Create(zap.NewExample(), dir, []byte("metadata"))
1041 if err != nil {
1042 t.Fatal(err)
1043 }
1044 f.Close()
1045
1046 _, _, _, err = f.ReadAll()
1047 if err == nil || err != ErrDecoderNotFound {
1048 t.Fatalf("err = %v, want ErrDecoderNotFound", err)
1049 }
1050 }
1051
1052
1053
1054 func TestValidSnapshotEntries(t *testing.T) {
1055 p, err := ioutil.TempDir(t.TempDir(), "waltest")
1056 if err != nil {
1057 t.Fatal(err)
1058 }
1059 defer os.RemoveAll(p)
1060 snap0 := walpb.Snapshot{}
1061 snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
1062 state1 := raftpb.HardState{Commit: 1, Term: 1}
1063 snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
1064 snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
1065 state2 := raftpb.HardState{Commit: 3, Term: 2}
1066 snap4 := walpb.Snapshot{Index: 4, Term: 2, ConfState: &confState}
1067 func() {
1068 w, err := Create(zap.NewExample(), p, nil)
1069 if err != nil {
1070 t.Fatal(err)
1071 }
1072 defer w.Close()
1073
1074
1075 if err = w.SaveSnapshot(snap1); err != nil {
1076 t.Fatal(err)
1077 }
1078 if err = w.Save(state1, nil); err != nil {
1079 t.Fatal(err)
1080 }
1081 if err = w.SaveSnapshot(snap2); err != nil {
1082 t.Fatal(err)
1083 }
1084 if err = w.SaveSnapshot(snap3); err != nil {
1085 t.Fatal(err)
1086 }
1087 if err = w.Save(state2, nil); err != nil {
1088 t.Fatal(err)
1089 }
1090 if err = w.SaveSnapshot(snap4); err != nil {
1091 t.Fatal(err)
1092 }
1093 }()
1094 walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p)
1095 if err != nil {
1096 t.Fatal(err)
1097 }
1098 expected := []walpb.Snapshot{snap0, snap1, snap2, snap3}
1099 if !reflect.DeepEqual(walSnaps, expected) {
1100 t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps)
1101 }
1102 }
1103
1104
1105
1106 func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
1107 oldSegmentSizeBytes := SegmentSizeBytes
1108 SegmentSizeBytes = 64
1109 defer func() {
1110 SegmentSizeBytes = oldSegmentSizeBytes
1111 }()
1112 p, err := ioutil.TempDir(t.TempDir(), "waltest")
1113 if err != nil {
1114 t.Fatal(err)
1115 }
1116 defer os.RemoveAll(p)
1117 snap0 := walpb.Snapshot{}
1118 snap1 := walpb.Snapshot{Index: 1, Term: 1, ConfState: &confState}
1119 state1 := raftpb.HardState{Commit: 1, Term: 1}
1120 snap2 := walpb.Snapshot{Index: 2, Term: 1, ConfState: &confState}
1121 snap3 := walpb.Snapshot{Index: 3, Term: 2, ConfState: &confState}
1122 state2 := raftpb.HardState{Commit: 3, Term: 2}
1123 func() {
1124 w, err := Create(zap.NewExample(), p, nil)
1125 if err != nil {
1126 t.Fatal(err)
1127 }
1128 defer w.Close()
1129
1130
1131 if err = w.SaveSnapshot(snap1); err != nil {
1132 t.Fatal(err)
1133 }
1134 if err = w.Save(state1, nil); err != nil {
1135 t.Fatal(err)
1136 }
1137 if err = w.SaveSnapshot(snap2); err != nil {
1138 t.Fatal(err)
1139 }
1140 if err = w.SaveSnapshot(snap3); err != nil {
1141 t.Fatal(err)
1142 }
1143 for i := 0; i < 128; i++ {
1144 if err = w.Save(state2, nil); err != nil {
1145 t.Fatal(err)
1146 }
1147 }
1148
1149 }()
1150 files, _, err := selectWALFiles(nil, p, snap0)
1151 if err != nil {
1152 t.Fatal(err)
1153 }
1154 os.Remove(p + "/" + files[0])
1155 _, err = ValidSnapshotEntries(zap.NewExample(), p)
1156 if err != nil {
1157 t.Fatal(err)
1158 }
1159 }
1160
1161 func TestLastRecordLengthExceedFileEnd(t *testing.T) {
1162
1178
1179 t.Log("Generate a WAL file with the last record's length modified.")
1180 data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" +
1181 "\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" +
1182 "\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" +
1183 "\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" +
1184 "\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" +
1185 "\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" +
1186 "\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" +
1187 "\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" +
1188 "\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" +
1189 "\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" +
1190 "\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" +
1191 "\x00\x00\x00")
1192
1193 buf := bytes.NewBuffer(data)
1194 f, err := createFileWithData(t, buf)
1195 fileName := f.Name()
1196 require.NoError(t, err)
1197 t.Logf("fileName: %v", fileName)
1198
1199
1200 t.Log("Verify all records can be parsed correctly.")
1201 rec := &walpb.Record{}
1202 decoder := newDecoder(fileutil.NewFileReader(f))
1203 for {
1204 if err = decoder.decode(rec); err != nil {
1205 require.ErrorIs(t, err, io.ErrUnexpectedEOF)
1206 break
1207 }
1208 if rec.Type == entryType {
1209 e := mustUnmarshalEntry(rec.Data)
1210 t.Logf("Validating normal entry: %v", e)
1211 recData := fmt.Sprintf("waldata%d", e.Index)
1212 require.Equal(t, raftpb.EntryNormal, e.Type)
1213 require.Equal(t, recData, string(e.Data))
1214 }
1215 rec = &walpb.Record{}
1216 }
1217 require.NoError(t, f.Close())
1218
1219
1220 t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain")
1221 newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal")
1222 require.NoError(t, os.Rename(fileName, newFileName))
1223
1224 w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{
1225 Index: 0,
1226 Term: 0,
1227 })
1228 require.NoError(t, err)
1229 defer w.Close()
1230
1231 _, _, _, err = w.ReadAll()
1232
1233
1234 require.ErrorIs(t, err, io.ErrUnexpectedEOF)
1235 }
1236
View as plain text