1
16
17 package main
18
19 import (
20 "fmt"
21 "os"
22 "time"
23
24 "github.com/blang/semver/v4"
25 "k8s.io/klog/v2"
26 )
27
28
29 type EtcdMigrateCfg struct {
30 binPath string
31 name string
32 initialCluster string
33 port uint64
34 peerListenUrls string
35 peerAdvertiseUrls string
36 clientListenUrls string
37 etcdDataPrefix string
38 ttlKeysDirectory string
39 supportedVersions SupportedVersions
40 dataDirectory string
41 etcdServerArgs string
42 }
43
44
45 type EtcdMigrateClient interface {
46 SetEtcdVersionKeyValue(version *EtcdVersion) error
47 Get(version *EtcdVersion, key string) (string, error)
48 Put(version *EtcdVersion, key, value string) error
49 Backup(version *EtcdVersion, backupDir string) error
50 Snapshot(version *EtcdVersion, snapshotFile string) error
51 Restore(version *EtcdVersion, snapshotFile string) error
52 Migrate(version *EtcdVersion) error
53 AttachLease(leaseDuration time.Duration) error
54 Close() error
55 }
56
57
// Migrator converges an etcd data directory to a requested target version,
// stepping through upgrades or a single-minor rollback as needed.
type Migrator struct {
	// cfg supplies the supported version list and is handed to every etcd
	// server the migrator creates.
	cfg *EtcdMigrateCfg

	// dataDirectory is the data directory being migrated; its version file
	// records the version the data currently belongs to.
	dataDirectory *DataDirectory

	// client performs snapshot and restore operations and is handed to every
	// etcd server the migrator creates.
	client EtcdMigrateClient
}
63
64
65 func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
66 klog.Infof("Starting migration to %s", target)
67 err := m.dataDirectory.Initialize(target)
68 if err != nil {
69 return fmt.Errorf("failed to initialize data directory %s: %v", m.dataDirectory.path, err)
70 }
71
72 var current *EtcdVersionPair
73 vfExists, err := m.dataDirectory.versionFile.Exists()
74 if err != nil {
75 return err
76 }
77 if vfExists {
78 current, err = m.dataDirectory.versionFile.Read()
79 if err != nil {
80 return err
81 }
82 } else {
83 return fmt.Errorf("existing data directory '%s' is missing version.txt file, unable to migrate", m.dataDirectory.path)
84 }
85
86 for {
87 klog.Infof("Converging current version '%s' to target version '%s'", current, target)
88 currentNextMinorVersion := &EtcdVersion{Version: semver.Version{Major: current.version.Major, Minor: current.version.Minor + 1}}
89 switch {
90 case current.version.MajorMinorEquals(target.version) || currentNextMinorVersion.MajorMinorEquals(target.version):
91 klog.Infof("current version '%s' equals or is one minor version previous of target version '%s' - migration complete", current, target)
92 err = m.dataDirectory.versionFile.Write(target)
93 if err != nil {
94 return fmt.Errorf("failed to write version.txt to '%s': %v", m.dataDirectory.path, err)
95 }
96 return nil
97 case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
98 return fmt.Errorf("upgrading from etcd2 storage to etcd3 storage is not supported")
99 case current.version.Major == 3 && target.version.Major == 2:
100 return fmt.Errorf("downgrading from etcd 3.x to 2.x is not supported")
101 case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
102 stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
103 klog.Infof("upgrading etcd from %s to %s", current, stepVersion)
104 current, err = m.minorVersionUpgrade(current, stepVersion)
105 case current.version.Major == 3 && target.version.Major == 3 && current.version.Minor > target.version.Minor:
106 klog.Infof("rolling etcd back from %s to %s", current, target)
107 current, err = m.rollbackEtcd3MinorVersion(current, target)
108 }
109 if err != nil {
110 return err
111 }
112 }
113 }
114
115 func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
116 if target.version.Minor != current.version.Minor-1 {
117 return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
118 }
119
120 klog.Infof("Performing etcd %s -> %s rollback", current.version, target.version)
121 err := m.dataDirectory.Backup()
122 if err != nil {
123 return nil, err
124 }
125
126 snapshotFilename := fmt.Sprintf("%s.snapshot.db", m.dataDirectory.path)
127 err = os.Remove(snapshotFilename)
128 if err != nil && !os.IsNotExist(err) {
129 return nil, fmt.Errorf("failed to clean snapshot file before rollback: %v", err)
130 }
131
132
133 runner := m.newServer()
134 klog.Infof("Starting etcd version %s to capture rollback snapshot.", current.version)
135 err = runner.Start(current.version)
136 if err != nil {
137 klog.Fatalf("Unable to automatically downgrade etcd: starting etcd version %s to capture rollback snapshot failed: %v", current.version, err)
138 return nil, err
139 }
140
141 klog.Infof("Snapshotting etcd %s to %s", current.version, snapshotFilename)
142 err = m.client.Snapshot(current.version, snapshotFilename)
143 if err != nil {
144 return nil, err
145 }
146
147 err = runner.Stop()
148 if err != nil {
149 return nil, err
150 }
151
152 klog.Info("Backing up data before rolling back")
153 backupDir := fmt.Sprintf("%s.bak", m.dataDirectory)
154 err = os.RemoveAll(backupDir)
155 if err != nil {
156 return nil, err
157 }
158 origInfo, err := os.Stat(m.dataDirectory.path)
159 if err != nil {
160 return nil, err
161 }
162 err = os.Rename(m.dataDirectory.path, backupDir)
163 if err != nil {
164 return nil, err
165 }
166
167 klog.Infof("Restoring etcd %s from %s", target.version, snapshotFilename)
168 err = m.client.Restore(target.version, snapshotFilename)
169 if err != nil {
170 return nil, err
171 }
172 err = os.Chmod(m.dataDirectory.path, origInfo.Mode())
173 if err != nil {
174 return nil, err
175 }
176
177 return target, nil
178 }
179
180 func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
181 runner := m.newServer()
182
183
184 err := runner.Start(target.version)
185 if err != nil {
186 return nil, err
187 }
188 err = runner.Stop()
189 return target, err
190 }
191
192 func (m *Migrator) newServer() *EtcdMigrateServer {
193 return NewEtcdMigrateServer(m.cfg, m.client)
194 }
195
View as plain text