1
16
17 package kuberuntime
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "path/filepath"
24 "sort"
25 "time"
26
27 "go.opentelemetry.io/otel/trace"
28 "k8s.io/apimachinery/pkg/types"
29 utilerrors "k8s.io/apimachinery/pkg/util/errors"
30 "k8s.io/apimachinery/pkg/util/sets"
31 internalapi "k8s.io/cri-api/pkg/apis"
32 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
33 "k8s.io/klog/v2"
34 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
35 )
36
37
// containerGC bundles the dependencies used by the garbage-collection routines
// in this file to remove containers, sandboxes and pod log directories.
type containerGC struct {
	// client is the runtime service used to stop and remove pod sandboxes.
	client internalapi.RuntimeService
	// manager is the owning runtime manager; it is used to list kubelet-managed
	// containers and sandboxes, kill and remove containers, and reach the OS interface.
	manager *kubeGenericRuntimeManager
	// podStateProvider answers whether a pod's content or runtime resources should be removed.
	podStateProvider podStateProvider
	// tracer starts the span that wraps each GarbageCollect call.
	tracer trace.Tracer
}
44
45
46 func newContainerGC(client internalapi.RuntimeService, podStateProvider podStateProvider, manager *kubeGenericRuntimeManager, tracer trace.Tracer) *containerGC {
47 return &containerGC{
48 client: client,
49 manager: manager,
50 podStateProvider: podStateProvider,
51 tracer: tracer,
52 }
53 }
54
55
// containerGCInfo is the per-container record kept while deciding which
// containers to garbage collect.
type containerGCInfo struct {
	// id is the runtime ID of the container.
	id string
	// name is the container name taken from the container metadata.
	name string
	// createTime is the creation time of the container.
	createTime time.Time
	// unknown is true when the runtime reports the container in the unknown
	// state; removeOldestN tries to kill such a container before removing it.
	unknown bool
}
67
68
// sandboxGCInfo is the per-sandbox record kept while deciding which sandboxes
// to garbage collect.
type sandboxGCInfo struct {
	// id is the runtime ID of the sandbox.
	id string
	// createTime is the creation time of the sandbox.
	createTime time.Time
	// active is true when the sandbox is in the ready state or is still
	// referenced by at least one container; active sandboxes are never removed.
	active bool
}
77
78
// evictUnit is the key under which containers are grouped for eviction: a
// (pod UID, container name) pair.
type evictUnit struct {
	// uid is the UID of the pod, read from the container labels.
	uid types.UID
	// name is the name of the container within the pod.
	name string
}
85
// containersByEvictUnit maps each evictUnit to the GC candidates that belong to it.
type containersByEvictUnit map[evictUnit][]containerGCInfo
// sandboxesByPodUID maps a pod UID to the sandbox records collected for that pod.
type sandboxesByPodUID map[types.UID][]sandboxGCInfo
88
89
90 func (cu containersByEvictUnit) NumContainers() int {
91 num := 0
92 for key := range cu {
93 num += len(cu[key])
94 }
95 return num
96 }
97
98
// NumEvictUnits returns the number of evict units, i.e. the number of keys in the map.
func (cu containersByEvictUnit) NumEvictUnits() int {
	return len(cu)
}
102
103
104 type byCreated []containerGCInfo
105
106 func (a byCreated) Len() int { return len(a) }
107 func (a byCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
108 func (a byCreated) Less(i, j int) bool { return a[i].createTime.After(a[j].createTime) }
109
110
111 type sandboxByCreated []sandboxGCInfo
112
113 func (a sandboxByCreated) Len() int { return len(a) }
114 func (a sandboxByCreated) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
115 func (a sandboxByCreated) Less(i, j int) bool { return a[i].createTime.After(a[j].createTime) }
116
117
118 func (cgc *containerGC) enforceMaxContainersPerEvictUnit(ctx context.Context, evictUnits containersByEvictUnit, MaxContainers int) {
119 for key := range evictUnits {
120 toRemove := len(evictUnits[key]) - MaxContainers
121
122 if toRemove > 0 {
123 evictUnits[key] = cgc.removeOldestN(ctx, evictUnits[key], toRemove)
124 }
125 }
126 }
127
128
129 func (cgc *containerGC) removeOldestN(ctx context.Context, containers []containerGCInfo, toRemove int) []containerGCInfo {
130
131 numToKeep := len(containers) - toRemove
132 if numToKeep > 0 {
133 sort.Sort(byCreated(containers))
134 }
135 for i := len(containers) - 1; i >= numToKeep; i-- {
136 if containers[i].unknown {
137
138
139 id := kubecontainer.ContainerID{
140 Type: cgc.manager.runtimeName,
141 ID: containers[i].id,
142 }
143 message := "Container is in unknown state, try killing it before removal"
144 if err := cgc.manager.killContainer(ctx, nil, id, containers[i].name, message, reasonUnknown, nil, nil); err != nil {
145 klog.ErrorS(err, "Failed to stop container", "containerID", containers[i].id)
146 continue
147 }
148 }
149 if err := cgc.manager.removeContainer(ctx, containers[i].id); err != nil {
150 klog.ErrorS(err, "Failed to remove container", "containerID", containers[i].id)
151 }
152 }
153
154
155 return containers[:numToKeep]
156 }
157
158
159
160 func (cgc *containerGC) removeOldestNSandboxes(ctx context.Context, sandboxes []sandboxGCInfo, toRemove int) {
161 numToKeep := len(sandboxes) - toRemove
162 if numToKeep > 0 {
163 sort.Sort(sandboxByCreated(sandboxes))
164 }
165
166 for i := len(sandboxes) - 1; i >= numToKeep; i-- {
167 if !sandboxes[i].active {
168 cgc.removeSandbox(ctx, sandboxes[i].id)
169 }
170 }
171 }
172
173
174 func (cgc *containerGC) removeSandbox(ctx context.Context, sandboxID string) {
175 klog.V(4).InfoS("Removing sandbox", "sandboxID", sandboxID)
176
177
178
179 if err := cgc.client.StopPodSandbox(ctx, sandboxID); err != nil {
180 klog.ErrorS(err, "Failed to stop sandbox before removing", "sandboxID", sandboxID)
181 return
182 }
183 if err := cgc.client.RemovePodSandbox(ctx, sandboxID); err != nil {
184 klog.ErrorS(err, "Failed to remove sandbox", "sandboxID", sandboxID)
185 }
186 }
187
188
189
190 func (cgc *containerGC) evictableContainers(ctx context.Context, minAge time.Duration) (containersByEvictUnit, error) {
191 containers, err := cgc.manager.getKubeletContainers(ctx, true)
192 if err != nil {
193 return containersByEvictUnit{}, err
194 }
195
196 evictUnits := make(containersByEvictUnit)
197 newestGCTime := time.Now().Add(-minAge)
198 for _, container := range containers {
199
200 if container.State == runtimeapi.ContainerState_CONTAINER_RUNNING {
201 continue
202 }
203
204 createdAt := time.Unix(0, container.CreatedAt)
205 if newestGCTime.Before(createdAt) {
206 continue
207 }
208
209 labeledInfo := getContainerInfoFromLabels(container.Labels)
210 containerInfo := containerGCInfo{
211 id: container.Id,
212 name: container.Metadata.Name,
213 createTime: createdAt,
214 unknown: container.State == runtimeapi.ContainerState_CONTAINER_UNKNOWN,
215 }
216 key := evictUnit{
217 uid: labeledInfo.PodUID,
218 name: containerInfo.name,
219 }
220 evictUnits[key] = append(evictUnits[key], containerInfo)
221 }
222
223 return evictUnits, nil
224 }
225
226
227 func (cgc *containerGC) evictContainers(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
228
229 evictUnits, err := cgc.evictableContainers(ctx, gcPolicy.MinAge)
230 if err != nil {
231 return err
232 }
233
234
235 if allSourcesReady {
236 for key, unit := range evictUnits {
237 if cgc.podStateProvider.ShouldPodContentBeRemoved(key.uid) || (evictNonDeletedPods && cgc.podStateProvider.ShouldPodRuntimeBeRemoved(key.uid)) {
238 cgc.removeOldestN(ctx, unit, len(unit))
239 delete(evictUnits, key)
240 }
241 }
242 }
243
244
245 if gcPolicy.MaxPerPodContainer >= 0 {
246 cgc.enforceMaxContainersPerEvictUnit(ctx, evictUnits, gcPolicy.MaxPerPodContainer)
247 }
248
249
250 if gcPolicy.MaxContainers >= 0 && evictUnits.NumContainers() > gcPolicy.MaxContainers {
251
252 numContainersPerEvictUnit := gcPolicy.MaxContainers / evictUnits.NumEvictUnits()
253 if numContainersPerEvictUnit < 1 {
254 numContainersPerEvictUnit = 1
255 }
256 cgc.enforceMaxContainersPerEvictUnit(ctx, evictUnits, numContainersPerEvictUnit)
257
258
259 numContainers := evictUnits.NumContainers()
260 if numContainers > gcPolicy.MaxContainers {
261 flattened := make([]containerGCInfo, 0, numContainers)
262 for key := range evictUnits {
263 flattened = append(flattened, evictUnits[key]...)
264 }
265 sort.Sort(byCreated(flattened))
266
267 cgc.removeOldestN(ctx, flattened, numContainers-gcPolicy.MaxContainers)
268 }
269 }
270 return nil
271 }
272
273
274
275
276
277
278
279 func (cgc *containerGC) evictSandboxes(ctx context.Context, evictNonDeletedPods bool) error {
280 containers, err := cgc.manager.getKubeletContainers(ctx, true)
281 if err != nil {
282 return err
283 }
284
285 sandboxes, err := cgc.manager.getKubeletSandboxes(ctx, true)
286 if err != nil {
287 return err
288 }
289
290
291 sandboxIDs := sets.NewString()
292 for _, container := range containers {
293 sandboxIDs.Insert(container.PodSandboxId)
294 }
295
296 sandboxesByPod := make(sandboxesByPodUID, len(sandboxes))
297 for _, sandbox := range sandboxes {
298 podUID := types.UID(sandbox.Metadata.Uid)
299 sandboxInfo := sandboxGCInfo{
300 id: sandbox.Id,
301 createTime: time.Unix(0, sandbox.CreatedAt),
302 }
303
304
305 if sandbox.State == runtimeapi.PodSandboxState_SANDBOX_READY || sandboxIDs.Has(sandbox.Id) {
306 sandboxInfo.active = true
307 }
308
309 sandboxesByPod[podUID] = append(sandboxesByPod[podUID], sandboxInfo)
310 }
311
312 for podUID, sandboxes := range sandboxesByPod {
313 if cgc.podStateProvider.ShouldPodContentBeRemoved(podUID) || (evictNonDeletedPods && cgc.podStateProvider.ShouldPodRuntimeBeRemoved(podUID)) {
314
315
316
317 cgc.removeOldestNSandboxes(ctx, sandboxes, len(sandboxes))
318 } else {
319
320 cgc.removeOldestNSandboxes(ctx, sandboxes, len(sandboxes)-1)
321 }
322 }
323 return nil
324 }
325
326
327
328 func (cgc *containerGC) evictPodLogsDirectories(ctx context.Context, allSourcesReady bool) error {
329 osInterface := cgc.manager.osInterface
330 podLogsDirectory := cgc.manager.podLogsDirectory
331 if allSourcesReady {
332
333 dirs, err := osInterface.ReadDir(podLogsDirectory)
334 if err != nil {
335 return fmt.Errorf("failed to read podLogsDirectory %q: %w", podLogsDirectory, err)
336 }
337 for _, dir := range dirs {
338 name := dir.Name()
339 podUID := parsePodUIDFromLogsDirectory(name)
340 if !cgc.podStateProvider.ShouldPodContentBeRemoved(podUID) {
341 continue
342 }
343 klog.V(4).InfoS("Removing pod logs", "podUID", podUID)
344 err := osInterface.RemoveAll(filepath.Join(podLogsDirectory, name))
345 if err != nil {
346 klog.ErrorS(err, "Failed to remove pod logs directory", "path", name)
347 }
348 }
349 }
350
351
352
353 logSymlinks, _ := osInterface.Glob(filepath.Join(legacyContainerLogsDir, fmt.Sprintf("*.%s", legacyLogSuffix)))
354 for _, logSymlink := range logSymlinks {
355 if _, err := osInterface.Stat(logSymlink); os.IsNotExist(err) {
356 if containerID, err := getContainerIDFromLegacyLogSymlink(logSymlink); err == nil {
357 resp, err := cgc.manager.runtimeService.ContainerStatus(ctx, containerID, false)
358 if err != nil {
359
360
361 klog.InfoS("Error getting ContainerStatus for containerID", "containerID", containerID, "err", err)
362 } else {
363 status := resp.GetStatus()
364 if status == nil {
365 klog.V(4).InfoS("Container status is nil")
366 continue
367 }
368 if status.State != runtimeapi.ContainerState_CONTAINER_EXITED {
369
370
371
372
373
374
375
376
377
378
379
380 klog.V(5).InfoS("Container is still running, not removing symlink", "containerID", containerID, "path", logSymlink)
381 continue
382 }
383 }
384 } else {
385 klog.V(4).InfoS("Unable to obtain container ID", "err", err)
386 }
387 err := osInterface.Remove(logSymlink)
388 if err != nil {
389 klog.ErrorS(err, "Failed to remove container log dead symlink", "path", logSymlink)
390 } else {
391 klog.V(4).InfoS("Removed symlink", "path", logSymlink)
392 }
393 }
394 }
395 return nil
396 }
397
398
399
400
401
402
403
404
405
406
407
408 func (cgc *containerGC) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
409 ctx, otelSpan := cgc.tracer.Start(ctx, "Containers/GarbageCollect")
410 defer otelSpan.End()
411 errors := []error{}
412
413 if err := cgc.evictContainers(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods); err != nil {
414 errors = append(errors, err)
415 }
416
417
418 if err := cgc.evictSandboxes(ctx, evictNonDeletedPods); err != nil {
419 errors = append(errors, err)
420 }
421
422
423 if err := cgc.evictPodLogsDirectories(ctx, allSourcesReady); err != nil {
424 errors = append(errors, err)
425 }
426 return utilerrors.NewAggregate(errors)
427 }
428
View as plain text