
Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/config/cluster.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/config

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package config
    19  import (
    20  	"context"
    21  	"crypto/x509"
    22  	"fmt"
    23  	"path/filepath"
    24  	"strings"
    25  	"time"
    27  	"github.com/pkg/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	errorsutil "k8s.io/apimachinery/pkg/util/errors"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	"k8s.io/client-go/tools/clientcmd"
    35  	certutil "k8s.io/client-go/util/cert"
    36  	"k8s.io/klog/v2"
    38  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    39  	kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
    40  	kubeadmapiv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
    41  	"k8s.io/kubernetes/cmd/kubeadm/app/componentconfigs"
    42  	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
    43  	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
    44  	"k8s.io/kubernetes/cmd/kubeadm/app/util/config/strict"
    45  	"k8s.io/kubernetes/cmd/kubeadm/app/util/output"
    46  )
    48  // FetchInitConfigurationFromCluster fetches configuration from a ConfigMap in the cluster
    49  func FetchInitConfigurationFromCluster(client clientset.Interface, printer output.Printer, logPrefix string, newControlPlane, skipComponentConfigs bool) (*kubeadmapi.InitConfiguration, error) {
    50  	if printer == nil {
    51  		printer = &output.TextPrinter{}
    52  	}
    53  	printer.Printf("[%s] Reading configuration from the cluster...\n", logPrefix)
    54  	printer.Printf("[%s] FYI: You can look at this config file with 'kubectl -n %s get cm %s -o yaml'\n", logPrefix, metav1.NamespaceSystem, constants.KubeadmConfigConfigMap)
    56  	// Fetch the actual config from cluster
    57  	cfg, err := getInitConfigurationFromCluster(constants.KubernetesDir, client, newControlPlane, skipComponentConfigs)
    58  	if err != nil {
    59  		return nil, err
    60  	}
    62  	// Apply dynamic defaults
    63  	// NB. skip CRI detection here because it won't be used at all and will be overridden later
    64  	if err := SetInitDynamicDefaults(cfg, true); err != nil {
    65  		return nil, err
    66  	}
    68  	return cfg, nil
    69  }
    71  // getInitConfigurationFromCluster is separate only for testing purposes, don't call it directly, use FetchInitConfigurationFromCluster instead
    72  func getInitConfigurationFromCluster(kubeconfigDir string, client clientset.Interface, newControlPlane, skipComponentConfigs bool) (*kubeadmapi.InitConfiguration, error) {
    73  	// Also, the config map really should be KubeadmConfigConfigMap...
    74  	configMap, err := apiclient.GetConfigMapWithShortRetry(client, metav1.NamespaceSystem, constants.KubeadmConfigConfigMap)
    75  	if err != nil {
    76  		return nil, errors.Wrap(err, "failed to get config map")
    77  	}
    79  	// Take an empty versioned InitConfiguration, statically default it and convert it to the internal type
    80  	versionedInitcfg := &kubeadmapiv1.InitConfiguration{}
    81  	kubeadmscheme.Scheme.Default(versionedInitcfg)
    82  	initcfg := &kubeadmapi.InitConfiguration{}
    83  	if err := kubeadmscheme.Scheme.Convert(versionedInitcfg, initcfg, nil); err != nil {
    84  		return nil, errors.Wrap(err, "could not prepare a defaulted InitConfiguration")
    85  	}
    87  	// gets ClusterConfiguration from kubeadm-config
    88  	clusterConfigurationData, ok := configMap.Data[constants.ClusterConfigurationConfigMapKey]
    89  	if !ok {
    90  		return nil, errors.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", constants.ClusterConfigurationConfigMapKey)
    91  	}
    92  	// If ClusterConfiguration was patched by something other than kubeadm, it may have errors. Warn about them.
    93  	if err := strict.VerifyUnmarshalStrict([]*runtime.Scheme{kubeadmscheme.Scheme},
    94  		kubeadmapiv1.SchemeGroupVersion.WithKind(constants.ClusterConfigurationKind),
    95  		[]byte(clusterConfigurationData)); err != nil {
    96  		klog.Warning(err.Error())
    97  	}
    98  	if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterConfigurationData), &initcfg.ClusterConfiguration); err != nil {
    99  		return nil, errors.Wrap(err, "failed to decode cluster configuration data")
   100  	}
   102  	if !skipComponentConfigs {
   103  		// get the component configs from the corresponding config maps
   104  		if err := componentconfigs.FetchFromCluster(&initcfg.ClusterConfiguration, client); err != nil {
   105  			return nil, errors.Wrap(err, "failed to get component configs")
   106  		}
   107  	}
   109  	// if this isn't a new controlplane instance (e.g. in case of kubeadm upgrades)
   110  	// get nodes specific information as well
   111  	if !newControlPlane {
   112  		// gets the nodeRegistration for the current from the node object
   113  		kubeconfigFile := filepath.Join(kubeconfigDir, constants.KubeletKubeConfigFileName)
   114  		if err := GetNodeRegistration(kubeconfigFile, client, &initcfg.NodeRegistration); err != nil {
   115  			return nil, errors.Wrap(err, "failed to get node registration")
   116  		}
   117  		// gets the APIEndpoint for the current node
   118  		if err := getAPIEndpoint(client, initcfg.NodeRegistration.Name, &initcfg.LocalAPIEndpoint); err != nil {
   119  			return nil, errors.Wrap(err, "failed to getAPIEndpoint")
   120  		}
   121  	}
   122  	return initcfg, nil
   123  }
   125  // GetNodeRegistration returns the nodeRegistration for the current node
   126  func GetNodeRegistration(kubeconfigFile string, client clientset.Interface, nodeRegistration *kubeadmapi.NodeRegistrationOptions) error {
   127  	// gets the name of the current node
   128  	nodeName, err := getNodeNameFromKubeletConfig(kubeconfigFile)
   129  	if err != nil {
   130  		return errors.Wrap(err, "failed to get node name from kubelet config")
   131  	}
   133  	// gets the corresponding node and retrieves attributes stored there.
   134  	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
   135  	if err != nil {
   136  		return errors.Wrap(err, "failed to get corresponding node")
   137  	}
   139  	criSocket, ok := node.ObjectMeta.Annotations[constants.AnnotationKubeadmCRISocket]
   140  	if !ok {
   141  		return errors.Errorf("node %s doesn't have %s annotation", nodeName, constants.AnnotationKubeadmCRISocket)
   142  	}
   144  	// returns the nodeRegistration attributes
   145  	nodeRegistration.Name = nodeName
   146  	nodeRegistration.CRISocket = criSocket
   147  	nodeRegistration.Taints = node.Spec.Taints
   148  	// NB. currently nodeRegistration.KubeletExtraArgs isn't stored at node level but only in the kubeadm-flags.env
   149  	//     that isn't modified during upgrades
   150  	//     in future we might reconsider this thus enabling changes to the kubeadm-flags.env during upgrades as well
   151  	return nil
   152  }
   154  // getNodeNameFromKubeletConfig gets the node name from a kubelet config file
   155  // TODO: in future we want to switch to a more canonical way for doing this e.g. by having this
   156  // information in the local kubelet config.yaml
   157  func getNodeNameFromKubeletConfig(fileName string) (string, error) {
   158  	// loads the kubelet.conf file
   159  	config, err := clientcmd.LoadFromFile(fileName)
   160  	if err != nil {
   161  		return "", err
   162  	}
   164  	// gets the info about the current user
   165  	currentContext, exists := config.Contexts[config.CurrentContext]
   166  	if !exists {
   167  		return "", errors.Errorf("invalid kubeconfig file %s: missing context %s", fileName, config.CurrentContext)
   168  	}
   169  	authInfo, exists := config.AuthInfos[currentContext.AuthInfo]
   170  	if !exists {
   171  		return "", errors.Errorf("invalid kubeconfig file %s: missing AuthInfo %s", fileName, currentContext.AuthInfo)
   172  	}
   174  	// gets the X509 certificate with current user credentials
   175  	var certs []*x509.Certificate
   176  	if len(authInfo.ClientCertificateData) > 0 {
   177  		// if the config file uses an embedded x509 certificate (e.g. kubelet.conf created by kubeadm), parse it
   178  		if certs, err = certutil.ParseCertsPEM(authInfo.ClientCertificateData); err != nil {
   179  			return "", err
   180  		}
   181  	} else if len(authInfo.ClientCertificate) > 0 {
   182  		// if the config file links an external x509 certificate (e.g. kubelet.conf created by TLS bootstrap), load it
   183  		if certs, err = certutil.CertsFromFile(authInfo.ClientCertificate); err != nil {
   184  			return "", err
   185  		}
   186  	} else {
   187  		return "", errors.Errorf("invalid kubeconfig file %s. x509 certificate expected", fileName)
   188  	}
   190  	// Safely pick the first one because the sender's certificate must come first in the list.
   191  	// For details, see: https://www.rfc-editor.org/rfc/rfc4346#section-7.4.2
   192  	cert := certs[0]
   194  	// gets the node name from the certificate common name
   195  	return strings.TrimPrefix(cert.Subject.CommonName, constants.NodesUserPrefix), nil
   196  }
   198  func getAPIEndpoint(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint) error {
   199  	return getAPIEndpointWithRetry(client, nodeName, apiEndpoint,
   200  		constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration)
   201  }
   203  func getAPIEndpointWithRetry(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint,
   204  	interval, timeout time.Duration) error {
   205  	var err error
   206  	var errs []error
   208  	if err = getAPIEndpointFromPodAnnotation(client, nodeName, apiEndpoint, interval, timeout); err == nil {
   209  		return nil
   210  	}
   211  	errs = append(errs, errors.WithMessagef(err, "could not retrieve API endpoints for node %q using pod annotations", nodeName))
   212  	return errorsutil.NewAggregate(errs)
   213  }
   215  func getAPIEndpointFromPodAnnotation(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint,
   216  	interval, timeout time.Duration) error {
   217  	var rawAPIEndpoint string
   218  	var lastErr error
   219  	// Let's tolerate some unexpected transient failures from the API server or load balancers. Also, if
   220  	// static pods were not yet mirrored into the API server we want to wait for this propagation.
   221  	err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true,
   222  		func(ctx context.Context) (bool, error) {
   223  			rawAPIEndpoint, lastErr = getRawAPIEndpointFromPodAnnotationWithoutRetry(ctx, client, nodeName)
   224  			return lastErr == nil, nil
   225  		})
   226  	if err != nil {
   227  		return err
   228  	}
   229  	parsedAPIEndpoint, err := kubeadmapi.APIEndpointFromString(rawAPIEndpoint)
   230  	if err != nil {
   231  		return errors.Wrapf(err, "could not parse API endpoint for node %q", nodeName)
   232  	}
   233  	*apiEndpoint = parsedAPIEndpoint
   234  	return nil
   235  }
   237  func getRawAPIEndpointFromPodAnnotationWithoutRetry(ctx context.Context, client clientset.Interface, nodeName string) (string, error) {
   238  	podList, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(
   239  		ctx,
   240  		metav1.ListOptions{
   241  			FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
   242  			LabelSelector: fmt.Sprintf("component=%s,tier=%s", constants.KubeAPIServer, constants.ControlPlaneTier),
   243  		},
   244  	)
   245  	if err != nil {
   246  		return "", errors.Wrap(err, "could not retrieve list of pods to determine api server endpoints")
   247  	}
   248  	if len(podList.Items) != 1 {
   249  		return "", errors.Errorf("API server pod for node name %q has %d entries, only one was expected", nodeName, len(podList.Items))
   250  	}
   251  	if apiServerEndpoint, ok := podList.Items[0].Annotations[constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey]; ok {
   252  		return apiServerEndpoint, nil
   253  	}
   254  	return "", errors.Errorf("API server pod for node name %q hasn't got a %q annotation, cannot retrieve API endpoint", nodeName, constants.KubeAPIServerAdvertiseAddressEndpointAnnotationKey)
   255  }

View as plain text