...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "errors"
19 "io"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "go.etcd.io/etcd/client/pkg/v3/types"
23 "go.etcd.io/etcd/pkg/v3/pbutil"
24 "go.etcd.io/etcd/raft/v3/raftpb"
25 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
26 "go.etcd.io/etcd/server/v3/wal"
27 "go.etcd.io/etcd/server/v3/wal/walpb"
28
29 "go.uber.org/zap"
30 )
31
32 type Storage interface {
33
34
35 Save(st raftpb.HardState, ents []raftpb.Entry) error
36
37 SaveSnap(snap raftpb.Snapshot) error
38
39 Close() error
40
41 Release(snap raftpb.Snapshot) error
42
43 Sync() error
44 }
45
46 type storage struct {
47 *wal.WAL
48 *snap.Snapshotter
49 }
50
51 func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
52 return &storage{w, s}
53 }
54
55
56 func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
57 walsnap := walpb.Snapshot{
58 Index: snap.Metadata.Index,
59 Term: snap.Metadata.Term,
60 ConfState: &snap.Metadata.ConfState,
61 }
62
63
64
65 err := st.Snapshotter.SaveSnap(snap)
66 if err != nil {
67 return err
68 }
69
70
71 return st.WAL.SaveSnapshot(walsnap)
72 }
73
74
75
76
77 func (st *storage) Release(snap raftpb.Snapshot) error {
78 if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
79 return err
80 }
81 return st.Snapshotter.ReleaseSnapDBs(snap)
82 }
83
84
85
86
87 func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
88 var (
89 err error
90 wmetadata []byte
91 )
92
93 repaired := false
94 for {
95 if w, err = wal.Open(lg, waldir, snap); err != nil {
96 lg.Fatal("failed to open WAL", zap.Error(err))
97 }
98 if unsafeNoFsync {
99 w.SetUnsafeNoFsync()
100 }
101 if wmetadata, st, ents, err = w.ReadAll(); err != nil {
102 w.Close()
103
104 if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
105 lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
106 }
107 if !wal.Repair(lg, waldir) {
108 lg.Fatal("failed to repair WAL", zap.Error(err))
109 } else {
110 lg.Info("repaired WAL", zap.Error(err))
111 repaired = true
112 }
113 continue
114 }
115 break
116 }
117 var metadata pb.Metadata
118 pbutil.MustUnmarshal(&metadata, wmetadata)
119 id = types.ID(metadata.NodeID)
120 cid = types.ID(metadata.ClusterID)
121 return w, id, cid, st, ents
122 }
123
View as plain text