...

Source file src/k8s.io/kubernetes/pkg/kubelet/kubelet_node_status.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	goruntime "runtime"
    24  	"sort"
    25  	"strings"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	"k8s.io/apimachinery/pkg/api/resource"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	cloudprovider "k8s.io/cloud-provider"
    36  	cloudproviderapi "k8s.io/cloud-provider/api"
    37  	nodeutil "k8s.io/component-helpers/node/util"
    38  	"k8s.io/klog/v2"
    39  	kubeletapis "k8s.io/kubelet/pkg/apis"
    40  	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    41  	"k8s.io/kubernetes/pkg/kubelet/events"
    42  	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
    43  	"k8s.io/kubernetes/pkg/kubelet/util"
    44  	taintutil "k8s.io/kubernetes/pkg/util/taints"
    45  	volutil "k8s.io/kubernetes/pkg/volume/util"
    46  )
    47  
    48  // registerWithAPIServer registers the node with the cluster master. It is safe
    49  // to call multiple times, but not concurrently (kl.registrationCompleted is
    50  // not locked).
    51  func (kl *Kubelet) registerWithAPIServer() {
    52  	if kl.registrationCompleted {
    53  		return
    54  	}
    55  
    56  	kl.nodeStartupLatencyTracker.RecordAttemptRegisterNode()
    57  
    58  	step := 100 * time.Millisecond
    59  
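        	// Retry with exponential backoff: the delay starts at 100ms, doubles each
        	// attempt, and is capped at 7 seconds between registration attempts.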
    60  	for {
    61  		time.Sleep(step)
    62  		step = step * 2
    63  		if step >= 7*time.Second {
    64  			step = 7 * time.Second
    65  		}
    66  
    67  		node, err := kl.initialNode(context.TODO())
    68  		if err != nil {
    69  			klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
    70  			continue
    71  		}
    72  
    73  		klog.InfoS("Attempting to register node", "node", klog.KObj(node))
    74  		registered := kl.tryRegisterWithAPIServer(node)
    75  		if registered {
    76  			klog.InfoS("Successfully registered node", "node", klog.KObj(node))
    77  			kl.registrationCompleted = true
    78  			return
    79  		}
    80  	}
    81  }
    82  
    83  // tryRegisterWithAPIServer makes an attempt to register the given node with
    84  // the API server, returning a boolean indicating whether the attempt was
    85  // successful. If a node with the same name already exists, it reconciles the
    86  // controller-managed attach-detach annotation, default labels, huge page
    87  // resources, and extended resources on the existing node, patching it if needed.
    88  func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
    89  	_, err := kl.kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
    90  	if err == nil {
    91  		kl.nodeStartupLatencyTracker.RecordRegisteredNewNode()
    92  		return true
    93  	}
    94  
    95  	if !apierrors.IsAlreadyExists(err) {
    96  		klog.ErrorS(err, "Unable to register node with API server", "node", klog.KObj(node))
    97  		return false
    98  	}
    99  
   100  	existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), metav1.GetOptions{})
   101  	if err != nil {
   102  		klog.ErrorS(err, "Unable to register node with API server, error getting existing node", "node", klog.KObj(node))
   103  		return false
   104  	}
   105  	if existingNode == nil {
   106  		klog.InfoS("Unable to register node with API server, no node instance returned", "node", klog.KObj(node))
   107  		return false
   108  	}
   109  
   110  	originalNode := existingNode.DeepCopy()
   111  
   112  	klog.InfoS("Node was previously registered", "node", klog.KObj(node))
   113  
   114  	// Edge case: the node was previously registered; reconcile
   115  	// the value of the controller-managed attach-detach
   116  	// annotation.
   117  	requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
   118  	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
   119  	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
   120  	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
   121  	if requiresUpdate {
   122  		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
   123  			klog.ErrorS(err, "Unable to reconcile node with API server, error updating node", "node", klog.KObj(node))
   124  			return false
   125  		}
   126  	}
   127  
   128  	return true
   129  }
   130  
   131  // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
   132  func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
   133  	requiresUpdate := updateDefaultResources(initialNode, existingNode)
   134  	supportedHugePageResources := sets.String{}
   135  
   136  	for resourceName := range initialNode.Status.Capacity {
   137  		if !v1helper.IsHugePageResourceName(resourceName) {
   138  			continue
   139  		}
   140  		supportedHugePageResources.Insert(string(resourceName))
   141  
   142  		initialCapacity := initialNode.Status.Capacity[resourceName]
   143  		initialAllocatable := initialNode.Status.Allocatable[resourceName]
   144  
   145  		capacity, resourceIsSupported := existingNode.Status.Capacity[resourceName]
   146  		allocatable := existingNode.Status.Allocatable[resourceName]
   147  
   148  		// Add or update capacity if the size was previously unsupported or has changed
   149  		if !resourceIsSupported || capacity.Cmp(initialCapacity) != 0 {
   150  			existingNode.Status.Capacity[resourceName] = initialCapacity.DeepCopy()
   151  			requiresUpdate = true
   152  		}
   153  
   154  		// Add or update allocatable if the size was previously unsupported or has changed
   155  		if !resourceIsSupported || allocatable.Cmp(initialAllocatable) != 0 {
   156  			existingNode.Status.Allocatable[resourceName] = initialAllocatable.DeepCopy()
   157  			requiresUpdate = true
   158  		}
   159  
   160  	}
   161  
   162  	for resourceName := range existingNode.Status.Capacity {
   163  		if !v1helper.IsHugePageResourceName(resourceName) {
   164  			continue
   165  		}
   166  
   167  		// If huge page size no longer is supported, we remove it from the node
   168  		if !supportedHugePageResources.Has(string(resourceName)) {
   169  			delete(existingNode.Status.Capacity, resourceName)
   170  			delete(existingNode.Status.Allocatable, resourceName)
   171  			klog.InfoS("Removing huge page resource which is no longer supported", "resourceName", resourceName)
   172  			requiresUpdate = true
   173  		}
   174  	}
   175  	return requiresUpdate
   176  }
   177  
   178  // Zeros out extended resource capacity during reconciliation.
   179  func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
   180  	requiresUpdate := updateDefaultResources(initialNode, node)
   181  	// Check with the device manager to see if node has been recreated, in which case extended resources should be zeroed until they are available
   182  	if kl.containerManager.ShouldResetExtendedResourceCapacity() {
   183  		for k := range node.Status.Capacity {
   184  			if v1helper.IsExtendedResourceName(k) {
   185  				klog.InfoS("Zero out resource capacity in existing node", "resourceName", k, "node", klog.KObj(node))
   186  				node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
   187  				node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
   188  				requiresUpdate = true
   189  			}
   190  		}
   191  	}
   192  	return requiresUpdate
   193  }
   194  
   195  // updateDefaultResources will set the default resources on the existing node according to the initial node
   196  func updateDefaultResources(initialNode, existingNode *v1.Node) bool {
   197  	requiresUpdate := false
   198  	if existingNode.Status.Capacity == nil {
   199  		if initialNode.Status.Capacity != nil {
   200  			existingNode.Status.Capacity = initialNode.Status.Capacity.DeepCopy()
   201  			requiresUpdate = true
   202  		} else {
   203  			existingNode.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
   204  		}
   205  	}
   206  
   207  	if existingNode.Status.Allocatable == nil {
   208  		if initialNode.Status.Allocatable != nil {
   209  			existingNode.Status.Allocatable = initialNode.Status.Allocatable.DeepCopy()
   210  			requiresUpdate = true
   211  		} else {
   212  			existingNode.Status.Allocatable = make(map[v1.ResourceName]resource.Quantity)
   213  		}
   214  	}
   215  	return requiresUpdate
   216  }
   217  
   218  // updateDefaultLabels will set the default labels on the node
   219  func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
   220  	defaultLabels := []string{
   221  		v1.LabelHostname,
   222  		v1.LabelTopologyZone,
   223  		v1.LabelTopologyRegion,
   224  		v1.LabelFailureDomainBetaZone,
   225  		v1.LabelFailureDomainBetaRegion,
   226  		v1.LabelInstanceTypeStable,
   227  		v1.LabelInstanceType,
   228  		v1.LabelOSStable,
   229  		v1.LabelArchStable,
   230  		v1.LabelWindowsBuild,
   231  		kubeletapis.LabelOS,
   232  		kubeletapis.LabelArch,
   233  	}
   234  
   235  	needsUpdate := false
   236  	if existingNode.Labels == nil {
   237  		existingNode.Labels = make(map[string]string)
   238  	}
   239  	// Set default labels, but make sure not to set labels with empty values
   240  	for _, label := range defaultLabels {
   241  		if _, hasInitialValue := initialNode.Labels[label]; !hasInitialValue {
   242  			continue
   243  		}
   244  
   245  		if existingNode.Labels[label] != initialNode.Labels[label] {
   246  			existingNode.Labels[label] = initialNode.Labels[label]
   247  			needsUpdate = true
   248  		}
   249  
   250  		if existingNode.Labels[label] == "" {
   251  			delete(existingNode.Labels, label)
   252  		}
   253  	}
   254  
   255  	return needsUpdate
   256  }
   257  
   258  // reconcileCMADAnnotationWithExistingNode reconciles the controller-managed
   259  // attach-detach annotation on a new node and the existing node, returning
   260  // whether the existing node must be updated.
   261  func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *v1.Node) bool {
   262  	var (
   263  		existingCMAAnnotation    = existingNode.Annotations[volutil.ControllerManagedAttachAnnotation]
   264  		newCMAAnnotation, newSet = node.Annotations[volutil.ControllerManagedAttachAnnotation]
   265  	)
   266  
   267  	if newCMAAnnotation == existingCMAAnnotation {
   268  		return false
   269  	}
   270  
   271  	// If the just-constructed node and the existing node do
   272  	// not have the same value, update the existing node with
   273  	// the correct value of the annotation.
   274  	if !newSet {
   275  		klog.InfoS("Controller attach-detach setting changed to false; updating existing Node")
   276  		delete(existingNode.Annotations, volutil.ControllerManagedAttachAnnotation)
   277  	} else {
   278  		klog.InfoS("Controller attach-detach setting changed to true; updating existing Node")
   279  		if existingNode.Annotations == nil {
   280  			existingNode.Annotations = make(map[string]string)
   281  		}
   282  		existingNode.Annotations[volutil.ControllerManagedAttachAnnotation] = newCMAAnnotation
   283  	}
   284  
   285  	return true
   286  }
   287  
   288  // initialNode constructs the initial v1.Node for this Kubelet, incorporating node
   289  // labels, information from the cloud provider, and Kubelet configuration.
   290  func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
   291  	node := &v1.Node{
   292  		ObjectMeta: metav1.ObjectMeta{
   293  			Name: string(kl.nodeName),
   294  			Labels: map[string]string{
   295  				v1.LabelHostname:      kl.hostname,
   296  				v1.LabelOSStable:      goruntime.GOOS,
   297  				v1.LabelArchStable:    goruntime.GOARCH,
   298  				kubeletapis.LabelOS:   goruntime.GOOS,
   299  				kubeletapis.LabelArch: goruntime.GOARCH,
   300  			},
   301  		},
   302  		Spec: v1.NodeSpec{
   303  			Unschedulable: !kl.registerSchedulable,
   304  		},
   305  	}
   306  	osLabels, err := getOSSpecificLabels()
   307  	if err != nil {
   308  		return nil, err
   309  	}
   310  	for label, value := range osLabels {
   311  		node.Labels[label] = value
   312  	}
   313  
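        	// Copy the configured registration taints so the original slice is not mutated below.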
   314  	nodeTaints := make([]v1.Taint, len(kl.registerWithTaints))
   315  	copy(nodeTaints, kl.registerWithTaints)
   316  	unschedulableTaint := v1.Taint{
   317  		Key:    v1.TaintNodeUnschedulable,
   318  		Effect: v1.TaintEffectNoSchedule,
   319  	}
   320  
   321  	// Taint node with TaintNodeUnschedulable when initializing
   322  	// node to avoid race condition; refer to #63897 for more detail.
   323  	if node.Spec.Unschedulable &&
   324  		!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
   325  		nodeTaints = append(nodeTaints, unschedulableTaint)
   326  	}
   327  
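        	// When an external cloud provider is used, taint the node so workloads are not
        	// scheduled onto it until the cloud-controller-manager has initialized it.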
   328  	if kl.externalCloudProvider {
   329  		taint := v1.Taint{
   330  			Key:    cloudproviderapi.TaintExternalCloudProvider,
   331  			Value:  "true",
   332  			Effect: v1.TaintEffectNoSchedule,
   333  		}
   334  
   335  		nodeTaints = append(nodeTaints, taint)
   336  	}
   337  	if len(nodeTaints) > 0 {
   338  		node.Spec.Taints = nodeTaints
   339  	}
   340  	// Initially, set NodeNetworkUnavailable to true.
   341  	if kl.providerRequiresNetworkingConfiguration() {
   342  		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
   343  			Type:               v1.NodeNetworkUnavailable,
   344  			Status:             v1.ConditionTrue,
   345  			Reason:             "NoRouteCreated",
   346  			Message:            "Node created without a route",
   347  			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
   348  		})
   349  	}
   350  
   351  	if kl.enableControllerAttachDetach {
   352  		if node.Annotations == nil {
   353  			node.Annotations = make(map[string]string)
   354  		}
   355  
   356  		klog.V(2).InfoS("Setting node annotation to enable volume controller attach/detach")
   357  		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
   358  	} else {
   359  		klog.V(2).InfoS("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
   360  	}
   361  
   362  	if kl.keepTerminatedPodVolumes {
   363  		if node.Annotations == nil {
   364  			node.Annotations = make(map[string]string)
   365  		}
   366  		klog.V(2).InfoS("Setting node annotation to keep pod volumes of terminated pods attached to the node")
   367  		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
   368  	}
   369  
   370  	// @question: should this be placed after the call to the cloud provider, which also applies labels?
   371  	for k, v := range kl.nodeLabels {
   372  		if cv, found := node.ObjectMeta.Labels[k]; found {
   373  			klog.InfoS("the node label will overwrite default setting", "labelKey", k, "labelValue", v, "default", cv)
   374  		}
   375  		node.ObjectMeta.Labels[k] = v
   376  	}
   377  
   378  	if kl.providerID != "" {
   379  		node.Spec.ProviderID = kl.providerID
   380  	}
   381  
   382  	if kl.cloud != nil {
   383  		instances, ok := kl.cloud.Instances()
   384  		if !ok {
   385  			return nil, fmt.Errorf("failed to get instances from cloud provider")
   386  		}
   387  
   388  		// TODO: We can't assume that the node has credentials to talk to the
   389  		// cloudprovider from arbitrary nodes. At most, we should talk to a
   390  		// local metadata server here.
   391  		var err error
   392  		if node.Spec.ProviderID == "" {
   393  			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(ctx, kl.cloud, kl.nodeName)
   394  			if err != nil {
   395  				return nil, err
   396  			}
   397  		}
   398  
   399  		instanceType, err := instances.InstanceType(ctx, kl.nodeName)
   400  		if err != nil {
   401  			return nil, err
   402  		}
   403  		if instanceType != "" {
   404  			klog.InfoS("Adding label from cloud provider", "labelKey", v1.LabelInstanceType, "labelValue", instanceType)
   405  			node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
   406  			klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelInstanceTypeStable, "labelValue", instanceType)
   407  			node.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
   408  		}
   409  		// If the cloud has zone information, label the node with the zone information
   410  		zones, ok := kl.cloud.Zones()
   411  		if ok {
   412  			zone, err := zones.GetZone(ctx)
   413  			if err != nil {
   414  				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
   415  			}
   416  			if zone.FailureDomain != "" {
   417  				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaZone, "labelValue", zone.FailureDomain)
   418  				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaZone] = zone.FailureDomain
   419  				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyZone, "labelValue", zone.FailureDomain)
   420  				node.ObjectMeta.Labels[v1.LabelTopologyZone] = zone.FailureDomain
   421  			}
   422  			if zone.Region != "" {
   423  				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaRegion, "labelValue", zone.Region)
   424  				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaRegion] = zone.Region
   425  				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyRegion, "labelValue", zone.Region)
   426  				node.ObjectMeta.Labels[v1.LabelTopologyRegion] = zone.Region
   427  			}
   428  		}
   429  	}
   430  
   431  	kl.setNodeStatus(ctx, node)
   432  
   433  	return node, nil
   434  }
   435  
   436  // fastNodeStatusUpdate is a "lightweight" version of syncNodeStatus which doesn't hit the
   437  // apiserver except for the final run, to be called by fastStatusUpdateOnce in each loop.
   438  // It holds the same lock as syncNodeStatus and is thread-safe when called concurrently with
   439  // syncNodeStatus. Its return value indicates whether the loop running it should exit
   440  // (final run), and it also sets kl.containerRuntimeReadyExpected.
   441  func (kl *Kubelet) fastNodeStatusUpdate(ctx context.Context, timeout bool) (completed bool) {
   442  	kl.syncNodeStatusMux.Lock()
   443  	defer func() {
   444  		kl.syncNodeStatusMux.Unlock()
   445  
   446  		if completed {
   447  			// containerRuntimeReadyExpected is read by updateRuntimeUp().
   448  			// Not going for a more granular mutex as this path runs only once.
   449  			kl.updateRuntimeMux.Lock()
   450  			defer kl.updateRuntimeMux.Unlock()
   451  			kl.containerRuntimeReadyExpected = true
   452  		}
   453  	}()
   454  
   455  	if timeout {
   456  		klog.ErrorS(nil, "Node not becoming ready in time after startup")
   457  		return true
   458  	}
   459  
   460  	originalNode, err := kl.GetNode()
   461  	if err != nil {
   462  		klog.ErrorS(err, "Error getting the current node from lister")
   463  		return false
   464  	}
   465  
   466  	readyIdx, originalNodeReady := nodeutil.GetNodeCondition(&originalNode.Status, v1.NodeReady)
   467  	if readyIdx == -1 {
   468  		klog.ErrorS(nil, "Node does not have NodeReady condition", "originalNode", originalNode)
   469  		return false
   470  	}
   471  
   472  	if originalNodeReady.Status == v1.ConditionTrue {
   473  		return true
   474  	}
   475  
   476  	// This is in addition to the regular syncNodeStatus logic so we can get the container runtime status earlier.
   477  	// This function itself has a mutex and it doesn't recursively call fastNodeStatusUpdate or syncNodeStatus.
   478  	kl.updateRuntimeUp()
   479  
   480  	node, changed := kl.updateNode(ctx, originalNode)
   481  
   482  	if !changed {
   483  		// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
   484  		return false
   485  	}
   486  
   487  	readyIdx, nodeReady := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
   488  	if readyIdx == -1 {
   489  		klog.ErrorS(nil, "Node does not have NodeReady condition", "node", node)
   490  		return false
   491  	}
   492  
   493  	if nodeReady.Status == v1.ConditionFalse {
   494  		return false
   495  	}
   496  
   497  	klog.InfoS("Fast updating node status as it just became ready")
   498  	if _, err := kl.patchNodeStatus(originalNode, node); err != nil {
   499  		// The originalNode is probably stale, but we know that the current state of kubelet would turn
   500  		// the node to be ready. Retry using syncNodeStatus() which fetches from the apiserver.
   501  		klog.ErrorS(err, "Error updating node status, will retry with syncNodeStatus")
   502  
   503  		// The reversed kl.syncNodeStatusMux.Unlock/Lock() below allows kl.syncNodeStatus() to execute.
   504  		kl.syncNodeStatusMux.Unlock()
   505  		kl.syncNodeStatus()
   506  		// This lock action is unnecessary if we add a flag to check in the defer before unlocking it,
   507  		// but having it here makes the logic a bit easier to read.
   508  		kl.syncNodeStatusMux.Lock()
   509  	}
   510  
   511  	// We don't do markVolumesFromNode(node) here and leave it to the regular syncNodeStatus().
   512  	return true
   513  }
   514  
   515  // syncNodeStatus should be called periodically from a goroutine.
   516  // It synchronizes node status to master if there is any change or enough time
   517  // has passed since the last sync, registering the kubelet first if necessary.
   518  func (kl *Kubelet) syncNodeStatus() {
   519  	kl.syncNodeStatusMux.Lock()
   520  	defer kl.syncNodeStatusMux.Unlock()
   521  	ctx := context.Background()
   522  
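        	// Nothing to sync when the kubelet has no API server clients (e.g. running standalone).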
   523  	if kl.kubeClient == nil || kl.heartbeatClient == nil {
   524  		return
   525  	}
   526  	if kl.registerNode {
   527  		// This will exit immediately if it doesn't need to do anything.
   528  		kl.registerWithAPIServer()
   529  	}
   530  	if err := kl.updateNodeStatus(ctx); err != nil {
   531  		klog.ErrorS(err, "Unable to update node status")
   532  	}
   533  }
   534  
   535  // updateNodeStatus updates node status to master with retries if there is any
   536  // change or enough time has passed since the last sync.
   537  func (kl *Kubelet) updateNodeStatus(ctx context.Context) error {
   538  	klog.V(5).InfoS("Updating node status")
   539  	for i := 0; i < nodeStatusUpdateRetry; i++ {
   540  		if err := kl.tryUpdateNodeStatus(ctx, i); err != nil {
   541  			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
   542  				kl.onRepeatedHeartbeatFailure()
   543  			}
   544  			klog.ErrorS(err, "Error updating node status, will retry")
   545  		} else {
   546  			return nil
   547  		}
   548  	}
   549  	return fmt.Errorf("update node status exceeds retry count")
   550  }
   551  
   552  // tryUpdateNodeStatus tries to update node status to master if there is any
   553  // change or enough time has passed since the last sync.
   554  func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error {
   555  	// In large clusters, GET and PUT operations on Node objects coming
   556  	// from here are the majority of load on apiserver and etcd.
   557  	// To reduce the load on etcd, we are serving GET operations from
   558  	// apiserver cache (the data might be slightly delayed but it doesn't
   559  	// seem to cause more conflict - the delays are pretty small).
   560  	// If it results in a conflict, all retries are served directly from etcd.
   561  	opts := metav1.GetOptions{}
   562  	if tryNumber == 0 {
   563  		util.FromApiserverCache(&opts)
   564  	}
   565  	originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
   566  	if err != nil {
   567  		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
   568  	}
   569  	if originalNode == nil {
   570  		return fmt.Errorf("nil %q node object", kl.nodeName)
   571  	}
   572  
   573  	node, changed := kl.updateNode(ctx, originalNode)
   574  	shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency
   575  
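        	// Even when no patch is needed, refresh the volume manager's ReportedInUse
        	// state from the node that was just fetched.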
   576  	if !shouldPatchNodeStatus {
   577  		kl.markVolumesFromNode(node)
   578  		return nil
   579  	}
   580  
   581  	updatedNode, err := kl.patchNodeStatus(originalNode, node)
   582  	if err == nil {
   583  		kl.markVolumesFromNode(updatedNode)
   584  	}
   585  	return err
   586  }
   587  
   588  // updateNode creates a copy of originalNode and runs update logic on it.
   589  // It returns the updated node object and a bool indicating if anything has been changed.
   590  func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
   591  	node := originalNode.DeepCopy()
   592  
   593  	podCIDRChanged := false
   594  	if len(node.Spec.PodCIDRs) != 0 {
   595  		// Pod CIDR could have been updated before, so we cannot rely on
   596  		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
   597  		// actually changed.
   598  		var err error
   599  		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
   600  		if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
   601  			klog.ErrorS(err, "Error updating pod CIDR")
   602  		}
   603  	}
   604  
   605  	areRequiredLabelsNotPresent := false
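        	// Set the OS label if it is missing or does not match the runtime OS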
   606  	osName, osLabelExists := node.Labels[v1.LabelOSStable]
   607  	if !osLabelExists || osName != goruntime.GOOS {
   608  		if len(node.Labels) == 0 {
   609  			node.Labels = make(map[string]string)
   610  		}
   611  		node.Labels[v1.LabelOSStable] = goruntime.GOOS
   612  		areRequiredLabelsNotPresent = true
   613  	}
   614  	// Set the arch if there is a mismatch
   615  	arch, archLabelExists := node.Labels[v1.LabelArchStable]
   616  	if !archLabelExists || arch != goruntime.GOARCH {
   617  		if len(node.Labels) == 0 {
   618  			node.Labels = make(map[string]string)
   619  		}
   620  		node.Labels[v1.LabelArchStable] = goruntime.GOARCH
   621  		areRequiredLabelsNotPresent = true
   622  	}
   623  
   624  	kl.setNodeStatus(ctx, node)
   625  
   626  	changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
   627  	return node, changed
   628  }
   629  
   630  // patchNodeStatus patches node on the API server based on originalNode.
   631  // On success it returns the updated node and refreshes kubelet state; otherwise it returns an error.
   632  func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
   633  	// Patch the current status on the API server
   634  	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
   635  	if err != nil {
   636  		return nil, err
   637  	}
   638  	kl.lastStatusReportTime = kl.clock.Now()
   639  	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
   640  
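        	// Record the transition to Ready for node startup latency tracking.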
   641  	readyIdx, readyCondition := nodeutil.GetNodeCondition(&updatedNode.Status, v1.NodeReady)
   642  	if readyIdx >= 0 && readyCondition.Status == v1.ConditionTrue {
   643  		kl.nodeStartupLatencyTracker.RecordNodeReady()
   644  	}
   645  
   646  	return updatedNode, nil
   647  }
   648  
   649  // markVolumesFromNode updates volumeManager with VolumesInUse status from node.
   650  //
   651  // In the case of node status update being unnecessary, call with the fetched node.
   652  // We must mark the volumes as ReportedInUse in volume manager's dsw even
   653  // if no changes were made to the node status (no volumes were added or removed
   654  // from the VolumesInUse list).
   655  //
   656  // The reason is that on a kubelet restart, the volume manager's dsw is
   657  // repopulated and the volume ReportedInUse is initialized to false, while the
   658  // VolumesInUse list from the Node object still contains the state from the
   659  // previous kubelet instantiation.
   660  //
   661  // Once the volumes are added to the dsw, the ReportedInUse field needs to be
   662  // synced from the VolumesInUse list in the Node.Status.
   663  //
   664  // The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
   665  // because it does not have access to the Node object.
   666  // This also cannot be populated on node status manager init because the volume
   667  // may not have been added to dsw at that time.
   668  //
   669  // Alternatively, after a successful node status update, call with the updatedNode
   670  // returned from the patch call to mark the VolumesInUse as ReportedInUse,
   671  // indicating that those volumes are already reflected in the node's status.
   672  func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
   673  	kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
   674  }
   675  
   676  // recordNodeStatusEvent records an event of the given type with the given
   677  // message for the node.
   678  func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
   679  	klog.V(2).InfoS("Recording event message for node", "node", klog.KRef("", string(kl.nodeName)), "event", event)
   680  	kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
   681  }
   682  
   683  // recordEvent records an event for this node, the Kubelet's nodeRef is passed to the recorder
   684  func (kl *Kubelet) recordEvent(eventType, event, message string) {
   685  	kl.recorder.Eventf(kl.nodeRef, eventType, event, message)
   686  }
   687  
   688  // recordNodeSchedulableEvent records an event if the node's schedulability has changed.
   689  func (kl *Kubelet) recordNodeSchedulableEvent(ctx context.Context, node *v1.Node) error {
   690  	kl.lastNodeUnschedulableLock.Lock()
   691  	defer kl.lastNodeUnschedulableLock.Unlock()
   692  	if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
   693  		if node.Spec.Unschedulable {
   694  			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotSchedulable)
   695  		} else {
   696  			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeSchedulable)
   697  		}
   698  		kl.lastNodeUnschedulable = node.Spec.Unschedulable
   699  	}
   700  	return nil
   701  }
   702  
   703  // setNodeStatus fills in the Status fields of the given Node, overwriting
   704  // any fields that are currently set.
   705  // TODO(madhusudancs): Simplify the logic for setting node conditions and
   706  // refactor the node status condition code out to a different file.
   707  func (kl *Kubelet) setNodeStatus(ctx context.Context, node *v1.Node) {
   708  	for i, f := range kl.setNodeStatusFuncs {
   709  		klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
   710  		if err := f(ctx, node); err != nil {
   711  			klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
   712  		}
   713  	}
   714  }
   715  
   716  func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
   717  	kl.lastObservedNodeAddressesMux.Lock()
   718  	defer kl.lastObservedNodeAddressesMux.Unlock()
   719  	kl.lastObservedNodeAddresses = addresses
   720  }
   721  func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
   722  	kl.lastObservedNodeAddressesMux.RLock()
   723  	defer kl.lastObservedNodeAddressesMux.RUnlock()
   724  	return kl.lastObservedNodeAddresses
   725  }
   726  
   727  // defaultNodeStatusFuncs is a factory that generates the default set of
   728  // setNodeStatus funcs
   729  func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) error {
   730  	// if cloud is not nil, we expect the cloud resource sync manager to exist
   731  	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
   732  	if kl.cloud != nil {
   733  		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
   734  	}
   735  	var setters []func(ctx context.Context, n *v1.Node) error
   736  	setters = append(setters,
   737  		nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
   738  		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
   739  			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent, kl.supportLocalStorageCapacityIsolation()),
   740  		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
   741  		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
   742  		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
   743  		nodestatus.GoRuntime(),
   744  		nodestatus.RuntimeHandlers(kl.runtimeState.runtimeHandlers),
   745  	)
   746  	// Volume limits
   747  	setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
   748  
   749  	setters = append(setters,
   750  		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
   751  		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
   752  		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
   753  		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors,
   754  			kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.supportLocalStorageCapacityIsolation()),
   755  		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
   756  		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
   757  		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
   758  		// these side-effects by decoupling the decisions to send events and partial status recording
   759  		// from the Node setters.
   760  		kl.recordNodeSchedulableEvent,
   761  	)
   762  	return setters
   763  }
   764  
   765  // validateNodeIP validates that the given node IP belongs to the current host.
   766  func validateNodeIP(nodeIP net.IP) error {
   767  	// Honor IP limitations set in setNodeStatus()
   768  	if nodeIP.To4() == nil && nodeIP.To16() == nil {
   769  		return fmt.Errorf("nodeIP must be a valid IP address")
   770  	}
   771  	if nodeIP.IsLoopback() {
   772  		return fmt.Errorf("nodeIP can't be loopback address")
   773  	}
   774  	if nodeIP.IsMulticast() {
   775  		return fmt.Errorf("nodeIP can't be a multicast address")
   776  	}
   777  	if nodeIP.IsLinkLocalUnicast() {
   778  		return fmt.Errorf("nodeIP can't be a link-local unicast address")
   779  	}
   780  	if nodeIP.IsUnspecified() {
   781  		return fmt.Errorf("nodeIP can't be an all zeros address")
   782  	}
   783  
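        	// The IP must also be assigned to one of the host's network interfaces.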
   784  	addrs, err := net.InterfaceAddrs()
   785  	if err != nil {
   786  		return err
   787  	}
   788  	for _, addr := range addrs {
   789  		var ip net.IP
   790  		switch v := addr.(type) {
   791  		case *net.IPNet:
   792  			ip = v.IP
   793  		case *net.IPAddr:
   794  			ip = v.IP
   795  		}
   796  		if ip != nil && ip.Equal(nodeIP) {
   797  			return nil
   798  		}
   799  	}
   800  	return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
   801  }
   802  
   803  // nodeStatusHasChanged compares the original node and current node's status and
   804  // returns true if any change happens. The heartbeat timestamp is ignored.
   805  func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
   806  	if originalStatus == nil && status == nil {
   807  		return false
   808  	}
   809  	if originalStatus == nil || status == nil {
   810  		return true
   811  	}
   812  
   813  	// Compare node conditions here because we need to ignore the heartbeat timestamp.
   814  	if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
   815  		return true
   816  	}
   817  
   818  	// Compare other fields of NodeStatus.
   819  	originalStatusCopy := originalStatus.DeepCopy()
   820  	statusCopy := status.DeepCopy()
   821  	originalStatusCopy.Conditions = nil
   822  	statusCopy.Conditions = nil
   823  	return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
   824  }
   825  
   826  // nodeConditionsHaveChanged compares the original node and current node's
   827  // conditions and returns true if any change happens. The heartbeat timestamp is
   828  // ignored.
   829  func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
   830  	if len(originalConditions) != len(conditions) {
   831  		return true
   832  	}
   833  
   834  	originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
   835  	originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
   836  	conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
   837  	conditionsCopy = append(conditionsCopy, conditions...)
   838  
   839  	sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
   840  	sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })
   841  
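        	// Clear the heartbeat timestamps so they do not affect the comparison.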
   842  	replacedheartbeatTime := metav1.Time{}
   843  	for i := range conditionsCopy {
   844  		originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
   845  		conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
   846  		if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
   847  			return true
   848  		}
   849  	}
   850  	return false
   851  }
   852  
