...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod/utils.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package staticpod
    18  
    19  import (
    20  	"bytes"
    21  	"crypto/md5"
    22  	"fmt"
    23  	"hash"
    24  	"io"
    25  	"math"
    26  	"net/url"
    27  	"os"
    28  	"sort"
    29  	"strings"
    30  	"sync"
    31  
    32  	"github.com/pkg/errors"
    33  	"github.com/pmezard/go-difflib/difflib"
    34  
    35  	v1 "k8s.io/api/core/v1"
    36  	"k8s.io/apimachinery/pkg/api/resource"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/util/dump"
    39  	"k8s.io/apimachinery/pkg/util/intstr"
    40  
    41  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    42  	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
    43  	kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
    44  	"k8s.io/kubernetes/cmd/kubeadm/app/util/patches"
    45  	"k8s.io/kubernetes/cmd/kubeadm/app/util/users"
    46  )
    47  
const (
	// kubeControllerManagerBindAddressArg represents the bind-address argument of the kube-controller-manager configuration.
	// It is the key looked up in the controller manager's extra args by GetControllerManagerProbeAddress.
	kubeControllerManagerBindAddressArg = "bind-address"

	// kubeSchedulerBindAddressArg represents the bind-address argument of the kube-scheduler configuration.
	// It is the key looked up in the scheduler's extra args by GetSchedulerProbeAddress.
	kubeSchedulerBindAddressArg = "bind-address"
)
    55  
