1
16
17 package main
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "os"
24 "os/exec"
25 "path/filepath"
26 "strings"
27 "time"
28
29 clientv3 "go.etcd.io/etcd/client/v3"
30 "google.golang.org/grpc"
31 "k8s.io/klog/v2"
32 )
33
34
35
36 type CombinedEtcdClient struct {
37 cfg *EtcdMigrateCfg
38 }
39
40
41 func NewEtcdMigrateClient(cfg *EtcdMigrateCfg) (EtcdMigrateClient, error) {
42 return &CombinedEtcdClient{cfg}, nil
43 }
44
45
46 func (e *CombinedEtcdClient) Close() error {
47 return nil
48 }
49
50
51
52
53 func (e *CombinedEtcdClient) SetEtcdVersionKeyValue(version *EtcdVersion) error {
54 return e.Put(version, "etcd_version", version.String())
55 }
56
57
58 func (e *CombinedEtcdClient) Put(version *EtcdVersion, key, value string) error {
59 v3client, err := e.clientV3()
60 if err != nil {
61 return err
62 }
63 defer v3client.Close()
64 _, err = v3client.KV.Put(context.Background(), key, value)
65 return err
66 }
67
68
69 func (e *CombinedEtcdClient) Get(version *EtcdVersion, key string) (string, error) {
70 v3client, err := e.clientV3()
71 if err != nil {
72 return "", err
73 }
74 defer v3client.Close()
75 resp, err := v3client.KV.Get(context.Background(), key)
76 if err != nil {
77 return "", err
78 }
79 kvs := resp.Kvs
80 if len(kvs) != 1 {
81 return "", fmt.Errorf("expected exactly one value for key %s but got %d", key, len(kvs))
82 }
83
84 return string(kvs[0].Value), nil
85 }
86
87 func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
88 return clientv3.New(clientv3.Config{
89 Endpoints: []string{e.endpoint()},
90 DialTimeout: 20 * time.Second,
91 DialOptions: []grpc.DialOption{
92 grpc.WithBlock(),
93 },
94 })
95 }
96
97
98 func (e *CombinedEtcdClient) Backup(version *EtcdVersion, backupDir string) error {
99
100 if version.Major != 2 {
101 return fmt.Errorf("etcd 2.x required but got version '%s'", version)
102 }
103 return e.runEtcdctlCommand(version,
104 "--debug",
105 "backup",
106 "--data-dir", e.cfg.dataDirectory,
107 "--backup-dir", backupDir,
108 )
109 }
110
111
112
113 func (e *CombinedEtcdClient) Snapshot(version *EtcdVersion, snapshotFile string) error {
114 if version.Major != 3 {
115 return fmt.Errorf("etcd 3.x required but got version '%s'", version)
116 }
117 return e.runEtcdctlCommand(version,
118 "--endpoints", e.endpoint(),
119 "snapshot", "save", snapshotFile,
120 )
121 }
122
123
124 func (e *CombinedEtcdClient) Restore(version *EtcdVersion, snapshotFile string) error {
125
126 if version.Major != 3 {
127 return fmt.Errorf("etcd 3.x required but got version '%s'", version)
128 }
129 return e.runEtcdctlCommand(version,
130 "snapshot", "restore", snapshotFile,
131 "--data-dir", e.cfg.dataDirectory,
132 "--name", e.cfg.name,
133 "--initial-advertise-peer-urls", e.cfg.peerAdvertiseUrls,
134 "--initial-cluster", e.cfg.initialCluster,
135 )
136 }
137
138
139
140 func (e *CombinedEtcdClient) Migrate(version *EtcdVersion) error {
141
142 if version.Major != 3 {
143 return fmt.Errorf("etcd 3.x required but got version '%s'", version)
144 }
145 return e.runEtcdctlCommand(version,
146 "migrate",
147 "--data-dir", e.cfg.dataDirectory,
148 )
149 }
150
151 func (e *CombinedEtcdClient) runEtcdctlCommand(version *EtcdVersion, args ...string) error {
152 etcdctlCmd := exec.Command(filepath.Join(e.cfg.binPath, fmt.Sprintf("etcdctl-%s", version)), args...)
153 etcdctlCmd.Env = []string{fmt.Sprintf("ETCDCTL_API=%d", version.Major)}
154 etcdctlCmd.Stdout = os.Stdout
155 etcdctlCmd.Stderr = os.Stderr
156 return etcdctlCmd.Run()
157 }
158
159
160
161 func (e *CombinedEtcdClient) AttachLease(leaseDuration time.Duration) error {
162 ttlKeysPrefix := e.cfg.ttlKeysDirectory
163
164 if !strings.HasSuffix(ttlKeysPrefix, "/") {
165 ttlKeysPrefix += "/"
166 }
167 ctx := context.Background()
168
169 v3client, err := e.clientV3()
170 if err != nil {
171 return err
172 }
173 defer v3client.Close()
174 objectsResp, err := v3client.KV.Get(ctx, ttlKeysPrefix, clientv3.WithPrefix())
175 if err != nil {
176 return fmt.Errorf("error while getting objects to attach to the lease")
177 }
178
179 lease, err := v3client.Lease.Grant(ctx, int64(leaseDuration/time.Second))
180 if err != nil {
181 return fmt.Errorf("error while creating lease: %v", err)
182 }
183 klog.Infof("Lease with TTL: %v created", lease.TTL)
184
185 klog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
186 for _, kv := range objectsResp.Kvs {
187 putResp, err := v3client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID), clientv3.WithPrevKV())
188 if err != nil {
189 klog.Errorf("Error while attaching lease to: %s", string(kv.Key))
190 }
191 if !bytes.Equal(putResp.PrevKv.Value, kv.Value) {
192 return fmt.Errorf("concurrent access to key detected when setting lease on %s, expected previous value of %s but got %s",
193 kv.Key, kv.Value, putResp.PrevKv.Value)
194 }
195 }
196 return nil
197 }
198
199 func (e *CombinedEtcdClient) endpoint() string {
200 return fmt.Sprintf("http://127.0.0.1:%d", e.cfg.port)
201 }
202
View as plain text