...

Source file src/k8s.io/kubernetes/pkg/controller/util/node/controller_utils.go

Documentation: k8s.io/kubernetes/pkg/controller/util/node

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package node
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    27  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    28  	"k8s.io/client-go/tools/cache"
    29  	"k8s.io/client-go/tools/record"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	appsv1listers "k8s.io/client-go/listers/apps/v1"
    34  	utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
    35  	"k8s.io/kubernetes/pkg/controller"
    36  	"k8s.io/kubernetes/pkg/kubelet/util/format"
    37  	nodepkg "k8s.io/kubernetes/pkg/util/node"
    38  
    39  	"k8s.io/klog/v2"
    40  )
    41  
    42  // DeletePods will delete all pods from master running on given node,
    43  // and return true if any pods were deleted, or were found pending
    44  // deletion.
// DeletePods will delete all pods from master running on given node,
// and return true if any pods were deleted, or were found pending
// deletion.
//
// Behavior visible in this function:
//   - A single "DeletingAllPods" node event is recorded when there is any work.
//   - Pods not scheduled to nodeName are skipped (defensive check).
//   - Pods already terminating (DeletionGracePeriodSeconds set) only mark the
//     result as "remaining"; they are not deleted again.
//   - DaemonSet-managed pods are skipped entirely.
//   - Status-update conflicts are collected and returned as one aggregate
//     error (which takes precedence over the boolean result); any non-NotFound
//     delete error aborts immediately with (false, err).
func DeletePods(ctx context.Context, kubeClient clientset.Interface, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
	remaining := false
	var updateErrList []error
	logger := klog.FromContext(ctx)

	if len(pods) > 0 {
		RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
	}

	for i := range pods {
		// Defensive check, also needed for tests.
		if pods[i].Spec.NodeName != nodeName {
			continue
		}

		// Pod will be modified, so making copy is required.
		pod := pods[i].DeepCopy()
		// Set reason and message in the pod object.
		if _, err := SetPodTerminationReason(ctx, kubeClient, pod, nodeName); err != nil {
			if apierrors.IsConflict(err) {
				updateErrList = append(updateErrList,
					fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err))
				continue
			}
			// NOTE(review): non-conflict status-update errors are silently
			// dropped here and the pod proceeds to deletion below — confirm
			// this best-effort behavior is intended.
		}
		// if the pod has already been marked for deletion, we still return true that there are remaining pods.
		if pod.DeletionGracePeriodSeconds != nil {
			remaining = true
			continue
		}
		// if the pod is managed by a daemonset, ignore it
		if _, err := daemonStore.GetPodDaemonSets(pod); err == nil {
			// No error means at least one daemonset was found
			continue
		}

		logger.V(2).Info("Starting deletion of pod", "pod", klog.KObj(pod))
		recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
		if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				// NotFound error means that pod was already deleted.
				// There is nothing left to do with this pod.
				continue
			}
			// Any other delete failure aborts the whole sweep.
			return false, err
		}
		remaining = true
	}

	// Collected conflicts are reported together and override the boolean result.
	if len(updateErrList) > 0 {
		return false, utilerrors.NewAggregate(updateErrList)
	}
	return remaining, nil
}
    99  
   100  // SetPodTerminationReason attempts to set a reason and message in the
   101  // pod status, updates it in the apiserver, and returns an error if it
   102  // encounters one.
   103  func SetPodTerminationReason(ctx context.Context, kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
   104  	if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
   105  		return pod, nil
   106  	}
   107  
   108  	pod.Status.Reason = nodepkg.NodeUnreachablePodReason
   109  	pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)
   110  
   111  	var updatedPod *v1.Pod
   112  	var err error
   113  	if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
   114  		return nil, err
   115  	}
   116  	return updatedPod, nil
   117  }
   118  
   119  // MarkPodsNotReady updates ready status of given pods running on
   120  // given node from master return true if success
