...

Source file src/k8s.io/kubernetes/cluster/images/etcd/migrate/migrator.go

Documentation: k8s.io/kubernetes/cluster/images/etcd/migrate

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package main
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"time"
    23  
    24  	"github.com/blang/semver/v4"
    25  	"k8s.io/klog/v2"
    26  )
    27  
// EtcdMigrateCfg provides all configuration required to perform etcd data upgrade/downgrade migrations.
//
// All fields are unexported. Within this file the struct is passed as a whole
// to NewEtcdMigrateServer (see Migrator.newServer), and only supportedVersions
// is read directly.
// NOTE(review): the meaning of the remaining fields is not established by this
// file; confirm against the code that populates them before relying on names.
type EtcdMigrateCfg struct {
	binPath           string
	name              string
	initialCluster    string
	port              uint64
	peerListenUrls    string
	peerAdvertiseUrls string
	clientListenUrls  string
	etcdDataPrefix    string
	ttlKeysDirectory  string
	// supportedVersions is queried through NextVersionPair by
	// Migrator.MigrateIfNeeded to choose the next upgrade step.
	supportedVersions SupportedVersions
	dataDirectory     string
	etcdServerArgs    string
}
    43  
    44  // EtcdMigrateClient defines the etcd client operations required to perform migrations.
    45  type EtcdMigrateClient interface {
    46  	SetEtcdVersionKeyValue(version *EtcdVersion) error
    47  	Get(version *EtcdVersion, key string) (string, error)
    48  	Put(version *EtcdVersion, key, value string) error
    49  	Backup(version *EtcdVersion, backupDir string) error
    50  	Snapshot(version *EtcdVersion, snapshotFile string) error
    51  	Restore(version *EtcdVersion, snapshotFile string) error
    52  	Migrate(version *EtcdVersion) error
    53  	AttachLease(leaseDuration time.Duration) error
    54  	Close() error
    55  }
    56  
