1
16
17 package pleg
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "sync/atomic"
24 "time"
25
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/apimachinery/pkg/util/sets"
28 "k8s.io/apimachinery/pkg/util/wait"
29 utilfeature "k8s.io/apiserver/pkg/util/feature"
30 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/features"
33 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
34 "k8s.io/kubernetes/pkg/kubelet/metrics"
35 "k8s.io/utils/clock"
36 )
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 type GenericPLEG struct {
54
55 runtime kubecontainer.Runtime
56
57 eventChannel chan *PodLifecycleEvent
58
59 podRecords podRecords
60
61 relistTime atomic.Value
62
63 cache kubecontainer.Cache
64
65 clock clock.Clock
66
67
68 podsToReinspect map[types.UID]*kubecontainer.Pod
69
70 stopCh chan struct{}
71
72 relistLock sync.Mutex
73
74 isRunning bool
75
76 runningMu sync.Mutex
77
78 relistDuration *RelistDuration
79
80 podCacheMutex sync.Mutex
81 }
82
83
84
85
// plegContainerState is the PLEG's simplified view of a container's state;
// runtime states are mapped onto it by convertState.
type plegContainerState string

// The possible plegContainerState values. plegContainerNonExistent is used
// when a container is absent from a pod listing (see getContainerState).
const (
	plegContainerRunning     = plegContainerState("running")
	plegContainerExited      = plegContainerState("exited")
	plegContainerUnknown     = plegContainerState("unknown")
	plegContainerNonExistent = plegContainerState("non-existent")
)
94
95 func convertState(state kubecontainer.State) plegContainerState {
96 switch state {
97 case kubecontainer.ContainerStateCreated:
98
99 return plegContainerUnknown
100 case kubecontainer.ContainerStateRunning:
101 return plegContainerRunning
102 case kubecontainer.ContainerStateExited:
103 return plegContainerExited
104 case kubecontainer.ContainerStateUnknown:
105 return plegContainerUnknown
106 default:
107 panic(fmt.Sprintf("unrecognized container state: %v", state))
108 }
109 }
110
111 type podRecord struct {
112 old *kubecontainer.Pod
113 current *kubecontainer.Pod
114 }
115
116 type podRecords map[types.UID]*podRecord
117
118
119 func NewGenericPLEG(runtime kubecontainer.Runtime, eventChannel chan *PodLifecycleEvent,
120 relistDuration *RelistDuration, cache kubecontainer.Cache,
121 clock clock.Clock) PodLifecycleEventGenerator {
122 return &GenericPLEG{
123 relistDuration: relistDuration,
124 runtime: runtime,
125 eventChannel: eventChannel,
126 podRecords: make(podRecords),
127 cache: cache,
128 clock: clock,
129 }
130 }
131
132
133
134
135 func (g *GenericPLEG) Watch() chan *PodLifecycleEvent {
136 return g.eventChannel
137 }
138
139
140 func (g *GenericPLEG) Start() {
141 g.runningMu.Lock()
142 defer g.runningMu.Unlock()
143 if !g.isRunning {
144 g.isRunning = true
145 g.stopCh = make(chan struct{})
146 go wait.Until(g.Relist, g.relistDuration.RelistPeriod, g.stopCh)
147 }
148 }
149
150 func (g *GenericPLEG) Stop() {
151 g.runningMu.Lock()
152 defer g.runningMu.Unlock()
153 if g.isRunning {
154 close(g.stopCh)
155 g.isRunning = false
156 }
157 }
158
159 func (g *GenericPLEG) Update(relistDuration *RelistDuration) {
160 g.relistDuration = relistDuration
161 }
162
163
164
165 func (g *GenericPLEG) Healthy() (bool, error) {
166 relistTime := g.getRelistTime()
167 if relistTime.IsZero() {
168 return false, fmt.Errorf("pleg has yet to be successful")
169 }
170
171 metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
172 elapsed := g.clock.Since(relistTime)
173 if elapsed > g.relistDuration.RelistThreshold {
174 return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, g.relistDuration.RelistThreshold)
175 }
176 return true, nil
177 }
178
179 func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
180 if newState == oldState {
181 return nil
182 }
183
184 klog.V(4).InfoS("GenericPLEG", "podUID", podID, "containerID", cid, "oldState", oldState, "newState", newState)
185 switch newState {
186 case plegContainerRunning:
187 return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
188 case plegContainerExited:
189 return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
190 case plegContainerUnknown:
191 return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
192 case plegContainerNonExistent:
193 switch oldState {
194 case plegContainerExited:
195
196 return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
197 default:
198 return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
199 }
200 default:
201 panic(fmt.Sprintf("unrecognized container state: %v", newState))
202 }
203 }
204
205 func (g *GenericPLEG) getRelistTime() time.Time {
206 val := g.relistTime.Load()
207 if val == nil {
208 return time.Time{}
209 }
210 return val.(time.Time)
211 }
212
213 func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
214 g.relistTime.Store(timestamp)
215 }
216
217
218
219 func (g *GenericPLEG) Relist() {
220 g.relistLock.Lock()
221 defer g.relistLock.Unlock()
222
223 ctx := context.Background()
224 klog.V(5).InfoS("GenericPLEG: Relisting")
225
226 if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
227 metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
228 }
229
230 timestamp := g.clock.Now()
231 defer func() {
232 metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
233 }()
234
235
236 podList, err := g.runtime.GetPods(ctx, true)
237 if err != nil {
238 klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
239 return
240 }
241
242 g.updateRelistTime(timestamp)
243
244 pods := kubecontainer.Pods(podList)
245
246 updateRunningPodAndContainerMetrics(pods)
247 g.podRecords.setCurrent(pods)
248
249
250 eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
251 for pid := range g.podRecords {
252 oldPod := g.podRecords.getOld(pid)
253 pod := g.podRecords.getCurrent(pid)
254
255 allContainers := getContainersFromPods(oldPod, pod)
256 for _, container := range allContainers {
257 events := computeEvents(oldPod, pod, &container.ID)
258 for _, e := range events {
259 updateEvents(eventsByPodID, e)
260 }
261 }
262 }
263
264 var needsReinspection map[types.UID]*kubecontainer.Pod
265 if g.cacheEnabled() {
266 needsReinspection = make(map[types.UID]*kubecontainer.Pod)
267 }
268
269
270
271 for pid, events := range eventsByPodID {
272 pod := g.podRecords.getCurrent(pid)
273 if g.cacheEnabled() {
274
275
276
277
278
279
280
281
282
283 if err, updated := g.updateCache(ctx, pod, pid); err != nil {
284
285 klog.V(4).ErrorS(err, "PLEG: Ignoring events for pod", "pod", klog.KRef(pod.Namespace, pod.Name))
286
287
288 needsReinspection[pid] = pod
289
290 continue
291 } else {
292
293
294
295 delete(g.podsToReinspect, pid)
296 if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
297 if !updated {
298 continue
299 }
300 }
301 }
302 }
303
304 g.podRecords.update(pid)
305
306
307 containerExitCode := make(map[string]int)
308
309 for i := range events {
310
311 if events[i].Type == ContainerChanged {
312 continue
313 }
314 select {
315 case g.eventChannel <- events[i]:
316 default:
317 metrics.PLEGDiscardEvents.Inc()
318 klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
319 }
320
321 if events[i].Type == ContainerDied {
322
323 if len(containerExitCode) == 0 && pod != nil && g.cache != nil {
324
325 status, err := g.cache.Get(pod.ID)
326 if err == nil {
327 for _, containerStatus := range status.ContainerStatuses {
328 containerExitCode[containerStatus.ID.ID] = containerStatus.ExitCode
329 }
330 }
331 }
332 if containerID, ok := events[i].Data.(string); ok {
333 if exitCode, ok := containerExitCode[containerID]; ok && pod != nil {
334 klog.V(2).InfoS("Generic (PLEG): container finished", "podID", pod.ID, "containerID", containerID, "exitCode", exitCode)
335 }
336 }
337 }
338 }
339 }
340
341 if g.cacheEnabled() {
342
343 if len(g.podsToReinspect) > 0 {
344 klog.V(5).InfoS("GenericPLEG: Reinspecting pods that previously failed inspection")
345 for pid, pod := range g.podsToReinspect {
346 if err, _ := g.updateCache(ctx, pod, pid); err != nil {
347
348 klog.V(5).ErrorS(err, "PLEG: pod failed reinspection", "pod", klog.KRef(pod.Namespace, pod.Name))
349 needsReinspection[pid] = pod
350 }
351 }
352 }
353
354
355
356 g.cache.UpdateTime(timestamp)
357 }
358
359
360 g.podsToReinspect = needsReinspection
361 }
362
363 func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
364 cidSet := sets.NewString()
365 var containers []*kubecontainer.Container
366 fillCidSet := func(cs []*kubecontainer.Container) {
367 for _, c := range cs {
368 cid := c.ID.ID
369 if cidSet.Has(cid) {
370 continue
371 }
372 cidSet.Insert(cid)
373 containers = append(containers, c)
374 }
375 }
376
377 for _, p := range pods {
378 if p == nil {
379 continue
380 }
381 fillCidSet(p.Containers)
382
383
384 fillCidSet(p.Sandboxes)
385 }
386 return containers
387 }
388
389 func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
390 var pid types.UID
391 if oldPod != nil {
392 pid = oldPod.ID
393 } else if newPod != nil {
394 pid = newPod.ID
395 }
396 oldState := getContainerState(oldPod, cid)
397 newState := getContainerState(newPod, cid)
398 return generateEvents(pid, cid.ID, oldState, newState)
399 }
400
401 func (g *GenericPLEG) cacheEnabled() bool {
402 return g.cache != nil
403 }
404
405
406
407 func (g *GenericPLEG) getPodIPs(pid types.UID, status *kubecontainer.PodStatus) []string {
408 if len(status.IPs) != 0 {
409 return status.IPs
410 }
411
412 oldStatus, err := g.cache.Get(pid)
413 if err != nil || len(oldStatus.IPs) == 0 {
414 return nil
415 }
416
417 for _, sandboxStatus := range status.SandboxStatuses {
418
419 if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
420 return status.IPs
421 }
422 }
423
424
425
426 return oldStatus.IPs
427 }
428
429
430
431
432 func (g *GenericPLEG) updateCache(ctx context.Context, pod *kubecontainer.Pod, pid types.UID) (error, bool) {
433 if pod == nil {
434
435
436 klog.V(4).InfoS("PLEG: Delete status for pod", "podUID", string(pid))
437 g.cache.Delete(pid)
438 return nil, true
439 }
440
441 g.podCacheMutex.Lock()
442 defer g.podCacheMutex.Unlock()
443 timestamp := g.clock.Now()
444
445 status, err := g.runtime.GetPodStatus(ctx, pod.ID, pod.Name, pod.Namespace)
446 if err != nil {
447
448
449
450 if klog.V(6).Enabled() {
451 klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
452 } else {
453 klog.ErrorS(err, "PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
454 }
455 } else {
456 if klogV := klog.V(6); klogV.Enabled() {
457 klogV.InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name), "podStatus", status)
458 } else {
459 klog.V(4).InfoS("PLEG: Write status", "pod", klog.KRef(pod.Namespace, pod.Name))
460 }
461
462
463
464
465 status.IPs = g.getPodIPs(pid, status)
466 }
467
468
469
470
471
472
473
474
475
476
477
478 if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) && isEventedPLEGInUse() && status != nil {
479 timestamp = status.TimeStamp
480 }
481
482 return err, g.cache.Set(pod.ID, status, err, timestamp)
483 }
484
485 func (g *GenericPLEG) UpdateCache(pod *kubecontainer.Pod, pid types.UID) (error, bool) {
486 ctx := context.Background()
487 if !g.cacheEnabled() {
488 return fmt.Errorf("pod cache disabled"), false
489 }
490 if pod == nil {
491 return fmt.Errorf("pod cannot be nil"), false
492 }
493 return g.updateCache(ctx, pod, pid)
494 }
495
496 func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
497 if e == nil {
498 return
499 }
500 eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
501 }
502
503 func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
504
505 state := plegContainerNonExistent
506 if pod == nil {
507 return state
508 }
509 c := pod.FindContainerByID(*cid)
510 if c != nil {
511 return convertState(c.State)
512 }
513
514 c = pod.FindSandboxByID(*cid)
515 if c != nil {
516 return convertState(c.State)
517 }
518
519 return state
520 }
521
522 func updateRunningPodAndContainerMetrics(pods []*kubecontainer.Pod) {
523 runningSandboxNum := 0
524
525 containerStateCount := make(map[string]int)
526
527 for _, pod := range pods {
528 containers := pod.Containers
529 for _, container := range containers {
530
531 containerStateCount[string(container.State)]++
532 }
533
534 sandboxes := pod.Sandboxes
535
536 for _, sandbox := range sandboxes {
537 if sandbox.State == kubecontainer.ContainerStateRunning {
538 runningSandboxNum++
539
540 break
541 }
542 }
543 }
544 for key, value := range containerStateCount {
545 metrics.RunningContainerCount.WithLabelValues(key).Set(float64(value))
546 }
547
548
549 metrics.RunningPodCount.Set(float64(runningSandboxNum))
550 }
551
552 func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
553 r, ok := pr[id]
554 if !ok {
555 return nil
556 }
557 return r.old
558 }
559
560 func (pr podRecords) getCurrent(id types.UID) *kubecontainer.Pod {
561 r, ok := pr[id]
562 if !ok {
563 return nil
564 }
565 return r.current
566 }
567
568 func (pr podRecords) setCurrent(pods []*kubecontainer.Pod) {
569 for i := range pr {
570 pr[i].current = nil
571 }
572 for _, pod := range pods {
573 if r, ok := pr[pod.ID]; ok {
574 r.current = pod
575 } else {
576 pr[pod.ID] = &podRecord{current: pod}
577 }
578 }
579 }
580
581 func (pr podRecords) update(id types.UID) {
582 r, ok := pr[id]
583 if !ok {
584 return
585 }
586 pr.updateInternal(id, r)
587 }
588
589 func (pr podRecords) updateInternal(id types.UID, r *podRecord) {
590 if r.current == nil {
591
592 delete(pr, id)
593 return
594 }
595 r.old = r.current
596 r.current = nil
597 }
598
View as plain text