1
16
17 package node
18
19 import (
20 "context"
21 "fmt"
22
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/types"
26 utilerrors "k8s.io/apimachinery/pkg/util/errors"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/client-go/tools/cache"
29 "k8s.io/client-go/tools/record"
30
31 v1 "k8s.io/api/core/v1"
32 clientset "k8s.io/client-go/kubernetes"
33 appsv1listers "k8s.io/client-go/listers/apps/v1"
34 utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
35 "k8s.io/kubernetes/pkg/controller"
36 "k8s.io/kubernetes/pkg/kubelet/util/format"
37 nodepkg "k8s.io/kubernetes/pkg/util/node"
38
39 "k8s.io/klog/v2"
40 )
41
42
43
44
45 func DeletePods(ctx context.Context, kubeClient clientset.Interface, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
46 remaining := false
47 var updateErrList []error
48 logger := klog.FromContext(ctx)
49
50 if len(pods) > 0 {
51 RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
52 }
53
54 for i := range pods {
55
56 if pods[i].Spec.NodeName != nodeName {
57 continue
58 }
59
60
61 pod := pods[i].DeepCopy()
62
63 if _, err := SetPodTerminationReason(ctx, kubeClient, pod, nodeName); err != nil {
64 if apierrors.IsConflict(err) {
65 updateErrList = append(updateErrList,
66 fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err))
67 continue
68 }
69 }
70
71 if pod.DeletionGracePeriodSeconds != nil {
72 remaining = true
73 continue
74 }
75
76 if _, err := daemonStore.GetPodDaemonSets(pod); err == nil {
77
78 continue
79 }
80
81 logger.V(2).Info("Starting deletion of pod", "pod", klog.KObj(pod))
82 recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
83 if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
84 if apierrors.IsNotFound(err) {
85
86
87 continue
88 }
89 return false, err
90 }
91 remaining = true
92 }
93
94 if len(updateErrList) > 0 {
95 return false, utilerrors.NewAggregate(updateErrList)
96 }
97 return remaining, nil
98 }
99
100
101
102
103 func SetPodTerminationReason(ctx context.Context, kubeClient clientset.Interface, pod *v1.Pod, nodeName string) (*v1.Pod, error) {
104 if pod.Status.Reason == nodepkg.NodeUnreachablePodReason {
105 return pod, nil
106 }
107
108 pod.Status.Reason = nodepkg.NodeUnreachablePodReason
109 pod.Status.Message = fmt.Sprintf(nodepkg.NodeUnreachablePodMessage, nodeName, pod.Name)
110
111 var updatedPod *v1.Pod
112 var err error
113 if updatedPod, err = kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
114 return nil, err
115 }
116 return updatedPod, nil
117 }
118
119
120
121 func MarkPodsNotReady(ctx context.Context, kubeClient clientset.Interface, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error {
122 logger := klog.FromContext(ctx)
123 logger.V(2).Info("Update ready status of pods on node", "node", klog.KRef("", nodeName))
124
125 errs := []error{}
126 for i := range pods {
127
128 if pods[i].Spec.NodeName != nodeName {
129 continue
130 }
131
132
133 pod := pods[i].DeepCopy()
134 for _, cond := range pod.Status.Conditions {
135 if cond.Type != v1.PodReady {
136 continue
137 }
138
139 cond.Status = v1.ConditionFalse
140 if !utilpod.UpdatePodCondition(&pod.Status, &cond) {
141 break
142 }
143
144 logger.V(2).Info("Updating ready status of pod to false", "pod", klog.KObj(pod))
145 if _, err := kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
146 if apierrors.IsNotFound(err) {
147
148
149 continue
150 }
151 logger.Info("Failed to update status for pod", "pod", klog.KObj(pod), "err", err)
152 errs = append(errs, err)
153 }
154
155 recorder.Event(pod, v1.EventTypeWarning, "NodeNotReady", "Node is not ready")
156 break
157 }
158 }
159
160 return utilerrors.NewAggregate(errs)
161 }
162
163
164 func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
165 logger := klog.FromContext(ctx)
166 ref := &v1.ObjectReference{
167 APIVersion: "v1",
168 Kind: "Node",
169 Name: nodeName,
170 UID: types.UID(nodeUID),
171 Namespace: "",
172 }
173 logger.V(2).Info("Recording event message for node", "event", event, "node", klog.KRef("", nodeName))
174 recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
175 }
176
177
178 func RecordNodeStatusChange(logger klog.Logger, recorder record.EventRecorder, node *v1.Node, newStatus string) {
179 ref := &v1.ObjectReference{
180 APIVersion: "v1",
181 Kind: "Node",
182 Name: node.Name,
183 UID: node.UID,
184 Namespace: "",
185 }
186 logger.V(2).Info("Recording status change event message for node", "status", newStatus, "node", node.Name)
187
188
189 recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus)
190 }
191
192
193
194 func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool {
195 logger := klog.FromContext(ctx)
196 for _, taintToAdd := range taintsToAdd {
197 now := metav1.Now()
198 taintToAdd.TimeAdded = &now
199 }
200
201 err := controller.AddOrUpdateTaintOnNode(ctx, kubeClient, node.Name, taintsToAdd...)
202 if err != nil {
203 utilruntime.HandleError(
204 fmt.Errorf(
205 "unable to taint %+v unresponsive Node %q: %v",
206 taintsToAdd,
207 node.Name,
208 err))
209 return false
210 }
211 logger.V(4).Info("Added taint to node", "taint", taintsToAdd, "node", klog.KRef("", node.Name))
212
213 err = controller.RemoveTaintOffNode(ctx, kubeClient, node.Name, node, taintsToRemove...)
214 if err != nil {
215 utilruntime.HandleError(
216 fmt.Errorf(
217 "unable to remove %+v unneeded taint from unresponsive Node %q: %v",
218 taintsToRemove,
219 node.Name,
220 err))
221 return false
222 }
223 logger.V(4).Info("Made sure that node has no taint", "node", klog.KRef("", node.Name), "taint", taintsToRemove)
224
225 return true
226 }
227
228
229
230 func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool {
231 logger := klog.FromContext(ctx)
232 if err := controller.AddOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil {
233 utilruntime.HandleError(
234 fmt.Errorf(
235 "unable to update labels %+v for Node %q: %v",
236 labelsToUpdate,
237 node.Name,
238 err))
239 return false
240 }
241 logger.V(4).Info("Updated labels to node", "label", labelsToUpdate, "node", klog.KRef("", node.Name))
242 return true
243 }
244
245
246 func CreateAddNodeHandler(f func(node *v1.Node) error) func(obj interface{}) {
247 return func(originalObj interface{}) {
248 node := originalObj.(*v1.Node).DeepCopy()
249 if err := f(node); err != nil {
250 utilruntime.HandleError(fmt.Errorf("Error while processing Node Add: %v", err))
251 }
252 }
253 }
254
255
256 func CreateUpdateNodeHandler(f func(oldNode, newNode *v1.Node) error) func(oldObj, newObj interface{}) {
257 return func(origOldObj, origNewObj interface{}) {
258 node := origNewObj.(*v1.Node).DeepCopy()
259 prevNode := origOldObj.(*v1.Node).DeepCopy()
260
261 if err := f(prevNode, node); err != nil {
262 utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
263 }
264 }
265 }
266
267
268 func CreateDeleteNodeHandler(logger klog.Logger, f func(node *v1.Node) error) func(obj interface{}) {
269 return func(originalObj interface{}) {
270 originalNode, isNode := originalObj.(*v1.Node)
271
272
273 if !isNode {
274 deletedState, ok := originalObj.(cache.DeletedFinalStateUnknown)
275 if !ok {
276 logger.Error(nil, "Received unexpected object", "object", originalObj)
277 return
278 }
279 originalNode, ok = deletedState.Obj.(*v1.Node)
280 if !ok {
281 logger.Error(nil, "DeletedFinalStateUnknown contained non-Node object", "object", deletedState.Obj)
282 return
283 }
284 }
285 node := originalNode.DeepCopy()
286 if err := f(node); err != nil {
287 utilruntime.HandleError(fmt.Errorf("Error while processing Node Add/Delete: %v", err))
288 }
289 }
290 }
291
292
293
294 func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
295 if status == nil {
296 return -1, nil
297 }
298 for i := range status.Conditions {
299 if status.Conditions[i].Type == conditionType {
300 return i, &status.Conditions[i]
301 }
302 }
303 return -1, nil
304 }
305
View as plain text