
Source file src/k8s.io/kubernetes/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go

Documentation: k8s.io/kubernetes/pkg/kubelet/nodeshutdown

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2020 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  // Package nodeshutdown can watch for node level shutdown events and trigger graceful termination of pods running on the node prior to a system shutdown.
    21  package nodeshutdown
    22  
    23  import (
    24  	"fmt"
    25  	"path/filepath"
    26  	"sort"
    27  	"sync"
    28  	"time"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/tools/record"
    33  	"k8s.io/klog/v2"
    34  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    35  	"k8s.io/kubernetes/pkg/apis/scheduling"
    36  	"k8s.io/kubernetes/pkg/features"
    37  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    38  	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
    39  	"k8s.io/kubernetes/pkg/kubelet/eviction"
    40  	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    41  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    42  	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
    43  	"k8s.io/kubernetes/pkg/kubelet/prober"
    44  	"k8s.io/utils/clock"
    45  )
    46  
    47  const (
    48  	nodeShutdownReason             = "Terminated"
    49  	nodeShutdownMessage            = "Pod was terminated in response to imminent node shutdown."
    50  	nodeShutdownNotAdmittedReason  = "NodeShutdown"
    51  	nodeShutdownNotAdmittedMessage = "Pod was rejected as the node is shutting down."
    52  	dbusReconnectPeriod            = 1 * time.Second
    53  	localStorageStateFile          = "graceful_node_shutdown_state"
    54  )
    55  
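        // systemDbus constructs the logind D-Bus connection. It is a package-level
        // variable so that tests can substitute a fake dbusInhibiter.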
    56  var systemDbus = func() (dbusInhibiter, error) {
    57  	return systemd.NewDBusCon()
    58  }
    59  
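        // dbusInhibiter is the subset of systemd logind D-Bus operations the manager
        // depends on: inhibit (delay) locks, inhibit delay configuration, and shutdown
        // signal monitoring.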
    60  type dbusInhibiter interface {
    61  	CurrentInhibitDelay() (time.Duration, error)
    62  	InhibitShutdown() (systemd.InhibitLock, error)
    63  	ReleaseInhibitLock(lock systemd.InhibitLock) error
    64  	ReloadLogindConf() error
    65  	MonitorShutdown() (<-chan bool, error)
    66  	OverrideInhibitDelay(inhibitDelayMax time.Duration) error
    67  }
    68  
    69  // managerImpl is the node shutdown manager implementation; it watches logind for shutdown events and gracefully terminates pods before the node shuts down.
    70  type managerImpl struct {
    71  	logger       klog.Logger
    72  	recorder     record.EventRecorder
    73  	nodeRef      *v1.ObjectReference
    74  	probeManager prober.Manager
    75  
    76  	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
    77  
    78  	getPods        eviction.ActivePodsFunc
    79  	killPodFunc    eviction.KillPodFunc
    80  	syncNodeStatus func()
    81  
    82  	dbusCon     dbusInhibiter
    83  	inhibitLock systemd.InhibitLock
    84  
    85  	nodeShuttingDownMutex sync.Mutex
    86  	nodeShuttingDownNow   bool
    87  
    88  	clock clock.Clock
    89  
    90  	enableMetrics bool
    91  	storage       storage
    92  }
    93  
    94  // NewManager returns a new node shutdown manager.
    95  func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
    96  	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
    97  		m := managerStub{}
    98  		return m, m
    99  	}
   100  
   101  	shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
   102  	// Migration from the original configuration
   103  	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
   104  		len(shutdownGracePeriodByPodPriority) == 0 {
   105  		shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
   106  	}
   107  
   108  	// Disable if the configuration is empty
   109  	if len(shutdownGracePeriodByPodPriority) == 0 {
   110  		m := managerStub{}
   111  		return m, m
   112  	}
   113  
   114  	// Sort by priority from low to high
   115  	sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
   116  		return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
   117  	})
   118  
   119  	if conf.Clock == nil {
   120  		conf.Clock = clock.RealClock{}
   121  	}
   122  	manager := &managerImpl{
   123  		logger:                           conf.Logger,
   124  		probeManager:                     conf.ProbeManager,
   125  		recorder:                         conf.Recorder,
   126  		nodeRef:                          conf.NodeRef,
   127  		getPods:                          conf.GetPodsFunc,
   128  		killPodFunc:                      conf.KillPodFunc,
   129  		syncNodeStatus:                   conf.SyncNodeStatusFunc,
   130  		shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
   131  		clock:                            conf.Clock,
   132  		enableMetrics:                    utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority),
   133  		storage: localStorage{
   134  			Path: filepath.Join(conf.StateDirectory, localStorageStateFile),
   135  		},
   136  	}
   137  	manager.logger.Info("Creating node shutdown manager",
   138  		"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
   139  		"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
   140  		"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
   141  	)
   142  	return manager, manager
   143  }
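
        // Illustrative wiring sketch (not part of the original file): one way a
        // kubelet-style caller could construct and start the manager. The Config field
        // names are taken from their use in NewManager above; the caller-side values
        // (logger, probeManager, recorder, nodeRef, getActivePods, killPod,
        // syncNodeStatus, rootDirectory, admitHandlers) are assumed to exist in the
        // surrounding code and are illustrative only.
        //
        //	shutdownManager, admitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
        //		Logger:                          logger,
        //		ProbeManager:                    probeManager,
        //		Recorder:                        recorder,
        //		NodeRef:                         nodeRef,
        //		GetPodsFunc:                     getActivePods,
        //		KillPodFunc:                     killPod,
        //		SyncNodeStatusFunc:              syncNodeStatus,
        //		ShutdownGracePeriodRequested:    30 * time.Second,
        //		ShutdownGracePeriodCriticalPods: 10 * time.Second,
        //		StateDirectory:                  rootDirectory,
        //	})
        //	admitHandlers.AddPodAdmitHandler(admitHandler)
        //	if err := shutdownManager.Start(); err != nil {
        //		logger.Error(err, "Failed to start node shutdown manager")
        //	}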
   144  
   145  // Admit rejects all pods if the node is shutting down.
   146  func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
   147  	nodeShuttingDown := m.ShutdownStatus() != nil
   148  
   149  	if nodeShuttingDown {
   150  		return lifecycle.PodAdmitResult{
   151  			Admit:   false,
   152  			Reason:  nodeShutdownNotAdmittedReason,
   153  			Message: nodeShutdownNotAdmittedMessage,
   154  		}
   155  	}
   156  	return lifecycle.PodAdmitResult{Admit: true}
   157  }
   158  
   159  // setMetrics sets the metrics for the node shutdown manager.
   160  func (m *managerImpl) setMetrics() {
   161  	if m.enableMetrics && m.storage != nil {
   162  		sta := state{}
   163  		err := m.storage.Load(&sta)
   164  		if err != nil {
   165  			m.logger.Error(err, "Failed to load graceful shutdown state")
   166  		} else {
   167  			if !sta.StartTime.IsZero() {
   168  				metrics.GracefulShutdownStartTime.Set(timestamp(sta.StartTime))
   169  			}
   170  			if !sta.EndTime.IsZero() {
   171  				metrics.GracefulShutdownEndTime.Set(timestamp(sta.EndTime))
   172  			}
   173  		}
   174  	}
   175  }
   176  
   177  // Start starts the node shutdown manager and will start watching the node for shutdown events.
   178  func (m *managerImpl) Start() error {
   179  	stop, err := m.start()
   180  	if err != nil {
   181  		return err
   182  	}
   183  	go func() {
   184  		for {
   185  			if stop != nil {
   186  				<-stop
   187  			}
   188  
   189  			time.Sleep(dbusReconnectPeriod)
   190  			m.logger.V(1).Info("Restarting watch for node shutdown events")
   191  			stop, err = m.start()
   192  			if err != nil {
   193  				m.logger.Error(err, "Unable to watch the node for shutdown events")
   194  			}
   195  		}
   196  	}()
   197  
   198  	m.setMetrics()
   199  	return nil
   200  }
   201  
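        // start connects to the logind D-Bus, raises the logind inhibit delay if it is
        // shorter than the configured grace periods, takes an inhibit lock, and begins
        // watching for shutdown events. The returned channel is closed when the watch
        // ends, which tells Start's retry loop to call start again.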
   202  func (m *managerImpl) start() (chan struct{}, error) {
   203  	systemBus, err := systemDbus()
   204  	if err != nil {
   205  		return nil, err
   206  	}
   207  	m.dbusCon = systemBus
   208  
   209  	currentInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
   210  	if err != nil {
   211  		return nil, err
   212  	}
   213  
   214  	// If logind's InhibitDelayMaxUSec (as configured in logind.conf) is less than periodRequested, attempt to update the value to periodRequested.
   215  	if periodRequested := m.periodRequested(); periodRequested > currentInhibitDelay {
   216  		err := m.dbusCon.OverrideInhibitDelay(periodRequested)
   217  		if err != nil {
   218  			return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
   219  		}
   220  
   221  		err = m.dbusCon.ReloadLogindConf()
   222  		if err != nil {
   223  			return nil, err
   224  		}
   225  
   226  		// Read the current inhibit delay again; if the override was successful, updatedInhibitDelay will now be at least periodRequested.
   227  		updatedInhibitDelay, err := m.dbusCon.CurrentInhibitDelay()
   228  		if err != nil {
   229  			return nil, err
   230  		}
   231  
   232  		if periodRequested > updatedInhibitDelay {
   233  			return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", periodRequested, updatedInhibitDelay)
   234  		}
   235  	}
   236  
   237  	err = m.aquireInhibitLock()
   238  	if err != nil {
   239  		return nil, err
   240  	}
   241  
   242  	events, err := m.dbusCon.MonitorShutdown()
   243  	if err != nil {
   244  		releaseErr := m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
   245  		if releaseErr != nil {
   246  			return nil, fmt.Errorf("failed releasing inhibitLock: %v and failed monitoring shutdown: %v", releaseErr, err)
   247  		}
   248  		return nil, fmt.Errorf("failed to monitor shutdown: %v", err)
   249  	}
   250  
   251  	stop := make(chan struct{})
   252  	go func() {
   253  		// Monitor for shutdown events. This follows the logind Inhibit Delay pattern described on https://www.freedesktop.org/wiki/Software/systemd/inhibit/
   254  		// 1. When shutdown manager starts, an inhibit lock is taken.
   255  		// 2. When shutdown(true) event is received, process the shutdown and release the inhibit lock.
   256  		// 3. When shutdown(false) event is received, this indicates a previous shutdown was cancelled. In this case, acquire the inhibit lock again.
   257  		for {
   258  			select {
   259  			case isShuttingDown, ok := <-events:
   260  				if !ok {
   261  					m.logger.Error(err, "Stopped watching the node for shutdown events")
   262  					close(stop)
   263  					return
   264  				}
   265  				m.logger.V(1).Info("Shutdown manager detected new shutdown event, isNodeShuttingDownNow", "event", isShuttingDown)
   266  
   267  				var shutdownType string
   268  				if isShuttingDown {
   269  					shutdownType = "shutdown"
   270  				} else {
   271  					shutdownType = "cancelled"
   272  				}
   273  				m.logger.V(1).Info("Shutdown manager detected new shutdown event", "event", shutdownType)
   274  				if isShuttingDown {
   275  					m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown event")
   276  				} else {
   277  					m.recorder.Event(m.nodeRef, v1.EventTypeNormal, kubeletevents.NodeShutdown, "Shutdown manager detected shutdown cancellation")
   278  				}
   279  
   280  				m.nodeShuttingDownMutex.Lock()
   281  				m.nodeShuttingDownNow = isShuttingDown
   282  				m.nodeShuttingDownMutex.Unlock()
   283  
   284  				if isShuttingDown {
   285  					// Update node status and ready condition
   286  					go m.syncNodeStatus()
   287  
   288  					m.processShutdownEvent()
   289  				} else {
   290  					m.aquireInhibitLock()
   291  				}
   292  			}
   293  		}
   294  	}()
   295  	return stop, nil
   296  }
   297  
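        // aquireInhibitLock takes a new systemd "delay" inhibit lock and releases any
        // previously held lock.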
   298  func (m *managerImpl) aquireInhibitLock() error {
   299  	lock, err := m.dbusCon.InhibitShutdown()
   300  	if err != nil {
   301  		return err
   302  	}
   303  	if m.inhibitLock != 0 {
   304  		m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
   305  	}
   306  	m.inhibitLock = lock
   307  	return nil
   308  }
   309  
   310  // ShutdownStatus will return an error if the node is currently shutting down.
   311  func (m *managerImpl) ShutdownStatus() error {
   312  	m.nodeShuttingDownMutex.Lock()
   313  	defer m.nodeShuttingDownMutex.Unlock()
   314  
   315  	if m.nodeShuttingDownNow {
   316  		return fmt.Errorf("node is shutting down")
   317  	}
   318  	return nil
   319  }
   320  
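        // processShutdownEvent terminates all active pods, one priority group at a time,
        // waiting up to each group's ShutdownGracePeriodSeconds before moving on, and
        // releases the inhibit lock when finished so that the node shutdown can proceed.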
   321  func (m *managerImpl) processShutdownEvent() error {
   322  	m.logger.V(1).Info("Shutdown manager processing shutdown event")
   323  	activePods := m.getPods()
   324  
   325  	defer func() {
   326  		m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
   327  		m.logger.V(1).Info("Shutdown manager completed processing shutdown event, node will shutdown shortly")
   328  	}()
   329  
   330  	if m.enableMetrics && m.storage != nil {
   331  		startTime := time.Now()
   332  		err := m.storage.Store(state{
   333  			StartTime: startTime,
   334  		})
   335  		if err != nil {
   336  			m.logger.Error(err, "Failed to store graceful shutdown state")
   337  		}
   338  		metrics.GracefulShutdownStartTime.Set(timestamp(startTime))
   339  		metrics.GracefulShutdownEndTime.Set(0)
   340  
   341  		defer func() {
   342  			endTime := time.Now()
   343  			err := m.storage.Store(state{
   344  				StartTime: startTime,
   345  				EndTime:   endTime,
   346  			})
   347  			if err != nil {
   348  				m.logger.Error(err, "Failed to store graceful shutdown state")
   349  			}
   350  			metrics.GracefulShutdownEndTime.Set(timestamp(endTime))
   351  		}()
   352  	}
   353  
   354  	groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
   355  	for _, group := range groups {
   356  		// If there are no pods in a particular range,
   357  		// then do not wait for pods in that priority range.
   358  		if len(group.Pods) == 0 {
   359  			continue
   360  		}
   361  
   362  		var wg sync.WaitGroup
   363  		wg.Add(len(group.Pods))
   364  		for _, pod := range group.Pods {
   365  			go func(pod *v1.Pod, group podShutdownGroup) {
   366  				defer wg.Done()
   367  
   368  				gracePeriodOverride := group.ShutdownGracePeriodSeconds
   369  
   370  				// If the pod's spec specifies a termination grace period that is less than or equal to the calculated gracePeriodOverride, use the pod's value.
   371  				if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
   372  					gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
   373  				}
   374  
   375  				m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
   376  
   377  				if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
   378  					// set the pod status to failed (unless it was already in a successful terminal phase)
   379  					if status.Phase != v1.PodSucceeded {
   380  						status.Phase = v1.PodFailed
   381  					}
   382  					status.Message = nodeShutdownMessage
   383  					status.Reason = nodeShutdownReason
   384  					if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
   385  						podutil.UpdatePodCondition(status, &v1.PodCondition{
   386  							Type:    v1.DisruptionTarget,
   387  							Status:  v1.ConditionTrue,
   388  							Reason:  v1.PodReasonTerminationByKubelet,
   389  							Message: nodeShutdownMessage,
   390  						})
   391  					}
   392  				}); err != nil {
   393  					m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
   394  				} else {
   395  					m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
   396  				}
   397  			}(pod, group)
   398  		}
   399  
   400  		var (
   401  			doneCh = make(chan struct{})
   402  			timer  = m.clock.NewTimer(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second)
   403  		)
   404  		go func() {
   405  			defer close(doneCh)
   406  			wg.Wait()
   407  		}()
   408  
   409  		select {
   410  		case <-doneCh:
   411  			timer.Stop()
   412  		case <-timer.C():
   413  			m.logger.V(1).Info("Shutdown manager pod killing timed out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
   414  		}
   415  	}
   416  
   417  	return nil
   418  }
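
        // Worked example (illustrative values): if a group's ShutdownGracePeriodSeconds
        // is 30, a pod with spec.terminationGracePeriodSeconds=10 is killed with a 10s
        // override, a pod with spec.terminationGracePeriodSeconds=60 is capped at 30s,
        // and the manager waits at most 30s for the whole group before moving on.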
   419  
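        // periodRequested returns the total shutdown grace period, i.e. the sum of the
        // per-priority grace periods; this is the inhibit delay the manager requests
        // from logind. For example, grace periods of 20s (priority 0) and 10s
        // (system-critical) add up to a 30s inhibit delay.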
   420  func (m *managerImpl) periodRequested() time.Duration {
   421  	var sum int64
   422  	for _, period := range m.shutdownGracePeriodByPodPriority {
   423  		sum += period.ShutdownGracePeriodSeconds
   424  	}
   425  	return time.Duration(sum) * time.Second
   426  }
   427  
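        // migrateConfig converts the legacy ShutdownGracePeriodRequested /
        // ShutdownGracePeriodCriticalPods pair into the per-priority representation.
        // For example, requested=30s and criticalPods=10s yields 20s for priority 0
        // (all non-critical pods) and 10s for system-critical pods.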
   428  func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
   429  	if shutdownGracePeriodRequested == 0 {
   430  		return nil
   431  	}
   432  	defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
   433  	if defaultPriority < 0 {
   434  		return nil
   435  	}
   436  	criticalPriority := shutdownGracePeriodRequested - defaultPriority
   437  	if criticalPriority < 0 {
   438  		return nil
   439  	}
   440  	return []kubeletconfig.ShutdownGracePeriodByPodPriority{
   441  		{
   442  			Priority:                   scheduling.DefaultPriorityWhenNoDefaultClassExists,
   443  			ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
   444  		},
   445  		{
   446  			Priority:                   scheduling.SystemCriticalPriority,
   447  			ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
   448  		},
   449  	}
   450  }
   451  
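        // groupByPriority buckets the given pods into the configured priority ranges.
        // A pod whose priority falls between two configured priorities joins the lower
        // range, and pods outside the configured range are clamped to the nearest one.
        // For example, with ranges at priority 0 and 100000, a pod with priority 50
        // lands in the priority-0 group and a pod with priority 2000000000 in the
        // priority-100000 group.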
   452  func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
   453  	groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
   454  	for _, period := range shutdownGracePeriodByPodPriority {
   455  		groups = append(groups, podShutdownGroup{
   456  			ShutdownGracePeriodByPodPriority: period,
   457  		})
   458  	}
   459  
   460  	for _, pod := range pods {
   461  		var priority int32
   462  		if pod.Spec.Priority != nil {
   463  			priority = *pod.Spec.Priority
   464  		}
   465  
   466  		// Find the group index according to the priority.
   467  		index := sort.Search(len(groups), func(i int) bool {
   468  			return groups[i].Priority >= priority
   469  		})
   470  
   471  		// 1. Pods with priority above the highest configured priority go to the highest group.
   472  		// 2. Pods with priority below the lowest configured priority go to the lowest group.
   473  		// 3. A priority that falls between two configured priorities goes to the lower group:
   474  		//    if the pod priority satisfies
   475  		//      groups[index-1].Priority <= pod priority < groups[index].Priority
   476  		//    we pick the lower group (i.e. index-1).
   477  		if index == len(groups) {
   478  			index = len(groups) - 1
   479  		} else if index < 0 {
   480  			index = 0
   481  		} else if index > 0 && groups[index].Priority > priority {
   482  			index--
   483  		}
   484  
   485  		groups[index].Pods = append(groups[index].Pods, pod)
   486  	}
   487  	return groups
   488  }
   489  
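        // podShutdownGroup pairs one configured priority range with the active pods that
        // groupByPriority assigned to it.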
   490  type podShutdownGroup struct {
   491  	kubeletconfig.ShutdownGracePeriodByPodPriority
   492  	Pods []*v1.Pod
   493  }
   494  
