...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package snapshot
16
17 import (
18 "context"
19 "crypto/sha256"
20 "fmt"
21 "io"
22 "os"
23 "time"
24
25 "github.com/dustin/go-humanize"
26 "go.etcd.io/etcd/client/pkg/v3/fileutil"
27 "go.etcd.io/etcd/client/v3"
28 "go.uber.org/zap"
29 )
30
31
32
// hasChecksum reports whether a byte count n leaves a remainder of exactly
// sha256.Size after division by 512, i.e. whether the length is consistent
// with a 512-byte-aligned payload followed by one SHA-256 digest.
func hasChecksum(n int64) bool {
	// NOTE(review): 512 is presumably the alignment unit of the snapshot
	// payload (e.g. a disk sector size) — confirm against the writer side.
	const alignment = 512

	trailing := n % alignment
	return trailing == sha256.Size
}
38
39
40
41
42
43
44
45
46 func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error {
47 cfg.Logger = lg.Named("client")
48 if len(cfg.Endpoints) != 1 {
49 return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
50 }
51 cli, err := clientv3.New(cfg)
52 if err != nil {
53 return err
54 }
55 defer cli.Close()
56
57 partpath := dbPath + ".part"
58 defer os.RemoveAll(partpath)
59
60 var f *os.File
61 f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
62 if err != nil {
63 return fmt.Errorf("could not open %s (%v)", partpath, err)
64 }
65 lg.Info("created temporary db file", zap.String("path", partpath))
66
67 now := time.Now()
68 var rd io.ReadCloser
69 rd, err = cli.Snapshot(ctx)
70 if err != nil {
71 return err
72 }
73 lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
74 var size int64
75 size, err = io.Copy(f, rd)
76 if err != nil {
77 return err
78 }
79 if !hasChecksum(size) {
80 return fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
81 }
82 if err = fileutil.Fsync(f); err != nil {
83 return err
84 }
85 if err = f.Close(); err != nil {
86 return err
87 }
88 lg.Info("fetched snapshot",
89 zap.String("endpoint", cfg.Endpoints[0]),
90 zap.String("size", humanize.Bytes(uint64(size))),
91 zap.String("took", humanize.Time(now)),
92 )
93
94 if err = os.Rename(partpath, dbPath); err != nil {
95 return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
96 }
97 lg.Info("saved", zap.String("path", dbPath))
98 return nil
99 }
100
View as plain text