...

Source file src/k8s.io/kubernetes/pkg/kubelet/nodestatus/setters.go

Documentation: k8s.io/kubernetes/pkg/kubelet/nodestatus

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodestatus
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"net"
    24  	goruntime "runtime"
    25  	"strings"
    26  	"time"
    27  
    28  	cadvisorapiv1 "github.com/google/cadvisor/info/v1"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/resource"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/util/errors"
    34  	utilnet "k8s.io/apimachinery/pkg/util/net"
    35  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    36  	cloudprovider "k8s.io/cloud-provider"
    37  	cloudproviderapi "k8s.io/cloud-provider/api"
    38  	cloudprovidernodeutil "k8s.io/cloud-provider/node/helpers"
    39  	"k8s.io/component-base/version"
    40  	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    41  	"k8s.io/kubernetes/pkg/features"
    42  	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
    43  	"k8s.io/kubernetes/pkg/kubelet/cm"
    44  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    45  	"k8s.io/kubernetes/pkg/kubelet/events"
    46  	"k8s.io/kubernetes/pkg/volume"
    47  	netutils "k8s.io/utils/net"
    48  
    49  	"k8s.io/klog/v2"
    50  )
    51  
    52  const (
    53  	// MaxNamesPerImageInNodeStatus is max number of names
    54  	// per image stored in the node status.
    55  	MaxNamesPerImageInNodeStatus = 5
    56  )
    57  
    58  // Setter modifies the node in-place, and returns an error if the modification failed.
    59  // Setters may partially mutate the node before returning an error.
    60  type Setter func(ctx context.Context, node *v1.Node) error
    61  
    62  // NodeAddress returns a Setter that updates address-related information on the node.
    63  func NodeAddress(nodeIPs []net.IP, // typically Kubelet.nodeIPs
    64  	validateNodeIPFunc func(net.IP) error, // typically Kubelet.nodeIPValidator
    65  	hostname string, // typically Kubelet.hostname
    66  	hostnameOverridden bool, // was the hostname force set?
    67  	externalCloudProvider bool, // typically Kubelet.externalCloudProvider
    68  	cloud cloudprovider.Interface, // typically Kubelet.cloud
    69  	nodeAddressesFunc func() ([]v1.NodeAddress, error), // typically Kubelet.cloudResourceSyncManager.NodeAddresses
    70  ) Setter {
    71  	var nodeIP, secondaryNodeIP net.IP
    72  	if len(nodeIPs) > 0 {
    73  		nodeIP = nodeIPs[0]
    74  	}
    75  	preferIPv4 := nodeIP == nil || nodeIP.To4() != nil
    76  	isPreferredIPFamily := func(ip net.IP) bool { return (ip.To4() != nil) == preferIPv4 }
    77  	nodeIPSpecified := nodeIP != nil && !nodeIP.IsUnspecified()
    78  
    79  	if len(nodeIPs) > 1 {
    80  		secondaryNodeIP = nodeIPs[1]
    81  	}
    82  	secondaryNodeIPSpecified := secondaryNodeIP != nil && !secondaryNodeIP.IsUnspecified()
    83  
    84  	return func(ctx context.Context, node *v1.Node) error {
    85  		if nodeIPSpecified {
    86  			if err := validateNodeIPFunc(nodeIP); err != nil {
    87  				return fmt.Errorf("failed to validate nodeIP: %v", err)
    88  			}
    89  			klog.V(4).InfoS("Using node IP", "IP", nodeIP.String())
    90  		}
    91  		if secondaryNodeIPSpecified {
    92  			if err := validateNodeIPFunc(secondaryNodeIP); err != nil {
    93  				return fmt.Errorf("failed to validate secondaryNodeIP: %v", err)
    94  			}
    95  			klog.V(4).InfoS("Using secondary node IP", "IP", secondaryNodeIP.String())
    96  		}
    97  
    98  		if (externalCloudProvider || cloud != nil) && nodeIPSpecified {
    99  			// Annotate the Node object with nodeIP for external cloud provider.
   100  			//
   101  			// We do this even when external CCM is not configured to cover a situation
   102  			// during migration from legacy to external CCM: when CCM is running the
   103  			// node controller in the cluster but kubelet is still running the in-tree
   104  			// provider. Adding this annotation in all cases ensures that while
   105  			// Addresses flap between the competing controllers, they at least flap
   106  			// consistently.
   107  			//
   108  			// We do not add the annotation in the case where there is no cloud
   109  			// controller at all, as we don't expect to migrate these clusters to use an
   110  			// external CCM.
   111  			if node.ObjectMeta.Annotations == nil {
   112  				node.ObjectMeta.Annotations = make(map[string]string)
   113  			}
   114  			annotation := nodeIP.String()
   115  			if secondaryNodeIPSpecified {
   116  				annotation += "," + secondaryNodeIP.String()
   117  			}
   118  			node.ObjectMeta.Annotations[cloudproviderapi.AnnotationAlphaProvidedIPAddr] = annotation
   119  		} else if node.ObjectMeta.Annotations != nil {
   120  			// Clean up stale annotations if no longer using a cloud provider or
   121  			// no longer overriding node IP.
   122  			delete(node.ObjectMeta.Annotations, cloudproviderapi.AnnotationAlphaProvidedIPAddr)
   123  		}
   124  
   125  		if externalCloudProvider {
   126  			// If --cloud-provider=external and node address is already set,
   127  			// then we return early because provider set addresses should take precedence.
   128  			// Otherwise, we try to use the node IP defined via flags and let the cloud provider override it later
   129  			// This should alleviate a lot of the bootstrapping issues with out-of-tree providers
   130  			if len(node.Status.Addresses) > 0 {
   131  				return nil
   132  			}
   133  			// If nodeIPs are not specified wait for the external cloud-provider to set the node addresses.
   134  			// Otherwise uses them on the assumption that the installer/administrator has the previous knowledge
   135  			// required to ensure the external cloud provider will use the same addresses to avoid the issues explained
   136  			// in https://github.com/kubernetes/kubernetes/issues/120720.
   137  			// We are already hinting the external cloud provider via the annotation AnnotationAlphaProvidedIPAddr.
   138  			if !nodeIPSpecified {
   139  				return nil
   140  			}
   141  		}
   142  		if cloud != nil {
   143  			cloudNodeAddresses, err := nodeAddressesFunc()
   144  			if err != nil {
   145  				return err
   146  			}
   147  
   148  			nodeAddresses, err := cloudprovidernodeutil.GetNodeAddressesFromNodeIPLegacy(nodeIP, cloudNodeAddresses)
   149  			if err != nil {
   150  				return err
   151  			}
   152  
   153  			switch {
   154  			case len(cloudNodeAddresses) == 0:
   155  				// the cloud provider didn't specify any addresses
   156  				nodeAddresses = append(nodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname})
   157  
   158  			case !hasAddressType(cloudNodeAddresses, v1.NodeHostName) && hasAddressValue(cloudNodeAddresses, hostname):
   159  				// the cloud provider didn't specify an address of type Hostname,
   160  				// but the auto-detected hostname matched an address reported by the cloud provider,
   161  				// so we can add it and count on the value being verifiable via cloud provider metadata
   162  				nodeAddresses = append(nodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname})
   163  
   164  			case hostnameOverridden:
   165  				// the hostname was force-set via flag/config.
   166  				// this means the hostname might not be able to be validated via cloud provider metadata,
   167  				// but was a choice by the kubelet deployer we should honor
   168  				var existingHostnameAddress *v1.NodeAddress
   169  				for i := range nodeAddresses {
   170  					if nodeAddresses[i].Type == v1.NodeHostName {
   171  						existingHostnameAddress = &nodeAddresses[i]
   172  						break
   173  					}
   174  				}
   175  
   176  				if existingHostnameAddress == nil {
   177  					// no existing Hostname address found, add it
   178  					klog.InfoS("Adding overridden hostname to cloudprovider-reported addresses", "hostname", hostname)
   179  					nodeAddresses = append(nodeAddresses, v1.NodeAddress{Type: v1.NodeHostName, Address: hostname})
   180  				} else if existingHostnameAddress.Address != hostname {
   181  					// override the Hostname address reported by the cloud provider
   182  					klog.InfoS("Replacing cloudprovider-reported hostname with overridden hostname", "cloudProviderHostname", existingHostnameAddress.Address, "overriddenHostname", hostname)
   183  					existingHostnameAddress.Address = hostname
   184  				}
   185  			}
   186  			node.Status.Addresses = nodeAddresses
   187  		} else if nodeIPSpecified && secondaryNodeIPSpecified {
   188  			node.Status.Addresses = []v1.NodeAddress{
   189  				{Type: v1.NodeInternalIP, Address: nodeIP.String()},
   190  				{Type: v1.NodeInternalIP, Address: secondaryNodeIP.String()},
   191  				{Type: v1.NodeHostName, Address: hostname},
   192  			}
   193  		} else {
   194  			var ipAddr net.IP
   195  			var err error
   196  
   197  			// 1) Use nodeIP if set (and not "0.0.0.0"/"::")
   198  			// 2) If the user has specified an IP to HostnameOverride, use it
   199  			// 3) Lookup the IP from node name by DNS
   200  			// 4) Try to get the IP from the network interface used as default gateway
   201  			//
   202  			// For steps 3 and 4, IPv4 addresses are preferred to IPv6 addresses
   203  			// unless nodeIP is "::", in which case it is reversed.
   204  			if nodeIPSpecified {
   205  				ipAddr = nodeIP
   206  			} else if addr := netutils.ParseIPSloppy(hostname); addr != nil {
   207  				ipAddr = addr
   208  			} else {
   209  				var addrs []net.IP
   210  				addrs, _ = net.LookupIP(node.Name)
   211  				for _, addr := range addrs {
   212  					if err = validateNodeIPFunc(addr); err == nil {
   213  						if isPreferredIPFamily(addr) {
   214  							ipAddr = addr
   215  							break
   216  						} else if ipAddr == nil {
   217  							ipAddr = addr
   218  						}
   219  					}
   220  				}
   221  
   222  				if ipAddr == nil {
   223  					ipAddr, err = utilnet.ResolveBindAddress(nodeIP)
   224  				}
   225  			}
   226  
   227  			if ipAddr == nil {
   228  				// We tried everything we could, but the IP address wasn't fetchable; error out
   229  				return fmt.Errorf("can't get ip address of node %s. error: %v", node.Name, err)
   230  			}
   231  			node.Status.Addresses = []v1.NodeAddress{
   232  				{Type: v1.NodeInternalIP, Address: ipAddr.String()},
   233  				{Type: v1.NodeHostName, Address: hostname},
   234  			}
   235  		}
   236  		return nil
   237  	}
   238  }
   239  
   240  func hasAddressType(addresses []v1.NodeAddress, addressType v1.NodeAddressType) bool {
   241  	for _, address := range addresses {
   242  		if address.Type == addressType {
   243  			return true
   244  		}
   245  	}
   246  	return false
   247  }
   248  func hasAddressValue(addresses []v1.NodeAddress, addressValue string) bool {
   249  	for _, address := range addresses {
   250  		if address.Address == addressValue {
   251  			return true
   252  		}
   253  	}
   254  	return false
   255  }
   256  
   257  // MachineInfo returns a Setter that updates machine-related information on the node.
   258  func MachineInfo(nodeName string,
   259  	maxPods int,
   260  	podsPerCore int,
   261  	machineInfoFunc func() (*cadvisorapiv1.MachineInfo, error), // typically Kubelet.GetCachedMachineInfo
   262  	capacityFunc func(localStorageCapacityIsolation bool) v1.ResourceList, // typically Kubelet.containerManager.GetCapacity
   263  	devicePluginResourceCapacityFunc func() (v1.ResourceList, v1.ResourceList, []string), // typically Kubelet.containerManager.GetDevicePluginResourceCapacity
   264  	nodeAllocatableReservationFunc func() v1.ResourceList, // typically Kubelet.containerManager.GetNodeAllocatableReservation
   265  	recordEventFunc func(eventType, event, message string), // typically Kubelet.recordEvent
   266  	localStorageCapacityIsolation bool,
   267  ) Setter {
   268  	return func(ctx context.Context, node *v1.Node) error {
   269  		// Note: avoid blindly overwriting the capacity in case opaque
   270  		//       resources are being advertised.
   271  		if node.Status.Capacity == nil {
   272  			node.Status.Capacity = v1.ResourceList{}
   273  		}
   274  
   275  		var devicePluginAllocatable v1.ResourceList
   276  		var devicePluginCapacity v1.ResourceList
   277  		var removedDevicePlugins []string
   278  
   279  		// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
   280  		// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
   281  		info, err := machineInfoFunc()
   282  		if err != nil {
   283  			// TODO(roberthbailey): This is required for test-cmd.sh to pass.
   284  			// See if the test should be updated instead.
   285  			node.Status.Capacity[v1.ResourceCPU] = *resource.NewMilliQuantity(0, resource.DecimalSI)
   286  			node.Status.Capacity[v1.ResourceMemory] = resource.MustParse("0Gi")
   287  			node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(int64(maxPods), resource.DecimalSI)
   288  			klog.ErrorS(err, "Error getting machine info")
   289  		} else {
   290  			node.Status.NodeInfo.MachineID = info.MachineID
   291  			node.Status.NodeInfo.SystemUUID = info.SystemUUID
   292  
   293  			for rName, rCap := range cadvisor.CapacityFromMachineInfo(info) {
   294  				node.Status.Capacity[rName] = rCap
   295  			}
   296  
   297  			if podsPerCore > 0 {
   298  				node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
   299  					int64(math.Min(float64(info.NumCores*podsPerCore), float64(maxPods))), resource.DecimalSI)
   300  			} else {
   301  				node.Status.Capacity[v1.ResourcePods] = *resource.NewQuantity(
   302  					int64(maxPods), resource.DecimalSI)
   303  			}
   304  
   305  			if node.Status.NodeInfo.BootID != "" &&
   306  				node.Status.NodeInfo.BootID != info.BootID {
   307  				// TODO: This requires a transaction, either both node status is updated
   308  				// and event is recorded or neither should happen, see issue #6055.
   309  				recordEventFunc(v1.EventTypeWarning, events.NodeRebooted,
   310  					fmt.Sprintf("Node %s has been rebooted, boot id: %s", nodeName, info.BootID))
   311  			}
   312  			node.Status.NodeInfo.BootID = info.BootID
   313  
   314  			// TODO: all the node resources should use ContainerManager.GetCapacity instead of deriving the
   315  			// capacity for every node status request
   316  			initialCapacity := capacityFunc(localStorageCapacityIsolation)
   317  			if initialCapacity != nil {
   318  				if v, exists := initialCapacity[v1.ResourceEphemeralStorage]; exists {
   319  					node.Status.Capacity[v1.ResourceEphemeralStorage] = v
   320  				}
   321  			}
   322  			//}
   323  
   324  			devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = devicePluginResourceCapacityFunc()
   325  			for k, v := range devicePluginCapacity {
   326  				if old, ok := node.Status.Capacity[k]; !ok || old.Value() != v.Value() {
   327  					klog.V(2).InfoS("Updated capacity for device plugin", "plugin", k, "capacity", v.Value())
   328  				}
   329  				node.Status.Capacity[k] = v
   330  			}
   331  
   332  			for _, removedResource := range removedDevicePlugins {
   333  				klog.V(2).InfoS("Set capacity for removed resource to 0 on device removal", "device", removedResource)
   334  				// Set the capacity of the removed resource to 0 instead of
   335  				// removing the resource from the node status. This is to indicate
   336  				// that the resource is managed by device plugin and had been
   337  				// registered before.
   338  				//
   339  				// This is required to differentiate the device plugin managed
   340  				// resources and the cluster-level resources, which are absent in
   341  				// node status.
   342  				node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
   343  			}
   344  		}
   345  
   346  		// Set Allocatable.
   347  		if node.Status.Allocatable == nil {
   348  			node.Status.Allocatable = make(v1.ResourceList)
   349  		}
   350  		// Remove extended resources from allocatable that are no longer
   351  		// present in capacity.
   352  		for k := range node.Status.Allocatable {
   353  			_, found := node.Status.Capacity[k]
   354  			if !found && v1helper.IsExtendedResourceName(k) {
   355  				delete(node.Status.Allocatable, k)
   356  			}
   357  		}
   358  		allocatableReservation := nodeAllocatableReservationFunc()
   359  		for k, v := range node.Status.Capacity {
   360  			value := v.DeepCopy()
   361  			if res, exists := allocatableReservation[k]; exists {
   362  				value.Sub(res)
   363  			}
   364  			if value.Sign() < 0 {
   365  				// Negative Allocatable resources don't make sense.
   366  				value.Set(0)
   367  			}
   368  			node.Status.Allocatable[k] = value
   369  		}
   370  
   371  		for k, v := range devicePluginAllocatable {
   372  			if old, ok := node.Status.Allocatable[k]; !ok || old.Value() != v.Value() {
   373  				klog.V(2).InfoS("Updated allocatable", "device", k, "allocatable", v.Value())
   374  			}
   375  			node.Status.Allocatable[k] = v
   376  		}
   377  		// for every huge page reservation, we need to remove it from allocatable memory
   378  		for k, v := range node.Status.Capacity {
   379  			if v1helper.IsHugePageResourceName(k) {
   380  				allocatableMemory := node.Status.Allocatable[v1.ResourceMemory]
   381  				value := v.DeepCopy()
   382  				allocatableMemory.Sub(value)
   383  				if allocatableMemory.Sign() < 0 {
   384  					// Negative Allocatable resources don't make sense.
   385  					allocatableMemory.Set(0)
   386  				}
   387  				node.Status.Allocatable[v1.ResourceMemory] = allocatableMemory
   388  			}
   389  		}
   390  		return nil
   391  	}
   392  }
   393  
   394  // VersionInfo returns a Setter that updates version-related information on the node.
   395  func VersionInfo(versionInfoFunc func() (*cadvisorapiv1.VersionInfo, error), // typically Kubelet.cadvisor.VersionInfo
   396  	runtimeTypeFunc func() string, // typically Kubelet.containerRuntime.Type
   397  	runtimeVersionFunc func(ctx context.Context) (kubecontainer.Version, error), // typically Kubelet.containerRuntime.Version
   398  ) Setter {
   399  	return func(ctx context.Context, node *v1.Node) error {
   400  		verinfo, err := versionInfoFunc()
   401  		if err != nil {
   402  			return fmt.Errorf("error getting version info: %v", err)
   403  		}
   404  
   405  		node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
   406  		node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion
   407  
   408  		runtimeVersion := "Unknown"
   409  		if runtimeVer, err := runtimeVersionFunc(ctx); err == nil {
   410  			runtimeVersion = runtimeVer.String()
   411  		}
   412  		node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", runtimeTypeFunc(), runtimeVersion)
   413  
   414  		node.Status.NodeInfo.KubeletVersion = version.Get().String()
   415  
   416  		if utilfeature.DefaultFeatureGate.Enabled(features.DisableNodeKubeProxyVersion) {
   417  			// This field is deprecated and should be cleared if it was previously set.
   418  			node.Status.NodeInfo.KubeProxyVersion = ""
   419  		} else {
   420  			node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
   421  		}
   422  
   423  		return nil
   424  	}
   425  }
   426  
   427  // DaemonEndpoints returns a Setter that updates the daemon endpoints on the node.
   428  func DaemonEndpoints(daemonEndpoints *v1.NodeDaemonEndpoints) Setter {
   429  	return func(ctx context.Context, node *v1.Node) error {
   430  		node.Status.DaemonEndpoints = *daemonEndpoints
   431  		return nil
   432  	}
   433  }
   434  
   435  // Images returns a Setter that updates the images on the node.
   436  // imageListFunc is expected to return a list of images sorted in descending order by image size.
   437  // nodeStatusMaxImages is ignored if set to -1.
   438  func Images(nodeStatusMaxImages int32,
   439  	imageListFunc func() ([]kubecontainer.Image, error), // typically Kubelet.imageManager.GetImageList
   440  ) Setter {
   441  	return func(ctx context.Context, node *v1.Node) error {
   442  		// Update image list of this node
   443  		var imagesOnNode []v1.ContainerImage
   444  		containerImages, err := imageListFunc()
   445  		if err != nil {
   446  			node.Status.Images = imagesOnNode
   447  			return fmt.Errorf("error getting image list: %v", err)
   448  		}
   449  		// we expect imageListFunc to return a sorted list, so we just need to truncate
   450  		if int(nodeStatusMaxImages) > -1 &&
   451  			int(nodeStatusMaxImages) < len(containerImages) {
   452  			containerImages = containerImages[0:nodeStatusMaxImages]
   453  		}
   454  
   455  		for _, image := range containerImages {
   456  			// make a copy to avoid modifying slice members of the image items in the list
   457  			names := append([]string{}, image.RepoDigests...)
   458  			names = append(names, image.RepoTags...)
   459  			// Report up to MaxNamesPerImageInNodeStatus names per image.
   460  			if len(names) > MaxNamesPerImageInNodeStatus {
   461  				names = names[0:MaxNamesPerImageInNodeStatus]
   462  			}
   463  			imagesOnNode = append(imagesOnNode, v1.ContainerImage{
   464  				Names:     names,
   465  				SizeBytes: image.Size,
   466  			})
   467  		}
   468  
   469  		node.Status.Images = imagesOnNode
   470  		return nil
   471  	}
   472  }
   473  
   474  // GoRuntime returns a Setter that sets GOOS and GOARCH on the node.
   475  func GoRuntime() Setter {
   476  	return func(ctx context.Context, node *v1.Node) error {
   477  		node.Status.NodeInfo.OperatingSystem = goruntime.GOOS
   478  		node.Status.NodeInfo.Architecture = goruntime.GOARCH
   479  		return nil
   480  	}
   481  }
   482  
   483  // RuntimeHandlers returns a Setter that sets RuntimeHandlers on the node.
   484  func RuntimeHandlers(fn func() []kubecontainer.RuntimeHandler) Setter {
   485  	return func(ctx context.Context, node *v1.Node) error {
   486  		if !utilfeature.DefaultFeatureGate.Enabled(features.RecursiveReadOnlyMounts) {
   487  			return nil
   488  		}
   489  		handlers := fn()
   490  		node.Status.RuntimeHandlers = make([]v1.NodeRuntimeHandler, len(handlers))
   491  		for i, h := range handlers {
   492  			node.Status.RuntimeHandlers[i] = v1.NodeRuntimeHandler{
   493  				Name: h.Name,
   494  				Features: &v1.NodeRuntimeHandlerFeatures{
   495  					RecursiveReadOnlyMounts: &h.SupportsRecursiveReadOnlyMounts,
   496  				},
   497  			}
   498  		}
   499  		return nil
   500  	}
   501  }
   502  
   503  // ReadyCondition returns a Setter that updates the v1.NodeReady condition on the node.
   504  func ReadyCondition(
   505  	nowFunc func() time.Time, // typically Kubelet.clock.Now
   506  	runtimeErrorsFunc func() error, // typically Kubelet.runtimeState.runtimeErrors
   507  	networkErrorsFunc func() error, // typically Kubelet.runtimeState.networkErrors
   508  	storageErrorsFunc func() error, // typically Kubelet.runtimeState.storageErrors
   509  	cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
   510  	nodeShutdownManagerErrorsFunc func() error, // typically kubelet.shutdownManager.errors.
   511  	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
   512  	localStorageCapacityIsolation bool,
   513  ) Setter {
   514  	return func(ctx context.Context, node *v1.Node) error {
   515  		// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
   516  		// This is due to an issue with version skewed kubelet and master components.
   517  		// ref: https://github.com/kubernetes/kubernetes/issues/16961
   518  		currentTime := metav1.NewTime(nowFunc())
   519  		newNodeReadyCondition := v1.NodeCondition{
   520  			Type:              v1.NodeReady,
   521  			Status:            v1.ConditionTrue,
   522  			Reason:            "KubeletReady",
   523  			Message:           "kubelet is posting ready status",
   524  			LastHeartbeatTime: currentTime,
   525  		}
   526  		errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc()}
   527  		requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
   528  		if localStorageCapacityIsolation {
   529  			requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
   530  		}
   531  		missingCapacities := []string{}
   532  		for _, resource := range requiredCapacities {
   533  			if _, found := node.Status.Capacity[resource]; !found {
   534  				missingCapacities = append(missingCapacities, string(resource))
   535  			}
   536  		}
   537  		if len(missingCapacities) > 0 {
   538  			errs = append(errs, fmt.Errorf("missing node capacity for resources: %s", strings.Join(missingCapacities, ", ")))
   539  		}
   540  		if aggregatedErr := errors.NewAggregate(errs); aggregatedErr != nil {
   541  			newNodeReadyCondition = v1.NodeCondition{
   542  				Type:              v1.NodeReady,
   543  				Status:            v1.ConditionFalse,
   544  				Reason:            "KubeletNotReady",
   545  				Message:           aggregatedErr.Error(),
   546  				LastHeartbeatTime: currentTime,
   547  			}
   548  		}
   549  
   550  		// Record any soft requirements that were not met in the container manager.
   551  		status := cmStatusFunc()
   552  		if status.SoftRequirements != nil {
   553  			newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
   554  		}
   555  
   556  		readyConditionUpdated := false
   557  		needToRecordEvent := false
   558  		for i := range node.Status.Conditions {
   559  			if node.Status.Conditions[i].Type == v1.NodeReady {
   560  				if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
   561  					newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
   562  				} else {
   563  					newNodeReadyCondition.LastTransitionTime = currentTime
   564  					needToRecordEvent = true
   565  				}
   566  				node.Status.Conditions[i] = newNodeReadyCondition
   567  				readyConditionUpdated = true
   568  				break
   569  			}
   570  		}
   571  		if !readyConditionUpdated {
   572  			newNodeReadyCondition.LastTransitionTime = currentTime
   573  			node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
   574  		}
   575  		if needToRecordEvent {
   576  			if newNodeReadyCondition.Status == v1.ConditionTrue {
   577  				recordEventFunc(v1.EventTypeNormal, events.NodeReady)
   578  			} else {
   579  				recordEventFunc(v1.EventTypeNormal, events.NodeNotReady)
   580  				klog.InfoS("Node became not ready", "node", klog.KObj(node), "condition", newNodeReadyCondition)
   581  			}
   582  		}
   583  		return nil
   584  	}
   585  }
   586  
   587  // MemoryPressureCondition returns a Setter that updates the v1.NodeMemoryPressure condition on the node.
   588  func MemoryPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
   589  	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderMemoryPressure
   590  	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
   591  ) Setter {
   592  	return func(ctx context.Context, node *v1.Node) error {
   593  		currentTime := metav1.NewTime(nowFunc())
   594  		var condition *v1.NodeCondition
   595  
   596  		// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
   597  		for i := range node.Status.Conditions {
   598  			if node.Status.Conditions[i].Type == v1.NodeMemoryPressure {
   599  				condition = &node.Status.Conditions[i]
   600  			}
   601  		}
   602  
   603  		newCondition := false
   604  		// If the NodeMemoryPressure condition doesn't exist, create one
   605  		if condition == nil {
   606  			condition = &v1.NodeCondition{
   607  				Type:   v1.NodeMemoryPressure,
   608  				Status: v1.ConditionUnknown,
   609  			}
   610  			// cannot be appended to node.Status.Conditions here because it gets
   611  			// copied to the slice. So if we append to the slice here none of the
   612  			// updates we make below are reflected in the slice.
   613  			newCondition = true
   614  		}
   615  
   616  		// Update the heartbeat time
   617  		condition.LastHeartbeatTime = currentTime
   618  
   619  		// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
   620  		// created and as well as the case when the condition already exists. When a new condition
   621  		// is created its status is set to v1.ConditionUnknown which matches either
   622  		// condition.Status != v1.ConditionTrue or
   623  		// condition.Status != v1.ConditionFalse in the conditions below depending on whether
   624  		// the kubelet is under memory pressure or not.
   625  		if pressureFunc() {
   626  			if condition.Status != v1.ConditionTrue {
   627  				condition.Status = v1.ConditionTrue
   628  				condition.Reason = "KubeletHasInsufficientMemory"
   629  				condition.Message = "kubelet has insufficient memory available"
   630  				condition.LastTransitionTime = currentTime
   631  				recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientMemory")
   632  			}
   633  		} else if condition.Status != v1.ConditionFalse {
   634  			condition.Status = v1.ConditionFalse
   635  			condition.Reason = "KubeletHasSufficientMemory"
   636  			condition.Message = "kubelet has sufficient memory available"
   637  			condition.LastTransitionTime = currentTime
   638  			recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientMemory")
   639  		}
   640  
   641  		if newCondition {
   642  			node.Status.Conditions = append(node.Status.Conditions, *condition)
   643  		}
   644  		return nil
   645  	}
   646  }
   647  
   648  // PIDPressureCondition returns a Setter that updates the v1.NodePIDPressure condition on the node.
   649  func PIDPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
   650  	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderPIDPressure
   651  	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
   652  ) Setter {
   653  	return func(ctx context.Context, node *v1.Node) error {
   654  		currentTime := metav1.NewTime(nowFunc())
   655  		var condition *v1.NodeCondition
   656  
   657  		// Check if NodePIDPressure condition already exists and if it does, just pick it up for update.
   658  		for i := range node.Status.Conditions {
   659  			if node.Status.Conditions[i].Type == v1.NodePIDPressure {
   660  				condition = &node.Status.Conditions[i]
   661  			}
   662  		}
   663  
   664  		newCondition := false
   665  		// If the NodePIDPressure condition doesn't exist, create one
   666  		if condition == nil {
   667  			condition = &v1.NodeCondition{
   668  				Type:   v1.NodePIDPressure,
   669  				Status: v1.ConditionUnknown,
   670  			}
   671  			// cannot be appended to node.Status.Conditions here because it gets
   672  			// copied to the slice. So if we append to the slice here none of the
   673  			// updates we make below are reflected in the slice.
   674  			newCondition = true
   675  		}
   676  
   677  		// Update the heartbeat time
   678  		condition.LastHeartbeatTime = currentTime
   679  
   680  		// Note: The conditions below take care of the case when a new NodePIDPressure condition is
   681  		// created and as well as the case when the condition already exists. When a new condition
   682  		// is created its status is set to v1.ConditionUnknown which matches either
   683  		// condition.Status != v1.ConditionTrue or
   684  		// condition.Status != v1.ConditionFalse in the conditions below depending on whether
   685  		// the kubelet is under PID pressure or not.
   686  		if pressureFunc() {
   687  			if condition.Status != v1.ConditionTrue {
   688  				condition.Status = v1.ConditionTrue
   689  				condition.Reason = "KubeletHasInsufficientPID"
   690  				condition.Message = "kubelet has insufficient PID available"
   691  				condition.LastTransitionTime = currentTime
   692  				recordEventFunc(v1.EventTypeNormal, "NodeHasInsufficientPID")
   693  			}
   694  		} else if condition.Status != v1.ConditionFalse {
   695  			condition.Status = v1.ConditionFalse
   696  			condition.Reason = "KubeletHasSufficientPID"
   697  			condition.Message = "kubelet has sufficient PID available"
   698  			condition.LastTransitionTime = currentTime
   699  			recordEventFunc(v1.EventTypeNormal, "NodeHasSufficientPID")
   700  		}
   701  
   702  		if newCondition {
   703  			node.Status.Conditions = append(node.Status.Conditions, *condition)
   704  		}
   705  		return nil
   706  	}
   707  }
   708  
   709  // DiskPressureCondition returns a Setter that updates the v1.NodeDiskPressure condition on the node.
   710  func DiskPressureCondition(nowFunc func() time.Time, // typically Kubelet.clock.Now
   711  	pressureFunc func() bool, // typically Kubelet.evictionManager.IsUnderDiskPressure
   712  	recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
   713  ) Setter {
   714  	return func(ctx context.Context, node *v1.Node) error {
   715  		currentTime := metav1.NewTime(nowFunc())
   716  		var condition *v1.NodeCondition
   717  
   718  		// Check if NodeDiskPressure condition already exists and if it does, just pick it up for update.
   719  		for i := range node.Status.Conditions {
   720  			if node.Status.Conditions[i].Type == v1.NodeDiskPressure {
   721  				condition = &node.Status.Conditions[i]
   722  			}
   723  		}
   724  
   725  		newCondition := false
   726  		// If the NodeDiskPressure condition doesn't exist, create one
   727  		if condition == nil {
   728  			condition = &v1.NodeCondition{
   729  				Type:   v1.NodeDiskPressure,
   730  				Status: v1.ConditionUnknown,
   731  			}
   732  			// cannot be appended to node.Status.Conditions here because it gets
   733  			// copied to the slice. So if we append to the slice here none of the
   734  			// updates we make below are reflected in the slice.
   735  			newCondition = true
   736  		}
   737  
   738  		// Update the heartbeat time
   739  		condition.LastHeartbeatTime = currentTime
   740  
   741  		// Note: The conditions below take care of the case when a new NodeDiskPressure condition is
   742  		// created and as well as the case when the condition already exists. When a new condition
   743  		// is created its status is set to v1.ConditionUnknown which matches either
   744  		// condition.Status != v1.ConditionTrue or
   745  		// condition.Status != v1.ConditionFalse in the conditions below depending on whether
   746  		// the kubelet is under disk pressure or not.
   747  		if pressureFunc() {
   748  			if condition.Status != v1.ConditionTrue {
   749  				condition.Status = v1.ConditionTrue
   750  				condition.Reason = "KubeletHasDiskPressure"
   751  				condition.Message = "kubelet has disk pressure"
   752  				condition.LastTransitionTime = currentTime
   753  				recordEventFunc(v1.EventTypeNormal, "NodeHasDiskPressure")
   754  			}
   755  		} else if condition.Status != v1.ConditionFalse {
   756  			condition.Status = v1.ConditionFalse
   757  			condition.Reason = "KubeletHasNoDiskPressure"
   758  			condition.Message = "kubelet has no disk pressure"
   759  			condition.LastTransitionTime = currentTime
   760  			recordEventFunc(v1.EventTypeNormal, "NodeHasNoDiskPressure")
   761  		}
   762  
   763  		if newCondition {
   764  			node.Status.Conditions = append(node.Status.Conditions, *condition)
   765  		}
   766  		return nil
   767  	}
   768  }
   769  
   770  // VolumesInUse returns a Setter that updates the volumes in use on the node.
   771  func VolumesInUse(syncedFunc func() bool, // typically Kubelet.volumeManager.ReconcilerStatesHasBeenSynced
   772  	volumesInUseFunc func() []v1.UniqueVolumeName, // typically Kubelet.volumeManager.GetVolumesInUse
   773  ) Setter {
   774  	return func(ctx context.Context, node *v1.Node) error {
   775  		// Make sure to only update node status after reconciler starts syncing up states
   776  		if syncedFunc() {
   777  			node.Status.VolumesInUse = volumesInUseFunc()
   778  		}
   779  		return nil
   780  	}
   781  }
   782  
   783  // VolumeLimits returns a Setter that updates the volume limits on the node.
   784  func VolumeLimits(volumePluginListFunc func() []volume.VolumePluginWithAttachLimits, // typically Kubelet.volumePluginMgr.ListVolumePluginWithLimits
   785  ) Setter {
   786  	return func(ctx context.Context, node *v1.Node) error {
   787  		if node.Status.Capacity == nil {
   788  			node.Status.Capacity = v1.ResourceList{}
   789  		}
   790  		if node.Status.Allocatable == nil {
   791  			node.Status.Allocatable = v1.ResourceList{}
   792  		}
   793  
   794  		pluginWithLimits := volumePluginListFunc()
   795  		for _, volumePlugin := range pluginWithLimits {
   796  			attachLimits, err := volumePlugin.GetVolumeLimits()
   797  			if err != nil {
   798  				klog.V(4).InfoS("Skipping volume limits for volume plugin", "plugin", volumePlugin.GetPluginName())
   799  				continue
   800  			}
   801  			for limitKey, value := range attachLimits {
   802  				node.Status.Capacity[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
   803  				node.Status.Allocatable[v1.ResourceName(limitKey)] = *resource.NewQuantity(value, resource.DecimalSI)
   804  			}
   805  		}
   806  		return nil
   807  	}
   808  }
   809  

View as plain text