1
16
17 package eviction
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups"
25 "golang.org/x/sys/unix"
26 "k8s.io/klog/v2"
27 )
28
// eventSize is the length, in bytes, of the buffer Start uses for each read
// from the eventfd.
const eventSize = 8

// numFdEvents is the largest number of epoll events wait accepts from a
// single EpollWait call; wait sizes its event slice one larger than this so
// it can detect (and report as an error) a result that exceeds the limit.
const numFdEvents = 6
37
// linuxCgroupNotifier delivers cgroup threshold notifications by waiting on
// an eventfd through an epoll instance.
type linuxCgroupNotifier struct {
	// eventfd is the event file descriptor registered with the cgroup, and
	// epfd is the epoll instance used to wait on it. Both are closed by Stop.
	eventfd, epfd int
	// stop is closed by Stop; Start checks it on every loop iteration.
	stop chan struct{}
	// stopLock serializes calls to Stop.
	stopLock sync.Mutex
}
44
45 var _ CgroupNotifier = &linuxCgroupNotifier{}
46
47
48
49 func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
50
51
52
53
54 if libcontainercgroups.IsCgroup2UnifiedMode() {
55 return &disabledThresholdNotifier{}, nil
56 }
57
58 var watchfd, eventfd, epfd, controlfd int
59 var err error
60 watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY|unix.O_CLOEXEC, 0)
61 if err != nil {
62 return nil, err
63 }
64 defer unix.Close(watchfd)
65 controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY|unix.O_CLOEXEC, 0)
66 if err != nil {
67 return nil, err
68 }
69 defer unix.Close(controlfd)
70 eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
71 if err != nil {
72 return nil, err
73 }
74 if eventfd < 0 {
75 err = fmt.Errorf("eventfd call failed")
76 return nil, err
77 }
78 defer func() {
79
80 if err != nil {
81 unix.Close(eventfd)
82 }
83 }()
84 epfd, err = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
85 if err != nil {
86 return nil, err
87 }
88 if epfd < 0 {
89 err = fmt.Errorf("EpollCreate1 call failed")
90 return nil, err
91 }
92 defer func() {
93
94 if err != nil {
95 unix.Close(epfd)
96 }
97 }()
98 config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
99 _, err = unix.Write(controlfd, []byte(config))
100 if err != nil {
101 return nil, err
102 }
103 return &linuxCgroupNotifier{
104 eventfd: eventfd,
105 epfd: epfd,
106 stop: make(chan struct{}),
107 }, nil
108 }
109
110 func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
111 err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
112 Fd: int32(n.eventfd),
113 Events: unix.EPOLLIN,
114 })
115 if err != nil {
116 klog.InfoS("Eviction manager: error adding epoll eventfd", "err", err)
117 return
118 }
119 buf := make([]byte, eventSize)
120 for {
121 select {
122 case <-n.stop:
123 return
124 default:
125 }
126 event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
127 if err != nil {
128 klog.InfoS("Eviction manager: error while waiting for memcg events", "err", err)
129 return
130 } else if !event {
131
132 continue
133 }
134
135 _, err = unix.Read(n.eventfd, buf)
136 if err != nil {
137 klog.InfoS("Eviction manager: error reading memcg events", "err", err)
138 return
139 }
140 eventCh <- struct{}{}
141 }
142 }
143
144
145
146
147 func wait(epfd, eventfd int, timeout time.Duration) (bool, error) {
148 events := make([]unix.EpollEvent, numFdEvents+1)
149 timeoutMS := int(timeout / time.Millisecond)
150 n, err := unix.EpollWait(epfd, events, timeoutMS)
151 if n == -1 {
152 if err == unix.EINTR {
153
154 return false, nil
155 }
156 return false, err
157 }
158 if n == 0 {
159
160 return false, nil
161 }
162 if n > numFdEvents {
163 return false, fmt.Errorf("epoll_wait returned more events than we know what to do with")
164 }
165 for _, event := range events[:n] {
166 if event.Fd == int32(eventfd) {
167 if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 {
168
169
170
171
172
173
174 return true, nil
175 }
176 }
177 }
178
179 return false, nil
180 }
181
182 func (n *linuxCgroupNotifier) Stop() {
183 n.stopLock.Lock()
184 defer n.stopLock.Unlock()
185 select {
186 case <-n.stop:
187
188 return
189 default:
190 }
191 unix.Close(n.eventfd)
192 unix.Close(n.epfd)
193 close(n.stop)
194 }
195
196
// disabledThresholdNotifier is a notifier that does nothing; it carries no
// state and never sends on the channel it is started with.
type disabledThresholdNotifier struct{}

// Start returns immediately without using the supplied channel.
func (*disabledThresholdNotifier) Start(_ chan<- struct{}) {
	// Intentionally a no-op.
}

// Stop returns immediately; there is nothing to release.
func (*disabledThresholdNotifier) Stop() {
	// Intentionally a no-op.
}
201
View as plain text