1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package snap
16
17 import (
18 "errors"
19 "fmt"
20 "hash/crc32"
21 "io/ioutil"
22 "os"
23 "path/filepath"
24 "sort"
25 "strconv"
26 "strings"
27 "time"
28
29 pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
30 "go.etcd.io/etcd/pkg/v3/pbutil"
31 "go.etcd.io/etcd/raft/v3"
32 "go.etcd.io/etcd/raft/v3/raftpb"
33 "go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb"
34 "go.etcd.io/etcd/server/v3/wal/walpb"
35
36 "go.uber.org/zap"
37 )
38
39 const snapSuffix = ".snap"
40
41 var (
42 ErrNoSnapshot = errors.New("snap: no available snapshot")
43 ErrEmptySnapshot = errors.New("snap: empty snapshot")
44 ErrCRCMismatch = errors.New("snap: crc mismatch")
45 crcTable = crc32.MakeTable(crc32.Castagnoli)
46
47
48 validFiles = map[string]bool{
49 "db": true,
50 }
51 )
52
53 type Snapshotter struct {
54 lg *zap.Logger
55 dir string
56 }
57
58 func New(lg *zap.Logger, dir string) *Snapshotter {
59 if lg == nil {
60 lg = zap.NewNop()
61 }
62 return &Snapshotter{
63 lg: lg,
64 dir: dir,
65 }
66 }
67
68 func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error {
69 if raft.IsEmptySnap(snapshot) {
70 return nil
71 }
72 return s.save(&snapshot)
73 }
74
75 func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
76 start := time.Now()
77
78 fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
79 b := pbutil.MustMarshal(snapshot)
80 crc := crc32.Update(0, crcTable, b)
81 snap := snappb.Snapshot{Crc: crc, Data: b}
82 d, err := snap.Marshal()
83 if err != nil {
84 return err
85 }
86 snapMarshallingSec.Observe(time.Since(start).Seconds())
87
88 spath := filepath.Join(s.dir, fname)
89
90 fsyncStart := time.Now()
91 err = pioutil.WriteAndSyncFile(spath, d, 0666)
92 snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())
93
94 if err != nil {
95 s.lg.Warn("failed to write a snap file", zap.String("path", spath), zap.Error(err))
96 rerr := os.Remove(spath)
97 if rerr != nil {
98 s.lg.Warn("failed to remove a broken snap file", zap.String("path", spath), zap.Error(err))
99 }
100 return err
101 }
102
103 snapSaveSec.Observe(time.Since(start).Seconds())
104 return nil
105 }
106
107
108 func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
109 return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
110 }
111
112
113 func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
114 return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
115 m := snapshot.Metadata
116 for i := len(walSnaps) - 1; i >= 0; i-- {
117 if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
118 return true
119 }
120 }
121 return false
122 })
123 }
124
125
126 func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
127 names, err := s.snapNames()
128 if err != nil {
129 return nil, err
130 }
131 var snap *raftpb.Snapshot
132 for _, name := range names {
133 if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
134 return snap, nil
135 }
136 }
137 return nil, ErrNoSnapshot
138 }
139
140 func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
141 fpath := filepath.Join(dir, name)
142 snap, err := Read(lg, fpath)
143 if err != nil {
144 brokenPath := fpath + ".broken"
145 if lg != nil {
146 lg.Warn("failed to read a snap file", zap.String("path", fpath), zap.Error(err))
147 }
148 if rerr := os.Rename(fpath, brokenPath); rerr != nil {
149 if lg != nil {
150 lg.Warn("failed to rename a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath), zap.Error(rerr))
151 }
152 } else {
153 if lg != nil {
154 lg.Warn("renamed to a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath))
155 }
156 }
157 }
158 return snap, err
159 }
160
161
162 func Read(lg *zap.Logger, snapname string) (*raftpb.Snapshot, error) {
163 b, err := ioutil.ReadFile(snapname)
164 if err != nil {
165 if lg != nil {
166 lg.Warn("failed to read a snap file", zap.String("path", snapname), zap.Error(err))
167 }
168 return nil, err
169 }
170
171 if len(b) == 0 {
172 if lg != nil {
173 lg.Warn("failed to read empty snapshot file", zap.String("path", snapname))
174 }
175 return nil, ErrEmptySnapshot
176 }
177
178 var serializedSnap snappb.Snapshot
179 if err = serializedSnap.Unmarshal(b); err != nil {
180 if lg != nil {
181 lg.Warn("failed to unmarshal snappb.Snapshot", zap.String("path", snapname), zap.Error(err))
182 }
183 return nil, err
184 }
185
186 if len(serializedSnap.Data) == 0 || serializedSnap.Crc == 0 {
187 if lg != nil {
188 lg.Warn("failed to read empty snapshot data", zap.String("path", snapname))
189 }
190 return nil, ErrEmptySnapshot
191 }
192
193 crc := crc32.Update(0, crcTable, serializedSnap.Data)
194 if crc != serializedSnap.Crc {
195 if lg != nil {
196 lg.Warn("snap file is corrupt",
197 zap.String("path", snapname),
198 zap.Uint32("prev-crc", serializedSnap.Crc),
199 zap.Uint32("new-crc", crc),
200 )
201 }
202 return nil, ErrCRCMismatch
203 }
204
205 var snap raftpb.Snapshot
206 if err = snap.Unmarshal(serializedSnap.Data); err != nil {
207 if lg != nil {
208 lg.Warn("failed to unmarshal raftpb.Snapshot", zap.String("path", snapname), zap.Error(err))
209 }
210 return nil, err
211 }
212 return &snap, nil
213 }
214
215
216
217 func (s *Snapshotter) snapNames() ([]string, error) {
218 dir, err := os.Open(s.dir)
219 if err != nil {
220 return nil, err
221 }
222 defer dir.Close()
223 names, err := dir.Readdirnames(-1)
224 if err != nil {
225 return nil, err
226 }
227 filenames, err := s.cleanupSnapdir(names)
228 if err != nil {
229 return nil, err
230 }
231 snaps := checkSuffix(s.lg, filenames)
232 if len(snaps) == 0 {
233 return nil, ErrNoSnapshot
234 }
235 sort.Sort(sort.Reverse(sort.StringSlice(snaps)))
236 return snaps, nil
237 }
238
239 func checkSuffix(lg *zap.Logger, names []string) []string {
240 snaps := []string{}
241 for i := range names {
242 if strings.HasSuffix(names[i], snapSuffix) {
243 snaps = append(snaps, names[i])
244 } else {
245
246
247 if _, ok := validFiles[names[i]]; !ok {
248 if lg != nil {
249 lg.Warn("found unexpected non-snap file; skipping", zap.String("path", names[i]))
250 }
251 }
252 }
253 }
254 return snaps
255 }
256
257
258
259 func (s *Snapshotter) cleanupSnapdir(filenames []string) (names []string, err error) {
260 names = make([]string, 0, len(filenames))
261 for _, filename := range filenames {
262 if strings.HasPrefix(filename, "db.tmp") {
263 s.lg.Info("found orphaned defragmentation file; deleting", zap.String("path", filename))
264 if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
265 return names, fmt.Errorf("failed to remove orphaned .snap.db file %s: %v", filename, rmErr)
266 }
267 } else {
268 names = append(names, filename)
269 }
270 }
271 return names, nil
272 }
273
274 func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
275 dir, err := os.Open(s.dir)
276 if err != nil {
277 return err
278 }
279 defer dir.Close()
280 filenames, err := dir.Readdirnames(-1)
281 if err != nil {
282 return err
283 }
284 for _, filename := range filenames {
285 if strings.HasSuffix(filename, ".snap.db") {
286 hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
287 index, err := strconv.ParseUint(hexIndex, 16, 64)
288 if err != nil {
289 s.lg.Error("failed to parse index from filename", zap.String("path", filename), zap.String("error", err.Error()))
290 continue
291 }
292 if index < snap.Metadata.Index {
293 s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
294 if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
295 s.lg.Error("failed to remove orphaned .snap.db file", zap.String("path", filename), zap.String("error", rmErr.Error()))
296 }
297 }
298 }
299 }
300 return nil
301 }
302
View as plain text