1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package wal
16
17 import (
18 "bytes"
19 "errors"
20 "fmt"
21 "hash/crc32"
22 "io"
23 "os"
24 "path/filepath"
25 "strings"
26 "sync"
27 "time"
28
29 "go.etcd.io/etcd/client/pkg/v3/fileutil"
30 "go.etcd.io/etcd/pkg/v3/pbutil"
31 "go.etcd.io/etcd/raft/v3"
32 "go.etcd.io/etcd/raft/v3/raftpb"
33 "go.etcd.io/etcd/server/v3/wal/walpb"
34
35 "go.uber.org/zap"
36 )
37
// Record type tags stored in walpb.Record.Type. The numbering starts at 1,
// so the zero value never denotes a valid record type.
const (
	metadataType int64 = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)

// warnSyncDuration is the fdatasync latency above which sync logs a
// "slow fdatasync" warning.
const warnSyncDuration = time.Second
49
// SegmentSizeBytes is the size preallocated for a newly created WAL segment
// file and the write offset at which Save rolls over to a new segment.
var SegmentSizeBytes int64 = 64 * 1000 * 1000

// Sentinel errors returned by this package.
var (
	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
	ErrFileNotFound     = errors.New("wal: file not found")
	ErrCRCMismatch      = errors.New("wal: crc mismatch")
	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
	ErrSliceOutOfRange  = errors.New("wal: slice bounds out of range")
	ErrDecoderNotFound  = errors.New("wal: decoder not found")
)

