package podgc

import (
	"context"
	"sort"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/podgc/metrics"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	utilpod "k8s.io/kubernetes/pkg/util/pod"
	"k8s.io/kubernetes/pkg/util/taints"
)

const (
	// gcCheckPeriod is how often the controller runs a garbage-collection pass.
	gcCheckPeriod = 20 * time.Second
	// quarantineTime is how long a node name stays in the quarantine queue before
	// the controller asks the API server whether the node really no longer exists.
	quarantineTime = 40 * time.Second
)

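// PodGCController garbage-collects pods that the cluster no longer needs:
// terminated pods above a configured threshold, pods bound to nodes that no
// longer exist, terminating pods on out-of-service nodes, and unscheduled
// terminating pods.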
type PodGCController struct {
	kubeClient clientset.Interface

	podLister        corelisters.PodLister
	podListerSynced  cache.InformerSynced
	nodeLister       corelisters.NodeLister
	nodeListerSynced cache.InformerSynced

	nodeQueue workqueue.DelayingInterface

	terminatedPodThreshold int
	gcCheckPeriod          time.Duration
	quarantineTime         time.Duration
}

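// NewPodGC creates a PodGCController that uses the default gcCheckPeriod and
// quarantineTime. A minimal wiring sketch (the "client" and "factory" names
// below are illustrative, not part of this package):
//
//	gcc := NewPodGC(ctx, client, factory.Core().V1().Pods(), factory.Core().V1().Nodes(), 12500)
//	go gcc.Run(ctx)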
func NewPodGC(ctx context.Context, kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController {
	return NewPodGCInternal(ctx, kubeClient, podInformer, nodeInformer, terminatedPodThreshold, gcCheckPeriod, quarantineTime)
}

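// NewPodGCInternal is like NewPodGC but lets callers override gcCheckPeriod and
// quarantineTime; it is primarily intended for integration tests.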
func NewPodGCInternal(ctx context.Context, kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int, gcCheckPeriod, quarantineTime time.Duration) *PodGCController {
	gcc := &PodGCController{
		kubeClient:             kubeClient,
		terminatedPodThreshold: terminatedPodThreshold,
		podLister:              podInformer.Lister(),
		podListerSynced:        podInformer.Informer().HasSynced,
		nodeLister:             nodeInformer.Lister(),
		nodeListerSynced:       nodeInformer.Informer().HasSynced,
		nodeQueue:              workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"),
		gcCheckPeriod:          gcCheckPeriod,
		quarantineTime:         quarantineTime,
	}

	// Register the podgc metrics.
	metrics.RegisterMetrics()
	return gcc
}

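// Run starts the garbage-collection loop and blocks until the context is cancelled.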
func (gcc *PodGCController) Run(ctx context.Context) {
	logger := klog.FromContext(ctx)

	defer utilruntime.HandleCrash()

	logger.Info("Starting GC controller")
	defer gcc.nodeQueue.ShutDown()
	defer logger.Info("Shutting down GC controller")

	if !cache.WaitForNamedCacheSync("GC", ctx.Done(), gcc.podListerSynced, gcc.nodeListerSynced) {
		return
	}

	go wait.UntilWithContext(ctx, gcc.gc, gcc.gcCheckPeriod)

	<-ctx.Done()
}

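// gc performs a single garbage-collection pass over all pods in the informer cache.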
func (gcc *PodGCController) gc(ctx context.Context) {
	pods, err := gcc.podLister.List(labels.Everything())
	if err != nil {
		klog.FromContext(ctx).Error(err, "Error while listing all pods")
		return
	}
	nodes, err := gcc.nodeLister.List(labels.Everything())
	if err != nil {
		klog.FromContext(ctx).Error(err, "Error while listing all nodes")
		return
	}
	if gcc.terminatedPodThreshold > 0 {
		gcc.gcTerminated(ctx, pods)
	}
	gcc.gcTerminating(ctx, pods)
	gcc.gcOrphaned(ctx, pods, nodes)
	gcc.gcUnscheduledTerminating(ctx, pods)
}

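// isPodTerminated returns true if the pod has reached a terminal phase
// (i.e. it is neither Pending, Running, nor Unknown).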
func isPodTerminated(pod *v1.Pod) bool {
	if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
		return true
	}
	return false
}

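// isPodTerminating returns true if the pod has a deletion timestamp set.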
func isPodTerminating(pod *v1.Pod) bool {
	return pod.ObjectMeta.DeletionTimestamp != nil
}

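// gcTerminating force-deletes terminating pods that are scheduled to nodes that
// are not ready and carry the node.kubernetes.io/out-of-service taint.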
func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) {
	logger := klog.FromContext(ctx)
	logger.V(4).Info("GC'ing terminating pods that are on out-of-service nodes")
	terminatingPods := []*v1.Pod{}
	for _, pod := range pods {
		if isPodTerminating(pod) {
			node, err := gcc.nodeLister.Get(pod.Spec.NodeName)
			if err != nil {
				logger.Error(err, "Failed to get node", "node", klog.KRef("", pod.Spec.NodeName))
				continue
			}
			// Collect the pod only if its node is both not ready and tainted
			// with node.kubernetes.io/out-of-service.
			if !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService) {
				logger.V(4).Info("Garbage collecting pod that is terminating", "pod", klog.KObj(pod), "phase", pod.Status.Phase)
				terminatingPods = append(terminatingPods, pod)
			}
		}
	}

	deleteCount := len(terminatingPods)
	if deleteCount == 0 {
		return
	}

	logger.V(4).Info("Garbage collecting pods that are terminating on node tainted with node.kubernetes.io/out-of-service", "numPods", deleteCount)

	sort.Sort(byEvictionAndCreationTimestamp(terminatingPods))
	var wait sync.WaitGroup
	for i := 0; i < deleteCount; i++ {
		wait.Add(1)
		go func(pod *v1.Pod) {
			defer wait.Done()
			metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingOutOfService).Inc()
			if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
				// Report the error but keep deleting the remaining pods.
				utilruntime.HandleError(err)
				metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingOutOfService).Inc()
			}
		}(terminatingPods[i])
	}
	wait.Wait()
}

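// gcTerminated deletes terminated pods when their number exceeds the configured
// threshold, deleting evicted and oldest pods first.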
func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) {
	terminatedPods := []*v1.Pod{}
	for _, pod := range pods {
		if isPodTerminated(pod) {
			terminatedPods = append(terminatedPods, pod)
		}
	}

	terminatedPodCount := len(terminatedPods)
	deleteCount := terminatedPodCount - gcc.terminatedPodThreshold

	if deleteCount <= 0 {
		return
	}

	logger := klog.FromContext(ctx)
	logger.Info("Garbage collecting pods", "numPods", deleteCount)

	sort.Sort(byEvictionAndCreationTimestamp(terminatedPods))
	var wait sync.WaitGroup
	for i := 0; i < deleteCount; i++ {
		wait.Add(1)
		go func(pod *v1.Pod) {
			defer wait.Done()
			if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
				// Report the error but keep deleting the remaining pods.
				defer utilruntime.HandleError(err)
				metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminated).Inc()
			}
			metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminated).Inc()
		}(terminatedPods[i])
	}
	wait.Wait()
}

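// gcOrphaned deletes pods that are bound to nodes that no longer exist.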
func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, nodes []*v1.Node) {
	logger := klog.FromContext(ctx)
	logger.V(4).Info("GC'ing orphaned")
	existingNodeNames := sets.NewString()
	for _, node := range nodes {
		existingNodeNames.Insert(node.Name)
	}

	// Add newly discovered unknown node names to the quarantine queue.
	for _, pod := range pods {
		if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) {
			gcc.nodeQueue.AddAfter(pod.Spec.NodeName, gcc.quarantineTime)
		}
	}

	// Check which quarantined nodes are still missing after the quarantine period.
	deletedNodesNames, quit := gcc.discoverDeletedNodes(ctx, existingNodeNames)
	if quit {
		return
	}

	for _, pod := range pods {
		if !deletedNodesNames.Has(pod.Spec.NodeName) {
			continue
		}
		logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName))
		condition := &v1.PodCondition{
			Type:    v1.DisruptionTarget,
			Status:  v1.ConditionTrue,
			Reason:  "DeletionByPodGC",
			Message: "PodGC: node no longer exists",
		}
		if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
			utilruntime.HandleError(err)
			metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
		} else {
			logger.Info("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod))
		}
		metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
	}
}

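// discoverDeletedNodes drains the quarantine queue and returns the names of
// nodes that are absent from the informer cache and, on a direct API check,
// confirmed to no longer exist.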
func (gcc *PodGCController) discoverDeletedNodes(ctx context.Context, existingNodeNames sets.String) (sets.String, bool) {
	deletedNodesNames := sets.NewString()
	for gcc.nodeQueue.Len() > 0 {
		item, quit := gcc.nodeQueue.Get()
		if quit {
			return nil, true
		}
		nodeName := item.(string)
		if !existingNodeNames.Has(nodeName) {
			exists, err := gcc.checkIfNodeExists(ctx, nodeName)
			switch {
			case err != nil:
				klog.FromContext(ctx).Error(err, "Error while getting node", "node", klog.KRef("", nodeName))
				// Leave the node out of the deleted set; it will be re-quarantined
				// on a later pass if its pods are still orphaned.
			case !exists:
				deletedNodesNames.Insert(nodeName)
			}
		}
		gcc.nodeQueue.Done(item)
	}
	return deletedNodesNames, false
}

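// checkIfNodeExists queries the API server directly to determine whether the
// node still exists, distinguishing "not found" from transient errors.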
func (gcc *PodGCController) checkIfNodeExists(ctx context.Context, name string) (bool, error) {
	_, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
	if errors.IsNotFound(fetchErr) {
		return false, nil
	}
	return fetchErr == nil, fetchErr
}

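// gcUnscheduledTerminating deletes pods that are terminating and were never
// scheduled to a node.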
func (gcc *PodGCController) gcUnscheduledTerminating(ctx context.Context, pods []*v1.Pod) {
	logger := klog.FromContext(ctx)
	logger.V(4).Info("GC'ing unscheduled pods which are terminating")

	for _, pod := range pods {
		if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
			continue
		}

		logger.V(2).Info("Found unscheduled terminating Pod not assigned to any Node, deleting", "pod", klog.KObj(pod))
		if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
			utilruntime.HandleError(err)
			metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingUnscheduled).Inc()
		} else {
			logger.Info("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod))
		}
		metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingUnscheduled).Inc()
	}
}

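// byEvictionAndCreationTimestamp sorts pods so that evicted pods come first,
// then by creation timestamp, using the pod name as a tie breaker.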
type byEvictionAndCreationTimestamp []*v1.Pod

func (o byEvictionAndCreationTimestamp) Len() int      { return len(o) }
func (o byEvictionAndCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o byEvictionAndCreationTimestamp) Less(i, j int) bool {
	iEvicted, jEvicted := eviction.PodIsEvicted(o[i].Status), eviction.PodIsEvicted(o[j].Status)
	// Evicted pods sort before non-evicted pods.
	if iEvicted != jEvicted {
		return iEvicted
	}
	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
		return o[i].Name < o[j].Name
	}
	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
}

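// markFailedAndDeletePod force-deletes the pod, first transitioning it to the
// Failed phase when the relevant feature gates require it.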
func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.Pod) error {
	return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
}

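// markFailedAndDeletePodWithCondition marks the pod as Failed (optionally adding
// the given condition) and then force-deletes it with zero grace period.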
func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
	logger := klog.FromContext(ctx)
	logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))

	// Patch the pod status to Failed before deletion so that consumers such as
	// the Job controller observe a terminal phase for the pod.
	if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) || utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {

		// Mark the pod as failed; this matters especially for orphaned pods,
		// which would otherwise stay in a non-terminal phase forever because
		// there is no kubelet left to update their status.
		if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
			newStatus := pod.Status.DeepCopy()
			newStatus.Phase = v1.PodFailed
			if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
				apipod.UpdatePodCondition(newStatus, condition)
			}
			if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
				return err
			}
		}
	}
	return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
}