...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd/local.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package etcd
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"os"
    23  	"path/filepath"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	"github.com/pkg/errors"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/resource"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/klog/v2"
    34  	utilsnet "k8s.io/utils/net"
    35  
    36  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    37  	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
    38  	"k8s.io/kubernetes/cmd/kubeadm/app/features"
    39  	"k8s.io/kubernetes/cmd/kubeadm/app/images"
    40  	kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
    41  	etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
    42  	staticpodutil "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
    43  	"k8s.io/kubernetes/cmd/kubeadm/app/util/users"
    44  )
    45  
// Constants shared by the etcd static pod helpers in this file:
//   - etcdVolumeName and certsVolumeName name the volumes (and the matching
//     volume mounts) that GetEtcdPodSpec creates for the etcd data directory
//     and for the "etcd" sub-directory of the certificates directory.
//   - etcdHealthyCheckInterval and etcdHealthyCheckRetries are the interval and
//     retry count passed to WaitForClusterAvailable after a new member is added;
//     their product is the maximum wait reported to the user.
const (
	etcdVolumeName           = "etcd-data"
	certsVolumeName          = "etcd-certs"
	etcdHealthyCheckInterval = 5 * time.Second
	etcdHealthyCheckRetries  = 8
)
    52  
    53  // CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file.
    54  // This function is used by init - when the etcd cluster is empty - or by kubeadm
    55  // upgrade - when the etcd cluster is already up and running (and the --initial-cluster flag have no impact)
    56  func CreateLocalEtcdStaticPodManifestFile(manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, isDryRun bool) error {
    57  	if cfg.Etcd.External != nil {
    58  		return errors.New("etcd static pod manifest cannot be generated for cluster using external etcd")
    59  	}
    60  
    61  	if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, []etcdutil.Member{}, isDryRun); err != nil {
    62  		return err
    63  	}
    64  
    65  	klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
    66  	return nil
    67  }
    68  
    69  // CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member
    70  func CheckLocalEtcdClusterStatus(client clientset.Interface, certificatesDir string) error {
    71  	klog.V(1).Info("[etcd] Checking etcd cluster health")
    72  
    73  	// creates an etcd client that connects to all the local/stacked etcd members
    74  	klog.V(1).Info("creating etcd client that connects to etcd pods")
    75  	etcdClient, err := etcdutil.NewFromCluster(client, certificatesDir)
    76  	if err != nil {
    77  		return err
    78  	}
    79  
    80  	// Checking health state
    81  	err = etcdClient.CheckClusterHealth()
    82  	if err != nil {
    83  		return errors.Wrap(err, "etcd cluster is not healthy")
    84  	}
    85  
    86  	return nil
    87  }
    88  
    89  // RemoveStackedEtcdMemberFromCluster will remove a local etcd member from etcd cluster,
    90  // when reset the control plane node.
    91  func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
    92  	// creates an etcd client that connects to all the local/stacked etcd members
    93  	klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
    94  	etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
    95  	if err != nil {
    96  		return err
    97  	}
    98  
    99  	members, err := etcdClient.ListMembers()
   100  	if err != nil {
   101  		return err
   102  	}
   103  	// If this is the only remaining stacked etcd member in the cluster, calling RemoveMember()
   104  	// is not needed.
   105  	if len(members) == 1 {
   106  		etcdClientAddress := etcdutil.GetClientURL(&cfg.LocalAPIEndpoint)
   107  		for _, endpoint := range etcdClient.Endpoints {
   108  			if endpoint == etcdClientAddress {
   109  				klog.V(1).Info("[etcd] This is the only remaining etcd member in the etcd cluster, skip removing it")
   110  				return nil
   111  			}
   112  		}
   113  	}
   114  
   115  	// notifies the other members of the etcd cluster about the removing member
   116  	etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)
   117  
   118  	klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
   119  	id, err := etcdClient.GetMemberID(etcdPeerAddress)
   120  	if err != nil {
   121  		if errors.Is(etcdutil.ErrNoMemberIDForPeerURL, err) {
   122  			klog.V(5).Infof("[etcd] member was already removed, because no member id exists for peer %s", etcdPeerAddress)
   123  			return nil
   124  		}
   125  		return err
   126  	}
   127  
   128  	klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, id)
   129  	members, err = etcdClient.RemoveMember(id)
   130  	if err != nil {
   131  		return err
   132  	}
   133  	klog.V(1).Infof("[etcd] Updated etcd member list: %v", members)
   134  
   135  	return nil
   136  }
   137  
   138  // CreateStackedEtcdStaticPodManifestFile will write local etcd static pod manifest file
   139  // for an additional etcd member that is joining an existing local/stacked etcd cluster.
   140  // Other members of the etcd cluster will be notified of the joining node in beforehand as well.
   141  func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, isDryRun bool, certificatesDir string) error {
   142  	// creates an etcd client that connects to all the local/stacked etcd members
   143  	klog.V(1).Info("creating etcd client that connects to etcd pods")
   144  	etcdClient, err := etcdutil.NewFromCluster(client, certificatesDir)
   145  	if err != nil {
   146  		return err
   147  	}
   148  
   149  	etcdPeerAddress := etcdutil.GetPeerURL(endpoint)
   150  
   151  	var cluster []etcdutil.Member
   152  	if isDryRun {
   153  		fmt.Printf("[etcd] Would add etcd member: %s\n", etcdPeerAddress)
   154  	} else {
   155  		klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
   156  		if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
   157  			cluster, err = etcdClient.AddMemberAsLearner(nodeName, etcdPeerAddress)
   158  		} else {
   159  			cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
   160  		}
   161  		if err != nil {
   162  			return err
   163  		}
   164  		fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
   165  		klog.V(1).Infof("Updated etcd member list: %v", cluster)
   166  	}
   167  
   168  	fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)
   169  
   170  	if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil {
   171  		return err
   172  	}
   173  
   174  	if isDryRun {
   175  		fmt.Println("[etcd] Would wait for the new etcd member to join the cluster")
   176  		return nil
   177  	}
   178  
   179  	if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
   180  		learnerID, err := etcdClient.GetMemberID(etcdPeerAddress)
   181  		if err != nil {
   182  			return err
   183  		}
   184  		err = etcdClient.MemberPromote(learnerID)
   185  		if err != nil {
   186  			return err
   187  		}
   188  	}
   189  
   190  	fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
   191  	if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
   192  		return err
   193  	}
   194  
   195  	return nil
   196  }
   197  
   198  // GetEtcdPodSpec returns the etcd static Pod actualized to the context of the current configuration
   199  // NB. GetEtcdPodSpec methods holds the information about how kubeadm creates etcd static pod manifests.
   200  func GetEtcdPodSpec(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) v1.Pod {
   201  	pathType := v1.HostPathDirectoryOrCreate
   202  	etcdMounts := map[string]v1.Volume{
   203  		etcdVolumeName:  staticpodutil.NewVolume(etcdVolumeName, cfg.Etcd.Local.DataDir, &pathType),
   204  		certsVolumeName: staticpodutil.NewVolume(certsVolumeName, cfg.CertificatesDir+"/etcd", &pathType),
   205  	}
   206  	componentHealthCheckTimeout := kubeadmapi.GetActiveTimeouts().ControlPlaneComponentHealthCheck
   207  
   208  	// probeHostname returns the correct localhost IP address family based on the endpoint AdvertiseAddress
   209  	probeHostname, probePort, probeScheme := staticpodutil.GetEtcdProbeEndpoint(&cfg.Etcd, utilsnet.IsIPv6String(endpoint.AdvertiseAddress))
   210  	return staticpodutil.ComponentPod(
   211  		v1.Container{
   212  			Name:            kubeadmconstants.Etcd,
   213  			Command:         getEtcdCommand(cfg, endpoint, nodeName, initialCluster),
   214  			Image:           images.GetEtcdImage(cfg),
   215  			ImagePullPolicy: v1.PullIfNotPresent,
   216  			// Mount the etcd datadir path read-write so etcd can store data in a more persistent manner
   217  			VolumeMounts: []v1.VolumeMount{
   218  				staticpodutil.NewVolumeMount(etcdVolumeName, cfg.Etcd.Local.DataDir, false),
   219  				staticpodutil.NewVolumeMount(certsVolumeName, cfg.CertificatesDir+"/etcd", false),
   220  			},
   221  			Resources: v1.ResourceRequirements{
   222  				Requests: v1.ResourceList{
   223  					v1.ResourceCPU:    resource.MustParse("100m"),
   224  					v1.ResourceMemory: resource.MustParse("100Mi"),
   225  				},
   226  			},
   227  			LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/health?exclude=NOSPACE&serializable=true", probePort, probeScheme),
   228  			StartupProbe:  staticpodutil.StartupProbe(probeHostname, "/health?serializable=false", probePort, probeScheme, componentHealthCheckTimeout),
   229  			Env:           kubeadmutil.MergeKubeadmEnvVars(cfg.Etcd.Local.ExtraEnvs),
   230  		},
   231  		etcdMounts,
   232  		// etcd will listen on the advertise address of the API server, in a different port (2379)
   233  		map[string]string{kubeadmconstants.EtcdAdvertiseClientUrlsAnnotationKey: etcdutil.GetClientURL(endpoint)},
   234  	)
   235  }
   236  
   237  // getEtcdCommand builds the right etcd command from the given config object
   238  func getEtcdCommand(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) []string {
   239  	// localhost IP family should be the same that the AdvertiseAddress
   240  	etcdLocalhostAddress := "127.0.0.1"
   241  	if utilsnet.IsIPv6String(endpoint.AdvertiseAddress) {
   242  		etcdLocalhostAddress = "::1"
   243  	}
   244  	defaultArguments := []kubeadmapi.Arg{
   245  		{Name: "name", Value: nodeName},
   246  		// TODO: start using --initial-corrupt-check once the graduated flag is available,
   247  		// https://github.com/kubernetes/kubeadm/issues/2676
   248  		{Name: "experimental-initial-corrupt-check", Value: "true"},
   249  		{Name: "listen-client-urls", Value: fmt.Sprintf("%s,%s", etcdutil.GetClientURLByIP(etcdLocalhostAddress), etcdutil.GetClientURL(endpoint))},
   250  		{Name: "advertise-client-urls", Value: etcdutil.GetClientURL(endpoint)},
   251  		{Name: "listen-peer-urls", Value: etcdutil.GetPeerURL(endpoint)},
   252  		{Name: "initial-advertise-peer-urls", Value: etcdutil.GetPeerURL(endpoint)},
   253  		{Name: "data-dir", Value: cfg.Etcd.Local.DataDir},
   254  		{Name: "cert-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerCertName)},
   255  		{Name: "key-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerKeyName)},
   256  		{Name: "trusted-ca-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName)},
   257  		{Name: "client-cert-auth", Value: "true"},
   258  		{Name: "peer-cert-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdPeerCertName)},
   259  		{Name: "peer-key-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdPeerKeyName)},
   260  		{Name: "peer-trusted-ca-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName)},
   261  		{Name: "peer-client-cert-auth", Value: "true"},
   262  		{Name: "snapshot-count", Value: "10000"},
   263  		{Name: "listen-metrics-urls", Value: fmt.Sprintf("http://%s", net.JoinHostPort(etcdLocalhostAddress, strconv.Itoa(kubeadmconstants.EtcdMetricsPort)))},
   264  		{Name: "experimental-watch-progress-notify-interval", Value: "5s"},
   265  	}
   266  
   267  	if len(initialCluster) == 0 {
   268  		defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster", fmt.Sprintf("%s=%s", nodeName, etcdutil.GetPeerURL(endpoint)), 1)
   269  	} else {
   270  		// NB. the joining etcd member should be part of the initialCluster list
   271  		endpoints := []string{}
   272  		for _, member := range initialCluster {
   273  			endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL))
   274  		}
   275  
   276  		defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster", strings.Join(endpoints, ","), 1)
   277  		defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster-state", "existing", 1)
   278  	}
   279  
   280  	command := []string{"etcd"}
   281  	command = append(command, kubeadmutil.ArgumentsToCommand(defaultArguments, cfg.Etcd.Local.ExtraArgs)...)
   282  	return command
   283  }
   284  
   285  func prepareAndWriteEtcdStaticPod(manifestDir string, patchesDir string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member, isDryRun bool) error {
   286  	// gets etcd StaticPodSpec, actualized for the current ClusterConfiguration and the new list of etcd members
   287  	spec := GetEtcdPodSpec(cfg, endpoint, nodeName, initialCluster)
   288  
   289  	var usersAndGroups *users.UsersAndGroups
   290  	var err error
   291  	if features.Enabled(cfg.FeatureGates, features.RootlessControlPlane) {
   292  		if isDryRun {
   293  			fmt.Printf("[etcd] Would create users and groups for %q to run as non-root\n", kubeadmconstants.Etcd)
   294  			fmt.Printf("[etcd] Would update static pod manifest for %q to run run as non-root\n", kubeadmconstants.Etcd)
   295  		} else {
   296  			usersAndGroups, err = staticpodutil.GetUsersAndGroups()
   297  			if err != nil {
   298  				return errors.Wrap(err, "failed to create users and groups")
   299  			}
   300  			// usersAndGroups is nil on non-linux.
   301  			if usersAndGroups != nil {
   302  				if err := staticpodutil.RunComponentAsNonRoot(kubeadmconstants.Etcd, &spec, usersAndGroups, cfg); err != nil {
   303  					return errors.Wrapf(err, "failed to run component %q as non-root", kubeadmconstants.Etcd)
   304  				}
   305  			}
   306  		}
   307  	}
   308  
   309  	// if patchesDir is defined, patch the static Pod manifest
   310  	if patchesDir != "" {
   311  		patchedSpec, err := staticpodutil.PatchStaticPod(&spec, patchesDir, os.Stdout)
   312  		if err != nil {
   313  			return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
   314  		}
   315  		spec = *patchedSpec
   316  	}
   317  
   318  	// writes etcd StaticPod to disk
   319  	if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil {
   320  		return err
   321  	}
   322  
   323  	return nil
   324  }
   325  

View as plain text