// crcTable is the Castagnoli CRC-32 table built once at package init.
var crcTable = crc32.MakeTable(crc32.Castagnoli)
66
67
68
69
70
71
// WAL is a write-ahead log stored as a sequence of segment files inside a
// single directory. A WAL is first read (ReadAll) and afterwards appended to
// (Save, SaveSnapshot); ReadAll discards the decoder and installs the encoder.
type WAL struct {
	// lg is the logger used for all diagnostics; constructors replace a nil
	// logger with a no-op logger.
	lg *zap.Logger

	// dir is the directory holding the WAL segment files.
	dir string

	// dirFile is an open handle on dir. cut fsyncs it after renaming a new
	// segment into place, and Close closes it.
	dirFile *os.File

	// metadata is the metadata record payload written at the head of every
	// segment (see Create and cut).
	metadata []byte
	// state is the most recent non-empty hard state passed to saveState.
	state raftpb.HardState

	// start is the snapshot the WAL was opened at; ReadAll only returns
	// entries with a greater index and resets start when it finishes.
	start walpb.Snapshot
	// decoder reads records from the opened segments; ReadAll sets it to nil
	// once it has consumed them.
	decoder *decoder
	// readClose closes the files opened for reading. It is nil in write mode.
	readClose func() error

	// unsafeNoSync makes sync flush the encoder but skip fdatasync.
	unsafeNoSync bool

	// mu is held by the exported read/write methods (ReadAll, Save,
	// SaveSnapshot, ReleaseLockTo, Close).
	mu sync.Mutex
	// enti is the index of the last entry read or saved.
	enti uint64
	// encoder appends records to the tail segment.
	encoder *encoder

	// locks holds the segment files owned by the WAL; the last element is
	// the tail being appended to. Entries are nil for a read-only WAL.
	locks []*fileutil.LockedFile
	// fp supplies preallocated files for new segments in cut.
	fp *filePipeline
}
96
97
98
99
100 func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
101 if Exist(dirpath) {
102 return nil, os.ErrExist
103 }
104
105 if lg == nil {
106 lg = zap.NewNop()
107 }
108
109
110 tmpdirpath := filepath.Clean(dirpath) + ".tmp"
111 if fileutil.Exist(tmpdirpath) {
112 if err := os.RemoveAll(tmpdirpath); err != nil {
113 return nil, err
114 }
115 }
116 defer os.RemoveAll(tmpdirpath)
117
118 if err := fileutil.CreateDirAll(lg, tmpdirpath); err != nil {
119 lg.Warn(
120 "failed to create a temporary WAL directory",
121 zap.String("tmp-dir-path", tmpdirpath),
122 zap.String("dir-path", dirpath),
123 zap.Error(err),
124 )
125 return nil, err
126 }
127
128 p := filepath.Join(tmpdirpath, walName(0, 0))
129 f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
130 if err != nil {
131 lg.Warn(
132 "failed to flock an initial WAL file",
133 zap.String("path", p),
134 zap.Error(err),
135 )
136 return nil, err
137 }
138 if _, err = f.Seek(0, io.SeekEnd); err != nil {
139 lg.Warn(
140 "failed to seek an initial WAL file",
141 zap.String("path", p),
142 zap.Error(err),
143 )
144 return nil, err
145 }
146 if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
147 lg.Warn(
148 "failed to preallocate an initial WAL file",
149 zap.String("path", p),
150 zap.Int64("segment-bytes", SegmentSizeBytes),
151 zap.Error(err),
152 )
153 return nil, err
154 }
155
156 w := &WAL{
157 lg: lg,
158 dir: dirpath,
159 metadata: metadata,
160 }
161 w.encoder, err = newFileEncoder(f.File, 0)
162 if err != nil {
163 return nil, err
164 }
165 w.locks = append(w.locks, f)
166 if err = w.saveCrc(0); err != nil {
167 return nil, err
168 }
169 if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
170 return nil, err
171 }
172 if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
173 return nil, err
174 }
175
176 logDirPath := w.dir
177 if w, err = w.renameWAL(tmpdirpath); err != nil {
178 lg.Warn(
179 "failed to rename the temporary WAL directory",
180 zap.String("tmp-dir-path", tmpdirpath),
181 zap.String("dir-path", logDirPath),
182 zap.Error(err),
183 )
184 return nil, err
185 }
186
187 var perr error
188 defer func() {
189 if perr != nil {
190 w.cleanupWAL(lg)
191 }
192 }()
193
194
195 pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
196 if perr != nil {
197 lg.Warn(
198 "failed to open the parent data directory",
199 zap.String("parent-dir-path", filepath.Dir(w.dir)),
200 zap.String("dir-path", w.dir),
201 zap.Error(perr),
202 )
203 return nil, perr
204 }
205 dirCloser := func() error {
206 if perr = pdir.Close(); perr != nil {
207 lg.Warn(
208 "failed to close the parent data directory file",
209 zap.String("parent-dir-path", filepath.Dir(w.dir)),
210 zap.String("dir-path", w.dir),
211 zap.Error(perr),
212 )
213 return perr
214 }
215 return nil
216 }
217 start := time.Now()
218 if perr = fileutil.Fsync(pdir); perr != nil {
219 dirCloser()
220 lg.Warn(
221 "failed to fsync the parent data directory file",
222 zap.String("parent-dir-path", filepath.Dir(w.dir)),
223 zap.String("dir-path", w.dir),
224 zap.Error(perr),
225 )
226 return nil, perr
227 }
228 walFsyncSec.Observe(time.Since(start).Seconds())
229 if err = dirCloser(); err != nil {
230 return nil, err
231 }
232
233 return w, nil
234 }
235
236 func (w *WAL) SetUnsafeNoFsync() {
237 w.unsafeNoSync = true
238 }
239
240 func (w *WAL) cleanupWAL(lg *zap.Logger) {
241 var err error
242 if err = w.Close(); err != nil {
243 lg.Panic("failed to close WAL during cleanup", zap.Error(err))
244 }
245 brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
246 if err = os.Rename(w.dir, brokenDirName); err != nil {
247 lg.Panic(
248 "failed to rename WAL during cleanup",
249 zap.Error(err),
250 zap.String("source-path", w.dir),
251 zap.String("rename-path", brokenDirName),
252 )
253 }
254 }
255
256 func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
257 if err := os.RemoveAll(w.dir); err != nil {
258 return nil, err
259 }
260
261
262
263
264
265
266 if err := os.Rename(tmpdirpath, w.dir); err != nil {
267 if _, ok := err.(*os.LinkError); ok {
268 return w.renameWALUnlock(tmpdirpath)
269 }
270 return nil, err
271 }
272 w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
273 df, err := fileutil.OpenDir(w.dir)
274 w.dirFile = df
275 return w, err
276 }
277
278 func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
279
280
281 w.lg.Info(
282 "closing WAL to release flock and retry directory renaming",
283 zap.String("from", tmpdirpath),
284 zap.String("to", w.dir),
285 )
286 w.Close()
287
288 if err := os.Rename(tmpdirpath, w.dir); err != nil {
289 return nil, err
290 }
291
292
293 newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
294 if oerr != nil {
295 return nil, oerr
296 }
297 if _, _, _, err := newWAL.ReadAll(); err != nil {
298 newWAL.Close()
299 return nil, err
300 }
301 return newWAL, nil
302 }
303
304
305
306
307
308
309
310 func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
311 w, err := openAtIndex(lg, dirpath, snap, true)
312 if err != nil {
313 return nil, err
314 }
315 if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
316 return nil, err
317 }
318 return w, nil
319 }
320
321
322
323 func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
324 return openAtIndex(lg, dirpath, snap, false)
325 }
326
327 func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
328 if lg == nil {
329 lg = zap.NewNop()
330 }
331 names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
332 if err != nil {
333 return nil, err
334 }
335
336 rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
337 if err != nil {
338 return nil, err
339 }
340
341
342 w := &WAL{
343 lg: lg,
344 dir: dirpath,
345 start: snap,
346 decoder: newDecoder(rs...),
347 readClose: closer,
348 locks: ls,
349 }
350
351 if write {
352
353
354 w.readClose = nil
355 if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
356 closer()
357 return nil, err
358 }
359 w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
360 }
361
362 return w, nil
363 }
364
365 func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
366 names, err := readWALNames(lg, dirpath)
367 if err != nil {
368 return nil, -1, err
369 }
370
371 nameIndex, ok := searchIndex(lg, names, snap.Index)
372 if !ok || !isValidSeq(lg, names[nameIndex:]) {
373 err = ErrFileNotFound
374 return nil, -1, err
375 }
376
377 return names, nameIndex, nil
378 }
379
380 func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) {
381 rcs := make([]io.ReadCloser, 0)
382 rs := make([]fileutil.FileReader, 0)
383 ls := make([]*fileutil.LockedFile, 0)
384 for _, name := range names[nameIndex:] {
385 p := filepath.Join(dirpath, name)
386 var f *os.File
387 if write {
388 l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
389 if err != nil {
390 closeAll(lg, rcs...)
391 return nil, nil, nil, err
392 }
393 ls = append(ls, l)
394 rcs = append(rcs, l)
395 f = l.File
396 } else {
397 rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
398 if err != nil {
399 closeAll(lg, rcs...)
400 return nil, nil, nil, err
401 }
402 ls = append(ls, nil)
403 rcs = append(rcs, rf)
404 f = rf
405 }
406 fileReader := fileutil.NewFileReader(f)
407 rs = append(rs, fileReader)
408 }
409
410 closer := func() error { return closeAll(lg, rcs...) }
411
412 return rs, ls, closer, nil
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432 func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
433 w.mu.Lock()
434 defer w.mu.Unlock()
435
436 rec := &walpb.Record{}
437
438 if w.decoder == nil {
439 return nil, state, nil, ErrDecoderNotFound
440 }
441 decoder := w.decoder
442
443 var match bool
444 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
445 switch rec.Type {
446 case entryType:
447 e := mustUnmarshalEntry(rec.Data)
448
449 if e.Index > w.start.Index {
450
451 up := e.Index - w.start.Index - 1
452 if up > uint64(len(ents)) {
453
454 return nil, state, nil, ErrSliceOutOfRange
455 }
456
457 ents = append(ents[:up], e)
458 }
459 w.enti = e.Index
460
461 case stateType:
462 state = mustUnmarshalState(rec.Data)
463
464 case metadataType:
465 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
466 state.Reset()
467 return nil, state, nil, ErrMetadataConflict
468 }
469 metadata = rec.Data
470
471 case crcType:
472 crc := decoder.crc.Sum32()
473
474
475 if crc != 0 && rec.Validate(crc) != nil {
476 state.Reset()
477 return nil, state, nil, ErrCRCMismatch
478 }
479 decoder.updateCRC(rec.Crc)
480
481 case snapshotType:
482 var snap walpb.Snapshot
483 pbutil.MustUnmarshal(&snap, rec.Data)
484 if snap.Index == w.start.Index {
485 if snap.Term != w.start.Term {
486 state.Reset()
487 return nil, state, nil, ErrSnapshotMismatch
488 }
489 match = true
490 }
491
492 default:
493 state.Reset()
494 return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
495 }
496 }
497
498 switch w.tail() {
499 case nil:
500
501
502
503 if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
504 state.Reset()
505 return nil, state, nil, err
506 }
507 default:
508
509 if !errors.Is(err, io.EOF) {
510 state.Reset()
511 return nil, state, nil, err
512 }
513
514
515
516
517
518
519 if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
520 return nil, state, nil, err
521 }
522 if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
523 return nil, state, nil, err
524 }
525 }
526
527 err = nil
528 if !match {
529 err = ErrSnapshotNotFound
530 }
531
532
533 if w.readClose != nil {
534 w.readClose()
535 w.readClose = nil
536 }
537 w.start = walpb.Snapshot{}
538
539 w.metadata = metadata
540
541 if w.tail() != nil {
542
543 w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
544 if err != nil {
545 return
546 }
547 }
548 w.decoder = nil
549
550 return metadata, state, ents, err
551 }
552
553
554
555 func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
556 var snaps []walpb.Snapshot
557 var state raftpb.HardState
558 var err error
559
560 rec := &walpb.Record{}
561 names, err := readWALNames(lg, walDir)
562 if err != nil {
563 return nil, err
564 }
565
566
567
568 rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
569 if err != nil {
570 return nil, err
571 }
572 defer func() {
573 if closer != nil {
574 closer()
575 }
576 }()
577
578
579 decoder := newDecoder(rs...)
580
581 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
582 switch rec.Type {
583 case snapshotType:
584 var loadedSnap walpb.Snapshot
585 pbutil.MustUnmarshal(&loadedSnap, rec.Data)
586 snaps = append(snaps, loadedSnap)
587 case stateType:
588 state = mustUnmarshalState(rec.Data)
589 case crcType:
590 crc := decoder.crc.Sum32()
591
592
593 if crc != 0 && rec.Validate(crc) != nil {
594 return nil, ErrCRCMismatch
595 }
596 decoder.updateCRC(rec.Crc)
597 }
598 }
599
600
601 if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
602 return nil, err
603 }
604
605
606 n := 0
607 for _, s := range snaps {
608 if s.Index <= state.Commit {
609 snaps[n] = s
610 n++
611 }
612 }
613 snaps = snaps[:n:n]
614 return snaps, nil
615 }
616
617
618
619
620
621
622
623
624 func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardState, error) {
625 var metadata []byte
626 var err error
627 var match bool
628 var state raftpb.HardState
629
630 rec := &walpb.Record{}
631
632 if lg == nil {
633 lg = zap.NewNop()
634 }
635 names, nameIndex, err := selectWALFiles(lg, walDir, snap)
636 if err != nil {
637 return nil, err
638 }
639
640
641
642 rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
643 if err != nil {
644 return nil, err
645 }
646 defer func() {
647 if closer != nil {
648 closer()
649 }
650 }()
651
652
653 decoder := newDecoder(rs...)
654
655 for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
656 switch rec.Type {
657 case metadataType:
658 if metadata != nil && !bytes.Equal(metadata, rec.Data) {
659 return nil, ErrMetadataConflict
660 }
661 metadata = rec.Data
662 case crcType:
663 crc := decoder.crc.Sum32()
664
665
666 if crc != 0 && rec.Validate(crc) != nil {
667 return nil, ErrCRCMismatch
668 }
669 decoder.updateCRC(rec.Crc)
670 case snapshotType:
671 var loadedSnap walpb.Snapshot
672 pbutil.MustUnmarshal(&loadedSnap, rec.Data)
673 if loadedSnap.Index == snap.Index {
674 if loadedSnap.Term != snap.Term {
675 return nil, ErrSnapshotMismatch
676 }
677 match = true
678 }
679
680
681 case entryType:
682 case stateType:
683 pbutil.MustUnmarshal(&state, rec.Data)
684 default:
685 return nil, fmt.Errorf("unexpected block type %d", rec.Type)
686 }
687 }
688
689
690
691 if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
692 return nil, err
693 }
694
695 if !match {
696 return nil, ErrSnapshotNotFound
697 }
698
699 return &state, nil
700 }
701
702
703
704
// cut closes off the current tail segment and switches appending to a new
// segment file named walName(seq+1, enti+1).
//
// The new segment starts with a CRC record chained to the previous segment's
// CRC, the WAL metadata, and the current hard state (when it is non-empty).
// It is written under the temporary name handed out by the file pipeline,
// then renamed to its final name, after which the WAL directory is fsynced.
func (w *WAL) cut() error {
	// Truncate the current tail at the write position so that unused
	// preallocated space is dropped.
	off, serr := w.tail().Seek(0, io.SeekCurrent)
	if serr != nil {
		return serr
	}

	if err := w.tail().Truncate(off); err != nil {
		return err
	}

	if err := w.sync(); err != nil {
		return err
	}

	// Final path of the new segment: next sequence number, next entry index.
	fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))

	// Obtain the file for the new segment from the file pipeline.
	newTail, err := w.fp.Open()
	if err != nil {
		return err
	}

	// Make the new file the tail and point the encoder at it, carrying the
	// running CRC over from the previous segment.
	w.locks = append(w.locks, newTail)
	prevCrc := w.encoder.crc.Sum32()
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	if err = w.saveCrc(prevCrc); err != nil {
		return err
	}

	if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
		return err
	}

	if err = w.saveState(&w.state); err != nil {
		return err
	}

	// Flush the header records before the file is renamed into place.
	if err = w.sync(); err != nil {
		return err
	}

	// Remember the write position; it is restored after the file is reopened.
	off, err = w.tail().Seek(0, io.SeekCurrent)
	if err != nil {
		return err
	}

	if err = os.Rename(newTail.Name(), fpath); err != nil {
		return err
	}
	start := time.Now()
	if err = fileutil.Fsync(w.dirFile); err != nil {
		return err
	}
	walFsyncSec.Observe(time.Since(start).Seconds())

	// Reopen the segment under its final path so that Name() on the tail
	// yields a name parseWALName accepts (seq and ReleaseLockTo parse it).
	newTail.Close()

	if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
		return err
	}
	if _, err = newTail.Seek(off, io.SeekStart); err != nil {
		return err
	}

	w.locks[len(w.locks)-1] = newTail

	// Rebuild the encoder on the reopened file, keeping the running CRC.
	prevCrc = w.encoder.crc.Sum32()
	w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
	if err != nil {
		return err
	}

	w.lg.Info("created a new WAL segment", zap.String("path", fpath))
	return nil
}
788
789 func (w *WAL) sync() error {
790 if w.encoder != nil {
791 if err := w.encoder.flush(); err != nil {
792 return err
793 }
794 }
795
796 if w.unsafeNoSync {
797 return nil
798 }
799
800 start := time.Now()
801 err := fileutil.Fdatasync(w.tail().File)
802
803 took := time.Since(start)
804 if took > warnSyncDuration {
805 w.lg.Warn(
806 "slow fdatasync",
807 zap.Duration("took", took),
808 zap.Duration("expected-duration", warnSyncDuration),
809 )
810 }
811 walFsyncSec.Observe(took.Seconds())
812
813 return err
814 }
815
816 func (w *WAL) Sync() error {
817 return w.sync()
818 }
819
820
821
822
823
824 func (w *WAL) ReleaseLockTo(index uint64) error {
825 w.mu.Lock()
826 defer w.mu.Unlock()
827
828 if len(w.locks) == 0 {
829 return nil
830 }
831
832 var smaller int
833 found := false
834 for i, l := range w.locks {
835 _, lockIndex, err := parseWALName(filepath.Base(l.Name()))
836 if err != nil {
837 return err
838 }
839 if lockIndex >= index {
840 smaller = i - 1
841 found = true
842 break
843 }
844 }
845
846
847
848 if !found {
849 smaller = len(w.locks) - 1
850 }
851
852 if smaller <= 0 {
853 return nil
854 }
855
856 for i := 0; i < smaller; i++ {
857 if w.locks[i] == nil {
858 continue
859 }
860 w.locks[i].Close()
861 }
862 w.locks = w.locks[smaller:]
863
864 return nil
865 }
866
867
868 func (w *WAL) Close() error {
869 w.mu.Lock()
870 defer w.mu.Unlock()
871
872 if w.fp != nil {
873 w.fp.Close()
874 w.fp = nil
875 }
876
877 if w.tail() != nil {
878 if err := w.sync(); err != nil {
879 return err
880 }
881 }
882 for _, l := range w.locks {
883 if l == nil {
884 continue
885 }
886 if err := l.Close(); err != nil {
887 w.lg.Error("failed to close WAL", zap.Error(err))
888 }
889 }
890
891 return w.dirFile.Close()
892 }
893
894 func (w *WAL) saveEntry(e *raftpb.Entry) error {
895
896 b := pbutil.MustMarshal(e)
897 rec := &walpb.Record{Type: entryType, Data: b}
898 if err := w.encoder.encode(rec); err != nil {
899 return err
900 }
901 w.enti = e.Index
902 return nil
903 }
904
905 func (w *WAL) saveState(s *raftpb.HardState) error {
906 if raft.IsEmptyHardState(*s) {
907 return nil
908 }
909 w.state = *s
910 b := pbutil.MustMarshal(s)
911 rec := &walpb.Record{Type: stateType, Data: b}
912 return w.encoder.encode(rec)
913 }
914
915 func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
916 w.mu.Lock()
917 defer w.mu.Unlock()
918
919
920 if raft.IsEmptyHardState(st) && len(ents) == 0 {
921 return nil
922 }
923
924 mustSync := raft.MustSync(st, w.state, len(ents))
925
926
927 for i := range ents {
928 if err := w.saveEntry(&ents[i]); err != nil {
929 return err
930 }
931 }
932 if err := w.saveState(&st); err != nil {
933 return err
934 }
935
936 curOff, err := w.tail().Seek(0, io.SeekCurrent)
937 if err != nil {
938 return err
939 }
940 if curOff < SegmentSizeBytes {
941 if mustSync {
942
943 err = w.sync()
944
945 return err
946 }
947 return nil
948 }
949
950 return w.cut()
951 }
952
953 func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
954 if err := walpb.ValidateSnapshotForWrite(&e); err != nil {
955 return err
956 }
957
958 b := pbutil.MustMarshal(&e)
959
960 w.mu.Lock()
961 defer w.mu.Unlock()
962
963 rec := &walpb.Record{Type: snapshotType, Data: b}
964 if err := w.encoder.encode(rec); err != nil {
965 return err
966 }
967
968 if w.enti < e.Index {
969 w.enti = e.Index
970 }
971 return w.sync()
972 }
973
974 func (w *WAL) saveCrc(prevCrc uint32) error {
975 return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
976 }
977
978 func (w *WAL) tail() *fileutil.LockedFile {
979 if len(w.locks) > 0 {
980 return w.locks[len(w.locks)-1]
981 }
982 return nil
983 }
984
985 func (w *WAL) seq() uint64 {
986 t := w.tail()
987 if t == nil {
988 return 0
989 }
990 seq, _, err := parseWALName(filepath.Base(t.Name()))
991 if err != nil {
992 w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
993 }
994 return seq
995 }
996
997 func closeAll(lg *zap.Logger, rcs ...io.ReadCloser) error {
998 stringArr := make([]string, 0)
999 for _, f := range rcs {
1000 if err := f.Close(); err != nil {
1001 lg.Warn("failed to close: ", zap.Error(err))
1002 stringArr = append(stringArr, err.Error())
1003 }
1004 }
1005 if len(stringArr) == 0 {
1006 return nil
1007 }
1008 return errors.New(strings.Join(stringArr, ", "))
1009 }
1010
View as plain text