...
1
2
3
4
19
20 package oom
21
22 import (
23 "fmt"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/util/runtime"
27 "k8s.io/client-go/tools/record"
28 "k8s.io/klog/v2"
29
30 "github.com/google/cadvisor/utils/oomparser"
31 )
32
33 type streamer interface {
34 StreamOoms(chan<- *oomparser.OomInstance)
35 }
36
37 var _ streamer = &oomparser.OomParser{}
38
39 type realWatcher struct {
40 recorder record.EventRecorder
41 oomStreamer streamer
42 }
43
44 var _ Watcher = &realWatcher{}
45
46
47
48 func NewWatcher(recorder record.EventRecorder) (Watcher, error) {
49
50 _, ok := recorder.(*record.FakeRecorder)
51 if ok {
52 return nil, nil
53 }
54
55 oomStreamer, err := oomparser.New()
56 if err != nil {
57 return nil, err
58 }
59
60 watcher := &realWatcher{
61 recorder: recorder,
62 oomStreamer: oomStreamer,
63 }
64
65 return watcher, nil
66 }
67
68 const (
69 systemOOMEvent = "SystemOOM"
70 recordEventContainerName = "/"
71 )
72
73
74 func (ow *realWatcher) Start(ref *v1.ObjectReference) error {
75 outStream := make(chan *oomparser.OomInstance, 10)
76 go ow.oomStreamer.StreamOoms(outStream)
77
78 go func() {
79 defer runtime.HandleCrash()
80
81 for event := range outStream {
82 if event.VictimContainerName == recordEventContainerName {
83 klog.V(1).InfoS("Got sys oom event", "event", event)
84 eventMsg := "System OOM encountered"
85 if event.ProcessName != "" && event.Pid != 0 {
86 eventMsg = fmt.Sprintf("%s, victim process: %s, pid: %d", eventMsg, event.ProcessName, event.Pid)
87 }
88 ow.recorder.Eventf(ref, v1.EventTypeWarning, systemOOMEvent, eventMsg)
89 }
90 }
91 klog.ErrorS(nil, "Unexpectedly stopped receiving OOM notifications")
92 }()
93 return nil
94 }
95
View as plain text