package nodeshutdown

import (
	"fmt"
	"path/filepath"
	"sort"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/pkg/features"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	"k8s.io/utils/clock"
)

const (
	nodeShutdownReason             = "Terminated"
	nodeShutdownMessage            = "Pod was terminated in response to imminent node shutdown."
	nodeShutdownNotAdmittedReason  = "NodeShutdown"
	nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
	dbusReconnectPeriod            = 1 * time.Second
	localStorageStateFile          = "graceful_node_shutdown_state"
)

var systemDbus = func() (dbusInhibiter, error) {
	return systemd.NewDBusCon()
}

type dbusInhibiter interface {
	CurrentInhibitDelay() (time.Duration, error)
	InhibitShutdown() (systemd.InhibitLock, error)
	ReleaseInhibitLock(lock systemd.InhibitLock) error
	ReloadLogindConf() error
	MonitorShutdown() (<-chan bool, error)
	OverrideInhibitDelay(inhibitDelayMax time.Duration) error
}

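// managerImpl has functions that can be used to interact with the Node Shutdown Manager.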
type managerImpl struct {
	logger       klog.Logger
	recorder     record.EventRecorder
	nodeRef      *v1.ObjectReference
	probeManager prober.Manager

	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority

	getPods        eviction.ActivePodsFunc
	killPodFunc    eviction.KillPodFunc
	syncNodeStatus func()

	dbusCon     dbusInhibiter
	inhibitLock systemd.InhibitLock

	nodeShuttingDownMutex sync.Mutex
	nodeShuttingDownNow   bool

	clock clock.Clock

	enableMetrics bool
	storage       storage
}

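// NewManager returns a new node shutdown manager.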
func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
		m := managerStub{}
		return m, m
	}

	shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority

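	// Fall back to the ShutdownGracePeriodRequested/ShutdownGracePeriodCriticalPods configuration
	// when priority-based shutdown is disabled or not configured.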
	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
		len(shutdownGracePeriodByPodPriority) == 0 {
		shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
	}

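	// Disable if the configuration is empty.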
	if len(shutdownGracePeriodByPodPriority) == 0 {
		m := managerStub{}
		return m, m
	}

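	// Sort by priority from low to high.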
	sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
		return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
	})

	if conf.Clock == nil {
		conf.Clock = clock.RealClock{}
	}
	manager := &managerImpl{
		logger:                           conf.Logger,
		probeManager:                     conf.ProbeManager,
		recorder:                         conf.Recorder,
		nodeRef:                          conf.NodeRef,
		getPods:                          conf.GetPodsFunc,
		killPodFunc:                      conf.KillPodFunc,
		syncNodeStatus:                   conf.SyncNodeStatusFunc,
		shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
		clock:                            conf.Clock,
		enableMetrics:                    utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
		storage: localStorage{
			Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
		},
	}
	manager.logger.Info("Creating node shutdown manager",
		"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
		"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
		"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
	)
	return manager, manager
}

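// Admit rejects all pods if the node is shutting down.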
func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	nodeShuttingDown := m.ShutdownStatus() != nil

	if nodeShuttingDown {
		return lifecycle.PodAdmitResult{
			Admit:   false,
			Reason:  nodeShutdownNotAdmittedReason,
			Message: nodeShutdownNotAdmittedMessage,
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}

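// setMetrics publishes the graceful shutdown start and end time metrics from the persisted state, if any.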
func (m *managerImpl) setMetrics() {
	if m.enableMetrics && m.storage != nil {
		sta := state{}
		err := m.storage.Load(&sta)
		if err != nil {
			m.logger.Error(err, "Failed to load graceful shutdown state")
		} else {
			if !sta.StartTime.IsZero() {
				metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
			}
			if !sta.EndTime.IsZero() {
				metrics.GracefulShutdownEndTime.Set(timestamp(sta.EndTime))
			}
		}
	}
}

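// Start starts the node shutdown manager and begins watching the node for shutdown events.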
func (m *managerImpl) Start() error {
	stop, err := m.start()
	if err != nil {
		return err
	}
	go func() {
		for {
			if stop != nil {
				<-stop
			}

			time.Sleep(dbusReconnectPeriod)
			m.logger.V(1).Info("Restarting watch for node shutdown events")
			stop, err = m.start()
			if err != nil {
				m.logger.Error(err, "Unable to watch the node for shutdown events")
			}
		}
	}()

	m.setMetrics()
	return nil
}

func (m *managerImpl) start() (chan struct{}, error) {
	systemBus, err := systemDbus()
	if err != nil {
		return nil, err
	}
	m.dbusCon = systemBus

	currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
	if err != nil {
		return nil, err
	}

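	// If the requested shutdown grace period is longer than logind's current inhibit delay,
	// attempt to raise InhibitDelayMaxSec and reload logind.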
	if periodRequested := m.periodRequested(); periodRequested > currentInhibitDelay {
		err := m.dbusCon.OverrideInhibitDelay(periodRequested)
		if err != nil {
			return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
		}

		err = m.dbusCon.ReloadLogindConf()
		if err != nil {
			return nil, err
		}

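		// Read the inhibit delay again; if the override took effect it will now be at least periodRequested.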
		updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
		if err != nil {
			return nil, err
		}

		if periodRequested > updatedInhibitDelay {
			return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", periodRequested, updatedInhibitDelay)
		}
	}

	err = m.acquireInhibitLock()
	if err != nil {
		return nil, err
	}

	events, err := m.dbusCon.MonitorShutdown()
	if err != nil {
		releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
		if releaseErr != nil {
			return nil, fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %v", releaseErr, err)
		}
		return nil, fmt.Errorf("failed to monitor shutdown: %v", err)
	}

	stop := make(chan struct{})
	go func() {
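		// Watch logind's shutdown signals: on shutdown, mark the node as shutting down and
		// process the shutdown event; on cancellation, re-acquire the inhibit lock.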
		for {
			select {
			case isShuttingDown, ok := <-events:
				if !ok {
					m.logger.Error(err, "Stopped watching the node for shutdown events")
					close(stop)
					return
				}
				m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)

				var shutdownType string
				if isShuttingDown {
					shutdownType = "shutdown"
				} else {
					shutdownType = "cancelled"
				}
				m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType)
				if isShuttingDown {
					m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event")
				} else {
					m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown cancellation")
				}

				m.nodeShuttingDownMutex.Lock()
				m.nodeShuttingDownNow = isShuttingDown
				m.nodeShuttingDownMutex.Unlock()

				if isShuttingDown {
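					// Update node status and ready condition.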
					go m.syncNodeStatus()

					m.processShutdownEvent()
				} else {
					m.acquireInhibitLock()
				}
			}
		}
	}()
	return stop, nil
}

func (m *managerImpl) acquireInhibitLock() error {
	lock, err := m.dbusCon.InhibitShutdown()
	if err != nil {
		return err
	}
	if m.inhibitLock != 0 {
		m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
	}
	m.inhibitLock = lock
	return nil
}

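// ShutdownStatus returns an error if the node is currently shutting down.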
func (m *managerImpl) ShutdownStatus() error {
	m.nodeShuttingDownMutex.Lock()
	defer m.nodeShuttingDownMutex.Unlock()

	if m.nodeShuttingDownNow {
		return fmt.Errorf("node is shutting down")
	}
	return nil
}

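// processShutdownEvent kills the active pods on the node, one priority group at a time
// (lowest priority first), waiting up to each group's configured shutdown grace period.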
func (m *managerImpl) processShutdownEvent() error {
	m.logger.V(1).Info("Shutdown manager processing shutdown event")
	activePods := m.getPods()

	defer func() {
		m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
		m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly")
	}()

	if m.enableMetrics && m.storage != nil {
		startTime := time.Now()
		err := m.storage.Store(state{
			StartTime: startTime,
		})
		if err != nil {
			m.logger.Error(err, "Failed to store graceful shutdown state")
		}
		metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
		metrics.GracefulShutdownEndTime.Set(0)

		defer func() {
			endTime := time.Now()
			err := m.storage.Store(state{
				StartTime: startTime,
				EndTime:   endTime,
			})
			if err != nil {
				m.logger.Error(err, "Failed to store graceful shutdown state")
			}
			metrics.GracefulShutdownEndTime.Set(timestamp(endTime))
		}()
	}

	groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
	for _, group := range groups {
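		// If there are no pods in this priority group, skip it.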
		if len(group.Pods) == 0 {
			continue
		}

		var wg sync.WaitGroup
		wg.Add(len(group.Pods))
		for _, pod := range group.Pods {
			go func(pod *v1.Pod, group podShutdownGroup) {
				defer wg.Done()

				gracePeriodOverride := group.ShutdownGracePeriodSeconds

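				// If the pod's spec specifies a termination grace period shorter than the
				// group's override, use the pod's value.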
				if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
					gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
				}

				m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)

				if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
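					// Mark the pod as failed unless it already completed successfully.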
					if status.Phase != v1.PodSucceeded {
						status.Phase = v1.PodFailed
					}
					status.Message = nodeShutdownMessage
					status.Reason = nodeShutdownReason
					if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
						podutil.UpdatePodCondition(status, &v1.PodCondition{
							Type:    v1.DisruptionTarget,
							Status:  v1.ConditionTrue,
							Reason:  v1.PodReasonTerminationByKubelet,
							Message: nodeShutdownMessage,
						})
					}
				}); err != nil {
					m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
				} else {
					m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
				}
			}(pod, group)
		}

		var (
			doneCh = make(chan struct{})
			timer  = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second)
		)
		go func() {
			defer close(doneCh)
			wg.Wait()
		}()

		select {
		case <-doneCh:
			timer.Stop()
		case <-timer.C():
			m.logger.V(1).Info("Shutdown manager pod killing timed out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
		}
	}

	return nil
}

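// periodRequested returns the requested inhibit delay: the sum of all configured shutdown grace periods.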
func (m *managerImpl) periodRequested() time.Duration {
	var sum int64
	for _, period := range m.shutdownGracePeriodByPodPriority {
		sum += period.ShutdownGracePeriodSeconds
	}
	return time.Duration(sum) * time.Second
}

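// migrateConfig converts the ShutdownGracePeriod/ShutdownGracePeriodCriticalPods settings
// into the equivalent per-priority configuration.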
func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
	if shutdownGracePeriodRequested == 0 {
		return nil
	}
	defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
	if defaultPriority < 0 {
		return nil
	}
	criticalPriority := shutdownGracePeriodRequested - defaultPriority
	if criticalPriority < 0 {
		return nil
	}
	return []kubeletconfig.ShutdownGracePeriodByPodPriority{
		{
			Priority:                   scheduling.DefaultPriorityWhenNoDefaultClassExists,
			ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
		},
		{
			Priority:                   scheduling.SystemCriticalPriority,
			ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
		},
	}
}

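// groupByPriority assigns each pod to the shutdown group that matches its priority.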
func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
	groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
	for _, period := range shutdownGracePeriodByPodPriority {
		groups = append(groups, podShutdownGroup{
			ShutdownGracePeriodByPodPriority: period,
		})
	}

	for _, pod := range pods {
		var priority int32
		if pod.Spec.Priority != nil {
			priority = *pod.Spec.Priority
		}

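		// Find the group index according to the pod priority.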
		index := sort.Search(len(groups), func(i int) bool {
			return groups[i].Priority >= priority
		})

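		// Pods with a priority above the highest configured value fall into the highest
		// group, pods below the lowest configured value fall into the lowest group, and
		// pods whose priority lies between two configured values fall into the lower of
		// the two groups.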
		if index == len(groups) {
			index = len(groups) - 1
		} else if index < 0 {
			index = 0
		} else if index > 0 && groups[index].Priority > priority {
			index--
		}

		groups[index].Pods = append(groups[index].Pods, pod)
	}
	return groups
}

type podShutdownGroup struct {
	kubeletconfig.ShutdownGracePeriodByPodPriority
	Pods []*v1.Pod
}