...
1
16
17 package kubelet
18
19 import (
20 "context"
21 "sort"
22
23 "k8s.io/apimachinery/pkg/util/wait"
24 "k8s.io/klog/v2"
25 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
26 )
27
// containerDeletorBufferLimit is the capacity of the buffered channel that
// queues container IDs awaiting deletion by the background worker.
const containerDeletorBufferLimit = 50
33
34 type containerStatusbyCreatedList []*kubecontainer.Status
35
36 type podContainerDeletor struct {
37 worker chan<- kubecontainer.ContainerID
38 containersToKeep int
39 }
40
41 func (a containerStatusbyCreatedList) Len() int { return len(a) }
42 func (a containerStatusbyCreatedList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
43 func (a containerStatusbyCreatedList) Less(i, j int) bool {
44 return a[i].CreatedAt.After(a[j].CreatedAt)
45 }
46
47 func newPodContainerDeletor(runtime kubecontainer.Runtime, containersToKeep int) *podContainerDeletor {
48 buffer := make(chan kubecontainer.ContainerID, containerDeletorBufferLimit)
49 go wait.Until(func() {
50 for {
51 id := <-buffer
52 if err := runtime.DeleteContainer(context.Background(), id); err != nil {
53 klog.InfoS("DeleteContainer returned error", "containerID", id, "err", err)
54 }
55 }
56 }, 0, wait.NeverStop)
57
58 return &podContainerDeletor{
59 worker: buffer,
60 containersToKeep: containersToKeep,
61 }
62 }
63
64
65
66 func getContainersToDeleteInPod(filterContainerID string, podStatus *kubecontainer.PodStatus, containersToKeep int) containerStatusbyCreatedList {
67 matchedContainer := func(filterContainerId string, podStatus *kubecontainer.PodStatus) *kubecontainer.Status {
68 if filterContainerId == "" {
69 return nil
70 }
71 for _, containerStatus := range podStatus.ContainerStatuses {
72 if containerStatus.ID.ID == filterContainerId {
73 return containerStatus
74 }
75 }
76 return nil
77 }(filterContainerID, podStatus)
78
79 if filterContainerID != "" && matchedContainer == nil {
80 klog.InfoS("Container not found in pod's containers", "containerID", filterContainerID)
81 return containerStatusbyCreatedList{}
82 }
83
84
85 var candidates containerStatusbyCreatedList
86 for _, containerStatus := range podStatus.ContainerStatuses {
87 if containerStatus.State != kubecontainer.ContainerStateExited {
88 continue
89 }
90 if matchedContainer == nil || matchedContainer.Name == containerStatus.Name {
91 candidates = append(candidates, containerStatus)
92 }
93 }
94
95 if len(candidates) <= containersToKeep {
96 return containerStatusbyCreatedList{}
97 }
98 sort.Sort(candidates)
99 return candidates[containersToKeep:]
100 }
101
102
103 func (p *podContainerDeletor) deleteContainersInPod(filterContainerID string, podStatus *kubecontainer.PodStatus, removeAll bool) {
104 containersToKeep := p.containersToKeep
105 if removeAll {
106 containersToKeep = 0
107 filterContainerID = ""
108 }
109
110 for _, candidate := range getContainersToDeleteInPod(filterContainerID, podStatus, containersToKeep) {
111 select {
112 case p.worker <- candidate.ID:
113 default:
114 klog.InfoS("Failed to issue the request to remove container", "containerID", candidate.ID)
115 }
116 }
117 }
118
View as plain text