...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "fmt"
19 "os"
20 "time"
21
22 "go.etcd.io/etcd/raft/v3/raftpb"
23 "go.etcd.io/etcd/server/v3/config"
24 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
25 "go.etcd.io/etcd/server/v3/etcdserver/cindex"
26 "go.etcd.io/etcd/server/v3/mvcc/backend"
27
28 "go.uber.org/zap"
29 )
30
31 func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
32 bcfg := backend.DefaultBackendConfig()
33 bcfg.Path = cfg.BackendPath()
34 bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
35 if cfg.BackendBatchLimit != 0 {
36 bcfg.BatchLimit = cfg.BackendBatchLimit
37 if cfg.Logger != nil {
38 cfg.Logger.Info("setting backend batch limit", zap.Int("batch limit", cfg.BackendBatchLimit))
39 }
40 }
41 if cfg.BackendBatchInterval != 0 {
42 bcfg.BatchInterval = cfg.BackendBatchInterval
43 if cfg.Logger != nil {
44 cfg.Logger.Info("setting backend batch interval", zap.Duration("batch interval", cfg.BackendBatchInterval))
45 }
46 }
47 bcfg.BackendFreelistType = cfg.BackendFreelistType
48 bcfg.Logger = cfg.Logger
49 if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
50
51 bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
52 }
53 bcfg.Mlock = cfg.ExperimentalMemoryMlock
54 bcfg.Hooks = hooks
55 return backend.New(bcfg)
56 }
57
58
59 func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
60 snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
61 if err != nil {
62 return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
63 }
64 if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
65 return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
66 }
67 return openBackend(cfg, hooks), nil
68 }
69
70
71 func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
72 fn := cfg.BackendPath()
73
74 now, beOpened := time.Now(), make(chan backend.Backend)
75 go func() {
76 beOpened <- newBackend(cfg, hooks)
77 }()
78
79 select {
80 case be := <-beOpened:
81 cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
82 return be
83
84 case <-time.After(10 * time.Second):
85 cfg.Logger.Info(
86 "db file is flocked by another process, or taking too long",
87 zap.String("path", fn),
88 zap.Duration("took", time.Since(now)),
89 )
90 }
91
92 return <-beOpened
93 }
94
95
96
97
98
99 func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
100 consistentIndex := uint64(0)
101 if beExist {
102 consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx())
103 }
104 if snapshot.Metadata.Index <= consistentIndex {
105 return oldbe, nil
106 }
107 oldbe.Close()
108 return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
109 }
110
View as plain text