...

Source file src/k8s.io/kubernetes/cluster/images/etcd/migrate/migrate_client.go

Documentation: k8s.io/kubernetes/cluster/images/etcd/migrate

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package main
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"os"
    24  	"os/exec"
    25  	"path/filepath"
    26  	"strings"
    27  	"time"
    28  
    29  	clientv3 "go.etcd.io/etcd/client/v3"
    30  	"google.golang.org/grpc"
    31  	"k8s.io/klog/v2"
    32  )
    33  
    34  // CombinedEtcdClient provides an implementation of EtcdMigrateClient using a combination of the etcd v2 client, v3 client
    35  // and etcdctl commands called via the shell.
    36  type CombinedEtcdClient struct {
    37  	cfg *EtcdMigrateCfg
    38  }
    39  
    40  // NewEtcdMigrateClient creates a new EtcdMigrateClient from the given EtcdMigrateCfg.
    41  func NewEtcdMigrateClient(cfg *EtcdMigrateCfg) (EtcdMigrateClient, error) {
    42  	return &CombinedEtcdClient{cfg}, nil
    43  }
    44  
    45  // Close closes the client and releases any resources it holds.
    46  func (e *CombinedEtcdClient) Close() error {
    47  	return nil
    48  }
    49  
    50  // SetEtcdVersionKeyValue writes the given version to the etcd 'etcd_version' key.
    51  // If no error is returned, the write was successful, indicating the etcd server is available
    52  // and able to perform consensus writes.
    53  func (e *CombinedEtcdClient) SetEtcdVersionKeyValue(version *EtcdVersion) error {
    54  	return e.Put(version, "etcd_version", version.String())
    55  }
    56  
    57  // Put write a single key value pair to etcd.
    58  func (e *CombinedEtcdClient) Put(version *EtcdVersion, key, value string) error {
    59  	v3client, err := e.clientV3()
    60  	if err != nil {
    61  		return err
    62  	}
    63  	defer v3client.Close()
    64  	_, err = v3client.KV.Put(context.Background(), key, value)
    65  	return err
    66  }
    67  
    68  // Get reads a single value for a given key.
    69  func (e *CombinedEtcdClient) Get(version *EtcdVersion, key string) (string, error) {
    70  	v3client, err := e.clientV3()
    71  	if err != nil {
    72  		return "", err
    73  	}
    74  	defer v3client.Close()
    75  	resp, err := v3client.KV.Get(context.Background(), key)
    76  	if err != nil {
    77  		return "", err
    78  	}
    79  	kvs := resp.Kvs
    80  	if len(kvs) != 1 {
    81  		return "", fmt.Errorf("expected exactly one value for key %s but got %d", key, len(kvs))
    82  	}
    83  
    84  	return string(kvs[0].Value), nil
    85  }
    86  
    87  func (e *CombinedEtcdClient) clientV3() (*clientv3.Client, error) {
    88  	return clientv3.New(clientv3.Config{
    89  		Endpoints:   []string{e.endpoint()},
    90  		DialTimeout: 20 * time.Second,
    91  		DialOptions: []grpc.DialOption{
    92  			grpc.WithBlock(), // block until the underlying connection is up
    93  		},
    94  	})
    95  }
    96  
    97  // Backup creates a backup of an etcd2 data directory at the given backupDir.
    98  func (e *CombinedEtcdClient) Backup(version *EtcdVersion, backupDir string) error {
    99  	// We cannot use etcd/client (v2) to make this call. It is implemented in the etcdctl client code.
   100  	if version.Major != 2 {
   101  		return fmt.Errorf("etcd 2.x required but got version '%s'", version)
   102  	}
   103  	return e.runEtcdctlCommand(version,
   104  		"--debug",
   105  		"backup",
   106  		"--data-dir", e.cfg.dataDirectory,
   107  		"--backup-dir", backupDir,
   108  	)
   109  }
   110  
   111  // Snapshot captures a snapshot from a running etcd3 server and saves it to the given snapshotFile.
   112  // We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
   113  func (e *CombinedEtcdClient) Snapshot(version *EtcdVersion, snapshotFile string) error {
   114  	if version.Major != 3 {
   115  		return fmt.Errorf("etcd 3.x required but got version '%s'", version)
   116  	}
   117  	return e.runEtcdctlCommand(version,
   118  		"--endpoints", e.endpoint(),
   119  		"snapshot", "save", snapshotFile,
   120  	)
   121  }
   122  
   123  // Restore restores a given snapshotFile into the data directory specified this clients config.
   124  func (e *CombinedEtcdClient) Restore(version *EtcdVersion, snapshotFile string) error {
   125  	// We cannot use etcd/clientv3 to make this call. It is implemented in the etcdctl client code.
   126  	if version.Major != 3 {
   127  		return fmt.Errorf("etcd 3.x required but got version '%s'", version)
   128  	}
   129  	return e.runEtcdctlCommand(version,
   130  		"snapshot", "restore", snapshotFile,
   131  		"--data-dir", e.cfg.dataDirectory,
   132  		"--name", e.cfg.name,
   133  		"--initial-advertise-peer-urls", e.cfg.peerAdvertiseUrls,
   134  		"--initial-cluster", e.cfg.initialCluster,
   135  	)
   136  }
   137  
   138  // Migrate upgrades a 'etcd2' storage version data directory to a 'etcd3' storage version
   139  // data directory.
   140  func (e *CombinedEtcdClient) Migrate(version *EtcdVersion) error {
   141  	// We cannot use etcd/clientv3 to make this call as it is implemented in etcd/etcdctl.
   142  	if version.Major != 3 {
   143  		return fmt.Errorf("etcd 3.x required but got version '%s'", version)
   144  	}
   145  	return e.runEtcdctlCommand(version,
   146  		"migrate",
   147  		"--data-dir", e.cfg.dataDirectory,
   148  	)
   149  }
   150  
   151  func (e *CombinedEtcdClient) runEtcdctlCommand(version *EtcdVersion, args ...string) error {
   152  	etcdctlCmd := exec.Command(filepath.Join(e.cfg.binPath, fmt.Sprintf("etcdctl-%s", version)), args...)
   153  	etcdctlCmd.Env = []string{fmt.Sprintf("ETCDCTL_API=%d", version.Major)}
   154  	etcdctlCmd.Stdout = os.Stdout
   155  	etcdctlCmd.Stderr = os.Stderr
   156  	return etcdctlCmd.Run()
   157  }
   158  
   159  // AttachLease attaches leases of the given leaseDuration to all the  etcd objects under
   160  // ttlKeysDirectory specified in this client's config.
   161  func (e *CombinedEtcdClient) AttachLease(leaseDuration time.Duration) error {
   162  	ttlKeysPrefix := e.cfg.ttlKeysDirectory
   163  	// Make sure that ttlKeysPrefix is ended with "/" so that we only get children "directories".
   164  	if !strings.HasSuffix(ttlKeysPrefix, "/") {
   165  		ttlKeysPrefix += "/"
   166  	}
   167  	ctx := context.Background()
   168  
   169  	v3client, err := e.clientV3()
   170  	if err != nil {
   171  		return err
   172  	}
   173  	defer v3client.Close()
   174  	objectsResp, err := v3client.KV.Get(ctx, ttlKeysPrefix, clientv3.WithPrefix())
   175  	if err != nil {
   176  		return fmt.Errorf("error while getting objects to attach to the lease")
   177  	}
   178  
   179  	lease, err := v3client.Lease.Grant(ctx, int64(leaseDuration/time.Second))
   180  	if err != nil {
   181  		return fmt.Errorf("error while creating lease: %v", err)
   182  	}
   183  	klog.Infof("Lease with TTL: %v created", lease.TTL)
   184  
   185  	klog.Infof("Attaching lease to %d entries", len(objectsResp.Kvs))
   186  	for _, kv := range objectsResp.Kvs {
   187  		putResp, err := v3client.KV.Put(ctx, string(kv.Key), string(kv.Value), clientv3.WithLease(lease.ID), clientv3.WithPrevKV())
   188  		if err != nil {
   189  			klog.Errorf("Error while attaching lease to: %s", string(kv.Key))
   190  		}
   191  		if !bytes.Equal(putResp.PrevKv.Value, kv.Value) {
   192  			return fmt.Errorf("concurrent access to key detected when setting lease on %s, expected previous value of %s but got %s",
   193  				kv.Key, kv.Value, putResp.PrevKv.Value)
   194  		}
   195  	}
   196  	return nil
   197  }
   198  
   199  func (e *CombinedEtcdClient) endpoint() string {
   200  	return fmt.Sprintf("http://127.0.0.1:%d", e.cfg.port)
   201  }
   202  

View as plain text