var (
	// usersAndGroups holds the value returned by users.AddUsersAndGroups and
	// usersAndGroupsOnce makes sure that call happens at most once; both are
	// used by GetUsersAndGroups.
	usersAndGroups     *users.UsersAndGroups
	usersAndGroupsOnce sync.Once
)
    60  
    61  // ComponentPod returns a Pod object from the container, volume and annotations specifications
    62  func ComponentPod(container v1.Container, volumes map[string]v1.Volume, annotations map[string]string) v1.Pod {
    63  	// priority value for system-node-critical class
    64  	priority := int32(2000001000)
    65  	return v1.Pod{
    66  		TypeMeta: metav1.TypeMeta{
    67  			APIVersion: "v1",
    68  			Kind:       "Pod",
    69  		},
    70  		ObjectMeta: metav1.ObjectMeta{
    71  			Name:      container.Name,
    72  			Namespace: metav1.NamespaceSystem,
    73  			// The component and tier labels are useful for quickly identifying the control plane Pods when doing a .List()
    74  			// against Pods in the kube-system namespace. Can for example be used together with the WaitForPodsWithLabel function
    75  			Labels:      map[string]string{"component": container.Name, "tier": kubeadmconstants.ControlPlaneTier},
    76  			Annotations: annotations,
    77  		},
    78  		Spec: v1.PodSpec{
    79  			Containers:        []v1.Container{container},
    80  			Priority:          &priority,
    81  			PriorityClassName: "system-node-critical",
    82  			HostNetwork:       true,
    83  			Volumes:           VolumeMapToSlice(volumes),
    84  			SecurityContext: &v1.PodSecurityContext{
    85  				SeccompProfile: &v1.SeccompProfile{
    86  					Type: v1.SeccompProfileTypeRuntimeDefault,
    87  				},
    88  			},
    89  		},
    90  	}
    91  }
    92  
    93  // ComponentResources returns the v1.ResourceRequirements object needed for allocating a specified amount of the CPU
    94  func ComponentResources(cpu string) v1.ResourceRequirements {
    95  	return v1.ResourceRequirements{
    96  		Requests: v1.ResourceList{
    97  			v1.ResourceCPU: resource.MustParse(cpu),
    98  		},
    99  	}
   100  }
   101  
   102  // NewVolume creates a v1.Volume with a hostPath mount to the specified location
   103  func NewVolume(name, path string, pathType *v1.HostPathType) v1.Volume {
   104  	return v1.Volume{
   105  		Name: name,
   106  		VolumeSource: v1.VolumeSource{
   107  			HostPath: &v1.HostPathVolumeSource{
   108  				Path: path,
   109  				Type: pathType,
   110  			},
   111  		},
   112  	}
   113  }
   114  
   115  // NewVolumeMount creates a v1.VolumeMount to the specified location
   116  func NewVolumeMount(name, path string, readOnly bool) v1.VolumeMount {
   117  	return v1.VolumeMount{
   118  		Name:      name,
   119  		MountPath: path,
   120  		ReadOnly:  readOnly,
   121  	}
   122  }
   123  
   124  // VolumeMapToSlice returns a slice of volumes from a map's values
   125  func VolumeMapToSlice(volumes map[string]v1.Volume) []v1.Volume {
   126  	v := make([]v1.Volume, 0, len(volumes))
   127  
   128  	for _, vol := range volumes {
   129  		v = append(v, vol)
   130  	}
   131  
   132  	sort.Slice(v, func(i, j int) bool {
   133  		return strings.Compare(v[i].Name, v[j].Name) == -1
   134  	})
   135  
   136  	return v
   137  }
   138  
   139  // VolumeMountMapToSlice returns a slice of volumes from a map's values
   140  func VolumeMountMapToSlice(volumeMounts map[string]v1.VolumeMount) []v1.VolumeMount {
   141  	v := make([]v1.VolumeMount, 0, len(volumeMounts))
   142  
   143  	for _, volMount := range volumeMounts {
   144  		v = append(v, volMount)
   145  	}
   146  
   147  	sort.Slice(v, func(i, j int) bool {
   148  		return strings.Compare(v[i].Name, v[j].Name) == -1
   149  	})
   150  
   151  	return v
   152  }
   153  
   154  // GetExtraParameters builds a list of flag arguments two string-string maps, one with default, base commands and one with overrides
   155  func GetExtraParameters(overrides map[string]string, defaults map[string]string) []string {
   156  	var command []string
   157  	for k, v := range overrides {
   158  		if len(v) > 0 {
   159  			command = append(command, fmt.Sprintf("--%s=%s", k, v))
   160  		}
   161  	}
   162  	for k, v := range defaults {
   163  		if _, overrideExists := overrides[k]; !overrideExists {
   164  			command = append(command, fmt.Sprintf("--%s=%s", k, v))
   165  		}
   166  	}
   167  	return command
   168  }
   169  
   170  // PatchStaticPod applies patches stored in patchesDir to a static Pod.
   171  func PatchStaticPod(pod *v1.Pod, patchesDir string, output io.Writer) (*v1.Pod, error) {
   172  	// Marshal the Pod manifest into YAML.
   173  	podYAML, err := kubeadmutil.MarshalToYaml(pod, v1.SchemeGroupVersion)
   174  	if err != nil {
   175  		return pod, errors.Wrapf(err, "failed to marshal Pod manifest to YAML")
   176  	}
   177  
   178  	patchManager, err := patches.GetPatchManagerForPath(patchesDir, patches.KnownTargets(), output)
   179  	if err != nil {
   180  		return pod, err
   181  	}
   182  
   183  	patchTarget := &patches.PatchTarget{
   184  		Name:                      pod.Name,
   185  		StrategicMergePatchObject: v1.Pod{},
   186  		Data:                      podYAML,
   187  	}
   188  	if err := patchManager.ApplyPatchesToTarget(patchTarget); err != nil {
   189  		return pod, err
   190  	}
   191  
   192  	obj, err := kubeadmutil.UniversalUnmarshal(patchTarget.Data)
   193  	if err != nil {
   194  		return pod, errors.Wrap(err, "failed to unmarshal patched manifest")
   195  	}
   196  
   197  	pod2, ok := obj.(*v1.Pod)
   198  	if !ok {
   199  		return pod, errors.Wrap(err, "patched manifest is not a valid Pod object")
   200  	}
   201  
   202  	return pod2, nil
   203  }
   204  
   205  // WriteStaticPodToDisk writes a static pod file to disk
   206  func WriteStaticPodToDisk(componentName, manifestDir string, pod v1.Pod) error {
   207  
   208  	// creates target folder if not already exists
   209  	if err := os.MkdirAll(manifestDir, 0700); err != nil {
   210  		return errors.Wrapf(err, "failed to create directory %q", manifestDir)
   211  	}
   212  
   213  	// writes the pod to disk
   214  	serialized, err := kubeadmutil.MarshalToYaml(&pod, v1.SchemeGroupVersion)
   215  	if err != nil {
   216  		return errors.Wrapf(err, "failed to marshal manifest for %q to YAML", componentName)
   217  	}
   218  
   219  	filename := kubeadmconstants.GetStaticPodFilepath(componentName, manifestDir)
   220  
   221  	if err := os.WriteFile(filename, serialized, 0600); err != nil {
   222  		return errors.Wrapf(err, "failed to write static pod manifest file for %q (%q)", componentName, filename)
   223  	}
   224  
   225  	return nil
   226  }
   227  
   228  // ReadStaticPodFromDisk reads a static pod file from disk
   229  func ReadStaticPodFromDisk(manifestPath string) (*v1.Pod, error) {
   230  	buf, err := os.ReadFile(manifestPath)
   231  	if err != nil {
   232  		return &v1.Pod{}, errors.Wrapf(err, "failed to read manifest for %q", manifestPath)
   233  	}
   234  
   235  	obj, err := kubeadmutil.UniversalUnmarshal(buf)
   236  	if err != nil {
   237  		return &v1.Pod{}, errors.Errorf("failed to unmarshal manifest for %q: %v", manifestPath, err)
   238  	}
   239  
   240  	pod, ok := obj.(*v1.Pod)
   241  	if !ok {
   242  		return &v1.Pod{}, errors.Errorf("failed to parse Pod object defined in %q", manifestPath)
   243  	}
   244  
   245  	return pod, nil
   246  }
   247  
   248  // LivenessProbe creates a Probe object with a HTTPGet handler
   249  func LivenessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe {
   250  	// sets initialDelaySeconds same as periodSeconds to skip one period before running a check
   251  	return createHTTPProbe(host, path, port, scheme, 10, 15, 8, 10)
   252  }
   253  
   254  // ReadinessProbe creates a Probe object with a HTTPGet handler
   255  func ReadinessProbe(host, path string, port int32, scheme v1.URIScheme) *v1.Probe {
   256  	// sets initialDelaySeconds as '0' because we don't want to delay user infrastructure checks
   257  	// looking for "ready" status on kubeadm static Pods
   258  	return createHTTPProbe(host, path, port, scheme, 0, 15, 3, 1)
   259  }
   260  
   261  // StartupProbe creates a Probe object with a HTTPGet handler
   262  func StartupProbe(host, path string, port int32, scheme v1.URIScheme, timeoutForControlPlane *metav1.Duration) *v1.Probe {
   263  	periodSeconds, timeoutForControlPlaneSeconds := int32(10), kubeadmconstants.ControlPlaneComponentHealthCheckTimeout.Seconds()
   264  	if timeoutForControlPlane != nil {
   265  		timeoutForControlPlaneSeconds = timeoutForControlPlane.Seconds()
   266  	}
   267  	// sets failureThreshold big enough to guarantee the full timeout can cover the worst case scenario for the control-plane to come alive
   268  	// we ignore initialDelaySeconds in the calculation here for simplicity
   269  	failureThreshold := int32(math.Ceil(timeoutForControlPlaneSeconds / float64(periodSeconds)))
   270  	// sets initialDelaySeconds same as periodSeconds to skip one period before running a check
   271  	return createHTTPProbe(host, path, port, scheme, periodSeconds, 15, failureThreshold, periodSeconds)
   272  }
   273  
   274  func createHTTPProbe(host, path string, port int32, scheme v1.URIScheme, initialDelaySeconds, timeoutSeconds, failureThreshold, periodSeconds int32) *v1.Probe {
   275  	return &v1.Probe{
   276  		ProbeHandler: v1.ProbeHandler{
   277  			HTTPGet: &v1.HTTPGetAction{
   278  				Host:   host,
   279  				Path:   path,
   280  				Port:   intstr.FromInt32(port),
   281  				Scheme: scheme,
   282  			},
   283  		},
   284  		InitialDelaySeconds: initialDelaySeconds,
   285  		TimeoutSeconds:      timeoutSeconds,
   286  		FailureThreshold:    failureThreshold,
   287  		PeriodSeconds:       periodSeconds,
   288  	}
   289  }
   290  
   291  // GetAPIServerProbeAddress returns the probe address for the API server
   292  func GetAPIServerProbeAddress(endpoint *kubeadmapi.APIEndpoint) string {
   293  	if endpoint != nil && endpoint.AdvertiseAddress != "" {
   294  		return getProbeAddress(endpoint.AdvertiseAddress)
   295  	}
   296  
   297  	return "127.0.0.1"
   298  }
   299  
   300  // GetControllerManagerProbeAddress returns the kubernetes controller manager probe address
   301  func GetControllerManagerProbeAddress(cfg *kubeadmapi.ClusterConfiguration) string {
   302  	if addr, idx := kubeadmapi.GetArgValue(cfg.ControllerManager.ExtraArgs, kubeControllerManagerBindAddressArg, -1); idx > -1 {
   303  		return getProbeAddress(addr)
   304  	}
   305  	return "127.0.0.1"
   306  }
   307  
   308  // GetSchedulerProbeAddress returns the kubernetes scheduler probe address
   309  func GetSchedulerProbeAddress(cfg *kubeadmapi.ClusterConfiguration) string {
   310  	if addr, idx := kubeadmapi.GetArgValue(cfg.Scheduler.ExtraArgs, kubeSchedulerBindAddressArg, -1); idx > -1 {
   311  		return getProbeAddress(addr)
   312  	}
   313  	return "127.0.0.1"
   314  }
   315  
   316  // GetEtcdProbeEndpoint takes a kubeadm Etcd configuration object and attempts to parse
   317  // the first URL in the listen-metrics-urls argument, returning an etcd probe hostname,
   318  // port and scheme
   319  func GetEtcdProbeEndpoint(cfg *kubeadmapi.Etcd, isIPv6 bool) (string, int32, v1.URIScheme) {
   320  	localhost := "127.0.0.1"
   321  	if isIPv6 {
   322  		localhost = "::1"
   323  	}
   324  	if cfg.Local == nil || cfg.Local.ExtraArgs == nil {
   325  		return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
   326  	}
   327  	if arg, idx := kubeadmapi.GetArgValue(cfg.Local.ExtraArgs, "listen-metrics-urls", -1); idx > -1 {
   328  		// Use the first url in the listen-metrics-urls if multiple URL's are specified.
   329  		arg = strings.Split(arg, ",")[0]
   330  		parsedURL, err := url.Parse(arg)
   331  		if err != nil {
   332  			return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
   333  		}
   334  		// Parse scheme
   335  		scheme := v1.URISchemeHTTP
   336  		if parsedURL.Scheme == "https" {
   337  			scheme = v1.URISchemeHTTPS
   338  		}
   339  		// Parse hostname
   340  		hostname := parsedURL.Hostname()
   341  		if len(hostname) == 0 {
   342  			hostname = localhost
   343  		}
   344  		// Parse port
   345  		port := kubeadmconstants.EtcdMetricsPort
   346  		portStr := parsedURL.Port()
   347  		if len(portStr) != 0 {
   348  			p, err := kubeadmutil.ParsePort(portStr)
   349  			if err == nil {
   350  				port = p
   351  			}
   352  		}
   353  		return hostname, int32(port), scheme
   354  	}
   355  	return localhost, kubeadmconstants.EtcdMetricsPort, v1.URISchemeHTTP
   356  }
   357  
   358  // ManifestFilesAreEqual compares 2 files. It returns true if their contents are equal, false otherwise
   359  func ManifestFilesAreEqual(path1, path2 string) (bool, string, error) {
   360  	pod1, err := ReadStaticPodFromDisk(path1)
   361  	if err != nil {
   362  		return false, "", err
   363  	}
   364  	pod2, err := ReadStaticPodFromDisk(path2)
   365  	if err != nil {
   366  		return false, "", err
   367  	}
   368  
   369  	hasher := md5.New()
   370  	DeepHashObject(hasher, pod1)
   371  	hash1 := hasher.Sum(nil)[0:]
   372  	DeepHashObject(hasher, pod2)
   373  	hash2 := hasher.Sum(nil)[0:]
   374  	if bytes.Equal(hash1, hash2) {
   375  		return true, "", nil
   376  	}
   377  
   378  	manifest1, err := kubeadmutil.MarshalToYaml(pod1, v1.SchemeGroupVersion)
   379  	if err != nil {
   380  		return false, "", errors.Wrapf(err, "failed to marshal Pod manifest for %q to YAML", path1)
   381  	}
   382  
   383  	manifest2, err := kubeadmutil.MarshalToYaml(pod2, v1.SchemeGroupVersion)
   384  	if err != nil {
   385  		return false, "", errors.Wrapf(err, "failed to marshal Pod manifest for %q to YAML", path2)
   386  	}
   387  
   388  	diff := difflib.UnifiedDiff{
   389  		A: difflib.SplitLines(string(manifest1)),
   390  		B: difflib.SplitLines(string(manifest2)),
   391  	}
   392  
   393  	diffStr, err := difflib.GetUnifiedDiffString(diff)
   394  	if err != nil {
   395  		return false, "", errors.Wrapf(err, "failed to generate the differences between manifest %q and manifest %q", path1, path2)
   396  	}
   397  
   398  	return false, diffStr, nil
   399  }
   400  
   401  // getProbeAddress returns a valid probe address.
   402  // Kubeadm uses the bind-address to configure the probe address. It's common to use the
   403  // unspecified address "0.0.0.0" or "::" as bind-address when we want to listen in all interfaces,
   404  // however this address can't be used as probe #86504.
   405  // If the address is an unspecified address getProbeAddress returns empty,
   406  // that means that kubelet will use the PodIP as probe address.
   407  func getProbeAddress(addr string) string {
   408  	if addr == "0.0.0.0" || addr == "::" {
   409  		return ""
   410  	}
   411  	return addr
   412  }
   413  
   414  // GetUsersAndGroups returns the local usersAndGroups, but first creates it
   415  // in a thread safe way once.
   416  func GetUsersAndGroups() (*users.UsersAndGroups, error) {
   417  	var err error
   418  	usersAndGroupsOnce.Do(func() {
   419  		usersAndGroups, err = users.AddUsersAndGroups()
   420  	})
   421  	return usersAndGroups, err
   422  }
   423  
   424  // DeepHashObject writes specified object to hash using the spew library
   425  // which follows pointers and prints actual values of the nested objects
   426  // ensuring the hash does not change when a pointer changes.
   427  // Copied from k8s.io/kubernetes/pkg/util/hash/hash.go#DeepHashObject
   428  func DeepHashObject(hasher hash.Hash, objectToWrite interface{}) {
   429  	hasher.Reset()
   430  	fmt.Fprintf(hasher, "%v", dump.ForHash(objectToWrite))
   431  }
   432  

View as plain text