/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package node import ( "context" "fmt" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" v1 "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" appsv1listers "k8s.io/client-go/listers/apps/v1" utilpod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/kubelet/util/format" nodepkg "k8s.io/kubernetes/pkg/util/node" "k8s.io/klog/v2" ) // DeletePods will delete all pods from master running on given node, // and return true if any pods were deleted, or were found pending // deletion. func DeletePods(ctx context.Context, kubeClient clientset.Interface, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) { remaining := false var updateErrList []error logger := klog.FromContext(ctx) if len(pods) > 0 { RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } for i := range pods { // Defensive check, also needed for tests. if pods[i].Spec.NodeName != nodeName { continue } // Pod will be modified, so making copy is required. pod := pods[i].DeepCopy() // Set reason and message in the pod object. if _, err := SetPodTerminationReason(ctx, kubeClient, pod, nodeName); err != nil { if apierrors.IsConflict(err) { updateErrList = append(updateErrList, fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err)) continue } } // if the pod has already been marked for deletion, we still return true that there are remaining pods. if pod.DeletionGracePeriodSeconds != nil { remaining = true continue } // if the pod is managed by a daemonset, ignore it if _, err := daemonStore.GetPodDaemonSets(pod); err == nil { // No error means at least one daemonset was found continue } logger.V(2).Info("Starting deletion of pod", "pod", klog.KObj(pod)) recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { if apierrors.IsNotFound(err) { // NotFound error means that pod was already deleted. // There is nothing left to do with this pod. continue } return false, err } remaining = true } if len(updateErrList) > 0 { return false, utilerrors.NewAggregate(updateErrList) } return remaining, nil } // SetPodTerminationReason attempts to set a reason and message in the // pod status, updates it in the apiserver, and returns an error if it // encounters one. func SetPodTerminationReason(ctx context.Context, kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) { if pod.Status.Reason == nodepkg.NodeUnreachablePodReason { return pod, nil } pod.Status.Reason = nodepkg.NodeUnreachablePodReason pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name) var updatedPod *v1.Pod var err error if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { return nil, err } return updatedPod, nil } // MarkPodsNotReady updates ready status of given pods running on // given node from master return true if success func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error { logger := klog.FromContext(ctx) logger.V(2).Info("Update ready status of pods on node", "node", klog.KRef("", nodeName)) errs := []error{} for i := range pods { // Defensive check, also needed for tests. if pods[i].Spec.NodeName != nodeName { continue } // Pod will be modified, so making copy is required. pod := pods[i].DeepCopy() for _, cond := range pod.Status.Conditions { if cond.Type != v1.PodReady { continue } cond.Status = v1.ConditionFalse if !utilpod.UpdatePodCondition(&pod.Status, &cond) { break } logger.V(2).Info("Updating ready status of pod to false", "pod", klog.KObj(pod)) if _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { if apierrors.IsNotFound(err) { // NotFound error means that pod was already deleted. // There is nothing left to do with this pod. continue } logger.Info("Failed to update status for pod", "pod", klog.KObj(pod), "err", err) errs = append(errs, err) } // record NodeNotReady event after updateStatus to make sure pod still exists recorder.Event(pod, v1.EventTypeWarning, "NodeNotReady", "Node is not ready") break } } return utilerrors.NewAggregate(errs) } // RecordNodeEvent records a event related to a node. func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { logger := klog.FromContext(ctx) ref := &v1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: nodeName, UID: types.UID(nodeUID), Namespace: "", } logger.V(2).Info("Recording event message for node", "event", event, "node", klog.KRef("", nodeName)) recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event) } // RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam) func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) { ref := &v1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: node.Name, UID: node.UID, Namespace: "", } logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) } // SwapNodeControllerTaint returns true in case of success and false // otherwise. func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool { logger := klog.FromContext(ctx) for _, taintToAdd := range taintsToAdd { now := metav1.Now() taintToAdd.TimeAdded = &now } err := controller.AddOrUpdateTaintOnNode(ctx, kubeClient, node.Name, taintsToAdd...) if err != nil { utilruntime.HandleError( fmt.Errorf( "unable to taint %+v unresponsive Node %q: %v", taintsToAdd, node.Name, err)) return false } logger.V(4).Info("Added taint to node", "taint", taintsToAdd, "node", klog.KRef("", node.Name)) err = controller.RemoveTaintOffNode(ctx, kubeClient, node.Name, node, taintsToRemove...) if err != nil { utilruntime.HandleError( fmt.Errorf( "unable to remove %+v unneeded taint from unresponsive Node %q: %v", taintsToRemove, node.Name, err)) return false } logger.V(4).Info("Made sure that node has no taint", "node", klog.KRef("", node.Name), "taint", taintsToRemove) return true } // AddOrUpdateLabelsOnNode updates the labels on the node and returns true on // success and false on failure. func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool { logger := klog.FromContext(ctx) if err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil { utilruntime.HandleError( fmt.Errorf( "unable to update labels %+v for Node %q: %v", labelsToUpdate, node.Name, err)) return false } logger.V(4).Info("Updated labels to node", "label", labelsToUpdate, "node", klog.KRef("", node.Name)) return true } // CreateAddNodeHandler creates an add node handler. func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) { return func(originalObj interface{}) { node := originalObj.(*v1.Node).DeepCopy() if err := f(node); err != nil { utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err)) } } } // CreateUpdateNodeHandler creates a node update handler. (Common to lifecycle and ipam) func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) { return func(origOldObj, origNewObj interface{}) { node := origNewObj.(*v1.Node).DeepCopy() prevNode := origOldObj.(*v1.Node).DeepCopy() if err := f(prevNode, node); err != nil { utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err)) } } } // CreateDeleteNodeHandler creates a delete node handler. (Common to lifecycle and ipam) func CreateDeleteNodeHandler(logger klog.Logger, f func(node *v1.Node) error) func(obj interface{}) { return func(originalObj interface{}) { originalNode, isNode := originalObj.(*v1.Node) // We can get DeletedFinalStateUnknown instead of *v1.Node here and // we need to handle that correctly. #34692 if !isNode { deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown) if !ok { logger.Error(nil, "Received unexpected object", "object", originalObj) return } originalNode, ok = deletedState.Obj.(*v1.Node) if !ok { logger.Error(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj) return } } node := originalNode.DeepCopy() if err := f(node); err != nil { utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err)) } } } // GetNodeCondition extracts the provided condition from the given status and returns that. // Returns nil and -1 if the condition is not present, and the index of the located condition. func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) { if status == nil { return -1, nil } for i := range status.Conditions { if status.Conditions[i].Type == conditionType { return i, &status.Conditions[i] } } return -1, nil }