// MarkPodsNotReady updates ready status of given pods running on
// given node from master return true if success
//
// For each pod scheduled to nodeName, the PodReady condition is forced to
// False and the pod status is written back to the apiserver. Failures other
// than NotFound (pod already gone) are collected and returned as a single
// aggregate error; processing continues with the remaining pods.
func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error {
	logger := klog.FromContext(ctx)
	logger.V(2).Info("Update ready status of pods on node", "node", klog.KRef("", nodeName))

	errs := []error{}
	for i := range pods {
		// Defensive check, also needed for tests.
		if pods[i].Spec.NodeName != nodeName {
			continue
		}

		// Pod will be modified, so making copy is required.
		pod := pods[i].DeepCopy()
		for _, cond := range pod.Status.Conditions {
			if cond.Type != v1.PodReady {
				continue
			}

			// cond is a copy of the slice element; UpdatePodCondition is what
			// writes the modified condition back into pod.Status.
			cond.Status = v1.ConditionFalse
			if !utilpod.UpdatePodCondition(&pod.Status, &cond) {
				// presumably UpdatePodCondition returns false when the
				// condition did not actually change, so the API write is
				// skipped — TODO confirm against utilpod.
				break
			}

			logger.V(2).Info("Updating ready status of pod to false", "pod", klog.KObj(pod))
			if _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
				if apierrors.IsNotFound(err) {
					// NotFound error means that pod was already deleted.
					// There is nothing left to do with this pod.
					continue
				}
				logger.Info("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
				errs = append(errs, err)
				// NOTE(review): execution falls through, so the NodeNotReady
				// event below is recorded even though this update failed —
				// confirm that is intended.
			}
			// record NodeNotReady event after updateStatus to make sure pod still exists
			recorder.Event(pod, v1.EventTypeWarning, "NodeNotReady", "Node is not ready")
			break
		}
	}

	return utilerrors.NewAggregate(errs)
}
   162  
   163  // RecordNodeEvent records a event related to a node.
   164  func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
   165  	logger := klog.FromContext(ctx)
   166  	ref := &v1.ObjectReference{
   167  		APIVersion: "v1",
   168  		Kind:       "Node",
   169  		Name:       nodeName,
   170  		UID:        types.UID(nodeUID),
   171  		Namespace:  "",
   172  	}
   173  	logger.V(2).Info("Recording event message for node", "event", event, "node", klog.KRef("", nodeName))
   174  	recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
   175  }
   176  
   177  // RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam)
   178  func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) {
   179  	ref := &v1.ObjectReference{
   180  		APIVersion: "v1",
   181  		Kind:       "Node",
   182  		Name:       node.Name,
   183  		UID:        node.UID,
   184  		Namespace:  "",
   185  	}
   186  	logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name)
   187  	// TODO: This requires a transaction, either both node status is updated
   188  	// and event is recorded or neither should happen, see issue #6055.
   189  	recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
   190  }
   191  
   192  // SwapNodeControllerTaint returns true in case of success and false
   193  // otherwise.
   194  func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
   195  	logger := klog.FromContext(ctx)
   196  	for _, taintToAdd := range taintsToAdd {
   197  		now := metav1.Now()
   198  		taintToAdd.TimeAdded = &now
   199  	}
   200  
   201  	err := controller.AddOrUpdateTaintOnNode(ctx, kubeClient, node.Name, taintsToAdd...)
   202  	if err != nil {
   203  		utilruntime.HandleError(
   204  			fmt.Errorf(
   205  				"unable to taint %+v unresponsive Node %q: %v",
   206  				taintsToAdd,
   207  				node.Name,
   208  				err))
   209  		return false
   210  	}
   211  	logger.V(4).Info("Added taint to node", "taint", taintsToAdd, "node", klog.KRef("", node.Name))
   212  
   213  	err = controller.RemoveTaintOffNode(ctx, kubeClient, node.Name, node, taintsToRemove...)
   214  	if err != nil {
   215  		utilruntime.HandleError(
   216  			fmt.Errorf(
   217  				"unable to remove %+v unneeded taint from unresponsive Node %q: %v",
   218  				taintsToRemove,
   219  				node.Name,
   220  				err))
   221  		return false
   222  	}
   223  	logger.V(4).Info("Made sure that node has no taint", "node", klog.KRef("", node.Name), "taint", taintsToRemove)
   224  
   225  	return true
   226  }
   227  
   228  // AddOrUpdateLabelsOnNode updates the labels on the node and returns true on
   229  // success and false on failure.
   230  func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
   231  	logger := klog.FromContext(ctx)
   232  	if err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil {
   233  		utilruntime.HandleError(
   234  			fmt.Errorf(
   235  				"unable to update labels %+v for Node %q: %v",
   236  				labelsToUpdate,
   237  				node.Name,
   238  				err))
   239  		return false
   240  	}
   241  	logger.V(4).Info("Updated labels to node", "label", labelsToUpdate, "node", klog.KRef("", node.Name))
   242  	return true
   243  }
   244  
   245  // CreateAddNodeHandler creates an add node handler.
   246  func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
   247  	return func(originalObj interface{}) {
   248  		node := originalObj.(*v1.Node).DeepCopy()
   249  		if err := f(node); err != nil {
   250  			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
   251  		}
   252  	}
   253  }
   254  
   255  // CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam)
   256  func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
   257  	return func(origOldObj, origNewObj interface{}) {
   258  		node := origNewObj.(*v1.Node).DeepCopy()
   259  		prevNode := origOldObj.(*v1.Node).DeepCopy()
   260  
   261  		if err := f(prevNode, node); err != nil {
   262  			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
   263  		}
   264  	}
   265  }
   266  
   267  // CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam)
   268  func CreateDeleteNodeHandler(logger klog.Logger, f func(node *v1.Node) error) func(obj interface{}) {
   269  	return func(originalObj interface{}) {
   270  		originalNode, isNode := originalObj.(*v1.Node)
   271  		// We can get DeletedFinalStateUnknown instead of *v1.Node here and
   272  		// we need to handle that correctly. #34692
   273  		if !isNode {
   274  			deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
   275  			if !ok {
   276  				logger.Error(nil, "Received unexpected object", "object", originalObj)
   277  				return
   278  			}
   279  			originalNode, ok = deletedState.Obj.(*v1.Node)
   280  			if !ok {
   281  				logger.Error(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
   282  				return
   283  			}
   284  		}
   285  		node := originalNode.DeepCopy()
   286  		if err := f(node); err != nil {
   287  			utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
   288  		}
   289  	}
   290  }
   291  
   292  // GetNodeCondition extracts the provided condition from the given status and returns that.
   293  // Returns nil and -1 if the condition is not present, and the index of the located condition.
   294  func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
   295  	if status == nil {
   296  		return -1, nil
   297  	}
   298  	for i := range status.Conditions {
   299  		if status.Conditions[i].Type == conditionType {
   300  			return i, &status.Conditions[i]
   301  		}
   302  	}
   303  	return -1, nil
   304  }
   305  

View as plain text