// Migrator manages etcd data migrations.
//
// MigrateIfNeeded is the entry point: it reads the version recorded in the
// data directory's version file and steps it toward a requested target.
type Migrator struct {
	// cfg is handed to NewEtcdMigrateServer and supplies supportedVersions
	// when choosing upgrade steps.
	cfg           *EtcdMigrateCfg // TODO: don't wire this directly in
	// dataDirectory is the directory being migrated; its path and version
	// file are read and written during migration.
	dataDirectory *DataDirectory
	// client is used for Snapshot/Restore during rollback and is also
	// passed to NewEtcdMigrateServer.
	client        EtcdMigrateClient
}
    63  
    64  // MigrateIfNeeded upgrades or downgrades the etcd data directory to the given target version.
    65  func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
    66  	klog.Infof("Starting migration to %s", target)
    67  	err := m.dataDirectory.Initialize(target)
    68  	if err != nil {
    69  		return fmt.Errorf("failed to initialize data directory %s: %v", m.dataDirectory.path, err)
    70  	}
    71  
    72  	var current *EtcdVersionPair
    73  	vfExists, err := m.dataDirectory.versionFile.Exists()
    74  	if err != nil {
    75  		return err
    76  	}
    77  	if vfExists {
    78  		current, err = m.dataDirectory.versionFile.Read()
    79  		if err != nil {
    80  			return err
    81  		}
    82  	} else {
    83  		return fmt.Errorf("existing data directory '%s' is missing version.txt file, unable to migrate", m.dataDirectory.path)
    84  	}
    85  
    86  	for {
    87  		klog.Infof("Converging current version '%s' to target version '%s'", current, target)
    88  		currentNextMinorVersion := &EtcdVersion{Version: semver.Version{Major: current.version.Major, Minor: current.version.Minor + 1}}
    89  		switch {
    90  		case current.version.MajorMinorEquals(target.version) || currentNextMinorVersion.MajorMinorEquals(target.version):
    91  			klog.Infof("current version '%s' equals or is one minor version previous of target version '%s' - migration complete", current, target)
    92  			err = m.dataDirectory.versionFile.Write(target)
    93  			if err != nil {
    94  				return fmt.Errorf("failed to write version.txt to '%s': %v", m.dataDirectory.path, err)
    95  			}
    96  			return nil
    97  		case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
    98  			return fmt.Errorf("upgrading from etcd2 storage to etcd3 storage is not supported")
    99  		case current.version.Major == 3 && target.version.Major == 2:
   100  			return fmt.Errorf("downgrading from etcd 3.x to 2.x is not supported")
   101  		case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
   102  			stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
   103  			klog.Infof("upgrading etcd from %s to %s", current, stepVersion)
   104  			current, err = m.minorVersionUpgrade(current, stepVersion)
   105  		case current.version.Major == 3 && target.version.Major == 3 && current.version.Minor > target.version.Minor:
   106  			klog.Infof("rolling etcd back from %s to %s", current, target)
   107  			current, err = m.rollbackEtcd3MinorVersion(current, target)
   108  		}
   109  		if err != nil {
   110  			return err
   111  		}
   112  	}
   113  }
   114  
   115  func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
   116  	if target.version.Minor != current.version.Minor-1 {
   117  		return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
   118  	}
   119  
   120  	klog.Infof("Performing etcd %s -> %s rollback", current.version, target.version)
   121  	err := m.dataDirectory.Backup()
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  
   126  	snapshotFilename := fmt.Sprintf("%s.snapshot.db", m.dataDirectory.path)
   127  	err = os.Remove(snapshotFilename)
   128  	if err != nil && !os.IsNotExist(err) {
   129  		return nil, fmt.Errorf("failed to clean snapshot file before rollback: %v", err)
   130  	}
   131  
   132  	// Start current version of etcd.
   133  	runner := m.newServer()
   134  	klog.Infof("Starting etcd version %s to capture rollback snapshot.", current.version)
   135  	err = runner.Start(current.version)
   136  	if err != nil {
   137  		klog.Fatalf("Unable to automatically downgrade etcd: starting etcd version %s to capture rollback snapshot failed: %v", current.version, err)
   138  		return nil, err
   139  	}
   140  
   141  	klog.Infof("Snapshotting etcd %s to %s", current.version, snapshotFilename)
   142  	err = m.client.Snapshot(current.version, snapshotFilename)
   143  	if err != nil {
   144  		return nil, err
   145  	}
   146  
   147  	err = runner.Stop()
   148  	if err != nil {
   149  		return nil, err
   150  	}
   151  
   152  	klog.Info("Backing up data before rolling back")
   153  	backupDir := fmt.Sprintf("%s.bak", m.dataDirectory)
   154  	err = os.RemoveAll(backupDir)
   155  	if err != nil {
   156  		return nil, err
   157  	}
   158  	origInfo, err := os.Stat(m.dataDirectory.path)
   159  	if err != nil {
   160  		return nil, err
   161  	}
   162  	err = os.Rename(m.dataDirectory.path, backupDir)
   163  	if err != nil {
   164  		return nil, err
   165  	}
   166  
   167  	klog.Infof("Restoring etcd %s from %s", target.version, snapshotFilename)
   168  	err = m.client.Restore(target.version, snapshotFilename)
   169  	if err != nil {
   170  		return nil, err
   171  	}
   172  	err = os.Chmod(m.dataDirectory.path, origInfo.Mode())
   173  	if err != nil {
   174  		return nil, err
   175  	}
   176  
   177  	return target, nil
   178  }
   179  
   180  func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
   181  	runner := m.newServer()
   182  
   183  	// Do the migration step, by just starting etcd in the target version.
   184  	err := runner.Start(target.version)
   185  	if err != nil {
   186  		return nil, err
   187  	}
   188  	err = runner.Stop()
   189  	return target, err
   190  }
   191  
   192  func (m *Migrator) newServer() *EtcdMigrateServer {
   193  	return NewEtcdMigrateServer(m.cfg, m.client)
   194  }
   195  

View as plain text