...

Source file src/k8s.io/kubernetes/pkg/kubelet/pod_workers.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/apimachinery/pkg/util/runtime"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/client-go/tools/record"
    31  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    32  	"k8s.io/klog/v2"
    33  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    34  	"k8s.io/kubernetes/pkg/kubelet/events"
    35  	"k8s.io/kubernetes/pkg/kubelet/eviction"
    36  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    37  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    38  	"k8s.io/kubernetes/pkg/kubelet/util/queue"
    39  	"k8s.io/utils/clock"
    40  )
    41  
    42  // OnCompleteFunc is a function that is invoked when an operation completes.
    43  // If err is non-nil, the operation did not complete successfully.
    44  type OnCompleteFunc func(err error)
    45  
    46  // PodStatusFunc is a function that is invoked to override the pod status when a pod is killed.
    47  type PodStatusFunc func(podStatus *v1.PodStatus)
    48  
    49  // KillPodOptions are options when performing a pod update whose update type is kill.
    50  type KillPodOptions struct {
    51  	// CompletedCh is closed when the kill request completes (syncTerminatingPod has completed
    52  	// without error) or if the pod does not exist, or if the pod has already terminated. This
    53  	// could take an arbitrary amount of time to be closed, but is never left open once
    54  	// CouldHaveRunningContainers() returns false.
    55  	CompletedCh chan<- struct{}
    56  	// Evict is true if this is a pod triggered eviction - once a pod is evicted some resources are
    57  	// more aggressively reaped than during normal pod operation (stopped containers).
    58  	Evict bool
    59  	// PodStatusFunc is invoked (if set) and overrides the status of the pod at the time the pod is killed.
    60  	// The provided status is populated from the latest state.
    61  	PodStatusFunc PodStatusFunc
     62  	// PodTerminationGracePeriodSecondsOverride is an optional override to use if a pod is being killed as part of a kill operation.
    63  	PodTerminationGracePeriodSecondsOverride *int64
    64  }
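
As a hedged illustration of how these options compose, the sketch below shows a kill-and-wait pattern similar to the kubelet's killPodNow wrapper: request an eviction with a shortened grace period, then block on CompletedCh. The variables workers and pod, and the one-minute timeout, are assumptions for the example, not part of this file.

    // Sketch only: request an eviction and wait for termination to complete.
    gracePeriod := int64(5) // hypothetical shortened grace period
    ch := make(chan struct{})
    workers.UpdatePod(UpdatePodOptions{
        UpdateType: kubetypes.SyncPodKill,
        Pod:        pod, // assumed: the v1.Pod being evicted
        KillPodOptions: &KillPodOptions{
            CompletedCh:                              ch,
            Evict:                                    true,
            PodTerminationGracePeriodSecondsOverride: &gracePeriod,
        },
    })
    select {
    case <-ch:
        // syncTerminatingPod finished; running containers are stopped.
    case <-time.After(time.Minute):
        // Example timeout; real callers choose their own bound.
    }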
    65  
     66  // UpdatePodOptions is an options struct to pass to an UpdatePod operation.
    67  type UpdatePodOptions struct {
    68  	// The type of update (create, update, sync, kill).
    69  	UpdateType kubetypes.SyncPodType
    70  	// StartTime is an optional timestamp for when this update was created. If set,
    71  	// when this update is fully realized by the pod worker it will be recorded in
    72  	// the PodWorkerDuration metric.
    73  	StartTime time.Time
    74  	// Pod to update. Required.
    75  	Pod *v1.Pod
    76  	// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
    77  	// is kill or terminated.
    78  	MirrorPod *v1.Pod
    79  	// RunningPod is a runtime pod that is no longer present in config. Required
    80  	// if Pod is nil, ignored if Pod is set.
    81  	RunningPod *kubecontainer.Pod
    82  	// KillPodOptions is used to override the default termination behavior of the
    83  	// pod or to update the pod status after an operation is completed. Since a
    84  	// pod can be killed for multiple reasons, PodStatusFunc is invoked in order
    85  	// and later kills have an opportunity to override the status (i.e. a preemption
    86  	// may be later turned into an eviction).
    87  	KillPodOptions *KillPodOptions
    88  }
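
For contrast, a minimal sketch of the common non-kill case, delivering a newly observed pod to its worker (workers and pod are placeholders):

    // Sketch: a create update. StartTime lets the worker record config
    // propagation latency in the PodWorkerDuration metric.
    workers.UpdatePod(UpdatePodOptions{
        UpdateType: kubetypes.SyncPodCreate,
        StartTime:  time.Now(),
        Pod:        pod,
    })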
    89  
     90  // PodWorkerState classifies the status of a pod as seen by the pod worker - setup (sync),
    91  // teardown of containers (terminating), or cleanup (terminated).
    92  type PodWorkerState int
    93  
    94  const (
    95  	// SyncPod is when the pod is expected to be started and running.
    96  	SyncPod PodWorkerState = iota
    97  	// TerminatingPod is when the pod is no longer being set up, but some
    98  	// containers may be running and are being torn down.
    99  	TerminatingPod
   100  	// TerminatedPod indicates the pod is stopped, can have no more running
   101  	// containers, and any foreground cleanup can be executed.
   102  	TerminatedPod
   103  )
   104  
   105  func (state PodWorkerState) String() string {
   106  	switch state {
   107  	case SyncPod:
   108  		return "sync"
   109  	case TerminatingPod:
   110  		return "terminating"
   111  	case TerminatedPod:
   112  		return "terminated"
   113  	default:
   114  		panic(fmt.Sprintf("the state %d is not defined", state))
   115  	}
   116  }
   117  
   118  // PodWorkerSync is the summarization of a single pod worker for sync. Values
   119  // besides state are used to provide metric counts for operators.
   120  type PodWorkerSync struct {
   121  	// State of the pod.
   122  	State PodWorkerState
   123  	// Orphan is true if the pod is no longer in the desired set passed to SyncKnownPods.
   124  	Orphan bool
   125  	// HasConfig is true if we have a historical pod spec for this pod.
   126  	HasConfig bool
   127  	// Static is true if we have config and the pod came from a static source.
   128  	Static bool
   129  }
   130  
    131  // podWork describes the internal change to be processed by a pod worker.
   132  type podWork struct {
   133  	// WorkType is the type of sync to perform - sync (create), terminating (stop
   134  	// containers), terminated (clean up and write status).
   135  	WorkType PodWorkerState
   136  
   137  	// Options contains the data to sync.
   138  	Options UpdatePodOptions
   139  }
   140  
   141  // PodWorkers is an abstract interface for testability.
   142  type PodWorkers interface {
   143  	// UpdatePod notifies the pod worker of a change to a pod, which will then
   144  	// be processed in FIFO order by a goroutine per pod UID. The state of the
   145  	// pod will be passed to the syncPod method until either the pod is marked
   146  	// as deleted, it reaches a terminal phase (Succeeded/Failed), or the pod
   147  	// is evicted by the kubelet. Once that occurs the syncTerminatingPod method
   148  	// will be called until it exits successfully, and after that all further
   149  	// UpdatePod() calls will be ignored for that pod until it has been forgotten
   150  	// due to significant time passing. A pod that is terminated will never be
   151  	// restarted.
   152  	UpdatePod(options UpdatePodOptions)
   153  	// SyncKnownPods removes workers for pods that are not in the desiredPods set
   154  	// and have been terminated for a significant period of time. Once this method
   155  	// has been called once, the workers are assumed to be fully initialized and
   156  	// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
   157  	// true. It returns a map describing the state of each known pod worker. It
   158  	// is the responsibility of the caller to re-add any desired pods that are not
   159  	// returned as knownPods.
   160  	SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)
   161  
   162  	// IsPodKnownTerminated returns true once SyncTerminatingPod completes
    163  	// successfully - the provided pod UID is known by the pod
   164  	// worker to be terminated. If the pod has been force deleted and the pod worker
   165  	// has completed termination this method will return false, so this method should
   166  	// only be used to filter out pods from the desired set such as in admission.
   167  	//
   168  	// Intended for use by the kubelet config loops, but not subsystems, which should
   169  	// use ShouldPod*().
   170  	IsPodKnownTerminated(uid types.UID) bool
   171  	// CouldHaveRunningContainers returns true before the pod workers have synced,
   172  	// once the pod workers see the pod (syncPod could be called), and returns false
   173  	// after the pod has been terminated (running containers guaranteed stopped).
   174  	//
   175  	// Intended for use by the kubelet config loops, but not subsystems, which should
   176  	// use ShouldPod*().
   177  	CouldHaveRunningContainers(uid types.UID) bool
   178  
   179  	// ShouldPodBeFinished returns true once SyncTerminatedPod completes
    180  	// successfully - the provided pod UID is known to the pod worker to
   181  	// be terminated and have resources reclaimed. It returns false before the
   182  	// pod workers have synced (syncPod could be called). Once the pod workers
   183  	// have synced it returns false if the pod has a sync status until
   184  	// SyncTerminatedPod completes successfully. If the pod workers have synced,
    185  	// but the pod does not have a status, it returns true.
   186  	//
   187  	// Intended for use by subsystem sync loops to avoid performing background setup
   188  	// after termination has been requested for a pod. Callers must ensure that the
   189  	// syncPod method is non-blocking when their data is absent.
   190  	ShouldPodBeFinished(uid types.UID) bool
   191  	// IsPodTerminationRequested returns true when pod termination has been requested
   192  	// until the termination completes and the pod is removed from config. This should
   193  	// not be used in cleanup loops because it will return false if the pod has already
   194  	// been cleaned up - use ShouldPodContainersBeTerminating instead. Also, this method
   195  	// may return true while containers are still being initialized by the pod worker.
   196  	//
   197  	// Intended for use by the kubelet sync* methods, but not subsystems, which should
   198  	// use ShouldPod*().
   199  	IsPodTerminationRequested(uid types.UID) bool
   200  
   201  	// ShouldPodContainersBeTerminating returns false before pod workers have synced,
    202  	// and true once a pod has started terminating. This check is similar to
   203  	// ShouldPodRuntimeBeRemoved but is also true after pod termination is requested.
   204  	//
   205  	// Intended for use by subsystem sync loops to avoid performing background setup
   206  	// after termination has been requested for a pod. Callers must ensure that the
   207  	// syncPod method is non-blocking when their data is absent.
   208  	ShouldPodContainersBeTerminating(uid types.UID) bool
   209  	// ShouldPodRuntimeBeRemoved returns true if runtime managers within the Kubelet
   210  	// should aggressively cleanup pod resources that are not containers or on disk
   211  	// content, like attached volumes. This is true when a pod is not yet observed
   212  	// by a worker after the first sync (meaning it can't be running yet) or after
   213  	// all running containers are stopped.
   214  	// TODO: Once pod logs are separated from running containers, this method should
   215  	// be used to gate whether containers are kept.
   216  	//
   217  	// Intended for use by subsystem sync loops to know when to start tearing down
   218  	// resources that are used by running containers. Callers should ensure that
   219  	// runtime content they own is not required for post-termination - for instance
   220  	// containers are required in docker to preserve pod logs until after the pod
   221  	// is deleted.
   222  	ShouldPodRuntimeBeRemoved(uid types.UID) bool
   223  	// ShouldPodContentBeRemoved returns true if resource managers within the Kubelet
   224  	// should aggressively cleanup all content related to the pod. This is true
   225  	// during pod eviction (when we wish to remove that content to free resources)
   226  	// as well as after the request to delete a pod has resulted in containers being
   227  	// stopped (which is a more graceful action). Note that a deleting pod can still
   228  	// be evicted.
   229  	//
   230  	// Intended for use by subsystem sync loops to know when to start tearing down
   231  	// resources that are used by non-deleted pods. Content is generally preserved
   232  	// until deletion+removal_from_etcd or eviction, although garbage collection
   233  	// can free content when this method returns false.
   234  	ShouldPodContentBeRemoved(uid types.UID) bool
   235  	// IsPodForMirrorPodTerminatingByFullName returns true if a static pod with the
   236  	// provided pod name is currently terminating and has yet to complete. It is
   237  	// intended to be used only during orphan mirror pod cleanup to prevent us from
   238  	// deleting a terminating static pod from the apiserver before the pod is shut
   239  	// down.
   240  	IsPodForMirrorPodTerminatingByFullName(podFullname string) bool
   241  }
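
To make the intended division of labor concrete, here is a hedged sketch of a cleanup loop in a hypothetical subsystem (someSubsystem and its per-pod state are invented for the example) that consults the ShouldPod* methods before tearing anything down:

    // Hypothetical subsystem that keeps per-pod state and cleans it up only
    // when the pod workers say it is safe to do so.
    type someSubsystem struct {
        perPodState map[types.UID]struct{}
    }

    func (s *someSubsystem) cleanup(workers PodWorkers) {
        for uid := range s.perPodState {
            if workers.ShouldPodContentBeRemoved(uid) {
                delete(s.perPodState, uid) // evicted, or deleted and terminated
                continue
            }
            if workers.ShouldPodRuntimeBeRemoved(uid) {
                // tear down runtime-adjacent resources, e.g. detach volumes
            }
        }
    }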
   242  
    243  // podSyncer describes the core lifecycle operations of the pod state machine. A pod is first
   244  // synced until it naturally reaches termination (true is returned) or an external agent decides
   245  // the pod should be terminated. Once a pod should be terminating, SyncTerminatingPod is invoked
   246  // until it returns no error. Then the SyncTerminatedPod method is invoked until it exits without
   247  // error, and the pod is considered terminal. Implementations of this interface must be threadsafe
   248  // for simultaneous invocation of these methods for multiple pods.
   249  type podSyncer interface {
   250  	// SyncPod configures the pod and starts and restarts all containers. If it returns true, the
    251  	// pod has reached a terminal state and the presence or absence of the error indicates whether it failed or succeeded.
   252  	// If an error is returned, the sync was not successful and should be rerun in the future. This
   253  	// is a long running method and should exit early with context.Canceled if the context is canceled.
   254  	SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
   255  	// SyncTerminatingPod attempts to ensure the pod's containers are no longer running and to collect
   256  	// any final status. This method is repeatedly invoked with diminishing grace periods until it exits
   257  	// without error. Once this method exits with no error other components are allowed to tear down
   258  	// supporting resources like volumes and devices. If the context is canceled, the method should
   259  	// return context.Canceled unless it has successfully finished, which may occur when a shorter
   260  	// grace period is detected.
   261  	SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
    262  	// SyncTerminatingRuntimePod is invoked to terminate running containers that are found
    263  	// to correspond to a pod no longer known to the kubelet. It should not
   264  	// exit without error unless all containers are known to be stopped.
   265  	SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error
   266  	// SyncTerminatedPod is invoked after all running containers are stopped and is responsible
   267  	// for releasing resources that should be executed right away rather than in the background.
   268  	// Once it exits without error the pod is considered finished on the node.
   269  	SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error
   270  }
   271  
   272  type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
   273  type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
   274  type syncTerminatingRuntimePodFnType func(ctx context.Context, runningPod *kubecontainer.Pod) error
   275  type syncTerminatedPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error
   276  
   277  // podSyncerFuncs implements podSyncer and accepts functions for each method.
   278  type podSyncerFuncs struct {
   279  	syncPod                   syncPodFnType
   280  	syncTerminatingPod        syncTerminatingPodFnType
   281  	syncTerminatingRuntimePod syncTerminatingRuntimePodFnType
   282  	syncTerminatedPod         syncTerminatedPodFnType
   283  }
   284  
   285  func newPodSyncerFuncs(s podSyncer) podSyncerFuncs {
   286  	return podSyncerFuncs{
   287  		syncPod:                   s.SyncPod,
   288  		syncTerminatingPod:        s.SyncTerminatingPod,
   289  		syncTerminatingRuntimePod: s.SyncTerminatingRuntimePod,
   290  		syncTerminatedPod:         s.SyncTerminatedPod,
   291  	}
   292  }
   293  
   294  var _ podSyncer = podSyncerFuncs{}
   295  
   296  func (f podSyncerFuncs) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
   297  	return f.syncPod(ctx, updateType, pod, mirrorPod, podStatus)
   298  }
   299  func (f podSyncerFuncs) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
   300  	return f.syncTerminatingPod(ctx, pod, podStatus, gracePeriod, podStatusFn)
   301  }
   302  func (f podSyncerFuncs) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
   303  	return f.syncTerminatingRuntimePod(ctx, runningPod)
   304  }
   305  func (f podSyncerFuncs) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
   306  	return f.syncTerminatedPod(ctx, pod, podStatus)
   307  }
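
The funcs form is handy in tests, where lifecycle methods can be stubbed individually. A sketch under that assumption (the stub behavior is invented):

    // Sketch: a podSyncer built from closures for a unit test. Every pod is
    // treated as healthy and termination always succeeds immediately.
    syncer := podSyncerFuncs{
        syncPod: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
            return false, nil // not terminal, no error
        },
        syncTerminatingPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
            return nil
        },
        syncTerminatingRuntimePod: func(ctx context.Context, runningPod *kubecontainer.Pod) error { return nil },
        syncTerminatedPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
            return nil
        },
    }
    var _ podSyncer = syncer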
   308  
   309  const (
   310  	// jitter factor for resyncInterval
   311  	workerResyncIntervalJitterFactor = 0.5
   312  
   313  	// jitter factor for backOffPeriod and backOffOnTransientErrorPeriod
   314  	workerBackOffPeriodJitterFactor = 0.5
   315  
    316  	// backoff period when a transient error occurs.
   317  	backOffOnTransientErrorPeriod = time.Second
   318  )
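
For reference, wait.Jitter(d, f) returns a duration drawn uniformly from [d, d+f*d], so with the 0.5 factors above a 10s backoff lands somewhere in 10-15s. A short sketch (uid and workQueue are placeholders):

    // Sketch: requeue a failing pod with jittered backoff to avoid
    // synchronized retries. With a 10s period the delay is in [10s, 15s].
    workQueue.Enqueue(uid, wait.Jitter(10*time.Second, workerBackOffPeriodJitterFactor))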
   319  
   320  // podSyncStatus tracks per-pod transitions through the three phases of pod
   321  // worker sync (setup, terminating, terminated).
   322  type podSyncStatus struct {
   323  	// ctx is the context that is associated with the current pod sync.
   324  	// TODO: remove this from the struct by having the context initialized
   325  	// in startPodSync, the cancelFn used by UpdatePod, and cancellation of
   326  	// a parent context for tearing down workers (if needed) on shutdown
   327  	ctx context.Context
   328  	// cancelFn if set is expected to cancel the current podSyncer operation.
   329  	cancelFn context.CancelFunc
   330  
   331  	// fullname of the pod
   332  	fullname string
   333  
   334  	// working is true if an update is pending or being worked by a pod worker
   335  	// goroutine.
   336  	working bool
   337  	// pendingUpdate is the updated state the pod worker should observe. It is
   338  	// cleared and moved to activeUpdate when a pod worker reads it. A new update
   339  	// may always replace a pending update as the pod worker does not guarantee
   340  	// that all intermediate states are synced to a worker, only the most recent.
   341  	// This state will not be visible to downstream components until a pod worker
   342  	// has begun processing it.
   343  	pendingUpdate *UpdatePodOptions
   344  	// activeUpdate is the most recent version of the pod's state that will be
   345  	// passed to a sync*Pod function. A pod becomes visible to downstream components
   346  	// once a worker decides to start a pod (startedAt is set). The pod and mirror
   347  	// pod fields are accumulated if they are missing on a particular call (the last
    348  	// known version), and the value of KillPodOptions is accumulated because a pod's
    349  	// grace period can only be shortened, never extended. This is the source of truth for the pod spec
   350  	// the kubelet is reconciling towards for all components that act on running pods.
   351  	activeUpdate *UpdatePodOptions
   352  
   353  	// syncedAt is the time at which the pod worker first observed this pod.
   354  	syncedAt time.Time
   355  	// startedAt is the time at which the pod worker allowed the pod to start.
   356  	startedAt time.Time
   357  	// terminatingAt is set once the pod is requested to be killed - note that
   358  	// this can be set before the pod worker starts terminating the pod, see
   359  	// terminating.
   360  	terminatingAt time.Time
   361  	// terminatedAt is set once the pod worker has completed a successful
   362  	// syncTerminatingPod call and means all running containers are stopped.
   363  	terminatedAt time.Time
   364  	// gracePeriod is the requested gracePeriod once terminatingAt is nonzero.
   365  	gracePeriod int64
   366  	// notifyPostTerminating will be closed once the pod transitions to
   367  	// terminated. After the pod is in terminated state, nothing should be
   368  	// added to this list.
   369  	notifyPostTerminating []chan<- struct{}
   370  	// statusPostTerminating is a list of the status changes associated
   371  	// with kill pod requests. After the pod is in terminated state, nothing
   372  	// should be added to this list. The worker will execute the last function
   373  	// in this list on each termination attempt.
   374  	statusPostTerminating []PodStatusFunc
   375  
   376  	// startedTerminating is true once the pod worker has observed the request to
   377  	// stop a pod (exited syncPod and observed a podWork with WorkType
   378  	// TerminatingPod). Once this is set, it is safe for other components
   379  	// of the kubelet to assume that no other containers may be started.
   380  	startedTerminating bool
   381  	// deleted is true if the pod has been marked for deletion on the apiserver
   382  	// or has no configuration represented (was deleted before).
   383  	deleted bool
   384  	// evicted is true if the kill indicated this was an eviction (an evicted
   385  	// pod can be more aggressively cleaned up).
   386  	evicted bool
   387  	// finished is true once the pod worker completes for a pod
   388  	// (syncTerminatedPod exited with no errors) until SyncKnownPods is invoked
   389  	// to remove the pod. A terminal pod (Succeeded/Failed) will have
   390  	// termination status until the pod is deleted.
   391  	finished bool
   392  	// restartRequested is true if the pod worker was informed the pod is
   393  	// expected to exist (update type of create, update, or sync) after
   394  	// it has been killed. When known pods are synced, any pod that is
   395  	// terminated and has restartRequested will have its history cleared.
   396  	restartRequested bool
   397  	// observedRuntime is true if the pod has been observed to be present in the
   398  	// runtime. A pod that has been observed at runtime must go through either
   399  	// SyncTerminatingRuntimePod or SyncTerminatingPod. Otherwise, we can avoid
   400  	// invoking the terminating methods if the pod is deleted or orphaned before
   401  	// it has been started.
   402  	observedRuntime bool
   403  }
   404  
   405  func (s *podSyncStatus) IsWorking() bool              { return s.working }
   406  func (s *podSyncStatus) IsTerminationRequested() bool { return !s.terminatingAt.IsZero() }
   407  func (s *podSyncStatus) IsTerminationStarted() bool   { return s.startedTerminating }
   408  func (s *podSyncStatus) IsTerminated() bool           { return !s.terminatedAt.IsZero() }
   409  func (s *podSyncStatus) IsFinished() bool             { return s.finished }
   410  func (s *podSyncStatus) IsEvicted() bool              { return s.evicted }
   411  func (s *podSyncStatus) IsDeleted() bool              { return s.deleted }
   412  func (s *podSyncStatus) IsStarted() bool              { return !s.startedAt.IsZero() }
   413  
    414  // WorkType returns the pod's current position in the pod lifecycle state machine.
   415  func (s *podSyncStatus) WorkType() PodWorkerState {
   416  	if s.IsTerminated() {
   417  		return TerminatedPod
   418  	}
   419  	if s.IsTerminationRequested() {
   420  		return TerminatingPod
   421  	}
   422  	return SyncPod
   423  }
   424  
   425  // mergeLastUpdate records the most recent state from a new update. Pod and MirrorPod are
    426  // updated to the most recent values observed. KillPodOptions is accumulated. If RunningPod is set, Pod is synthetic and
   427  // will *not* be used as the last pod state unless no previous pod state exists (because
   428  // the pod worker may be responsible for terminating a pod from a previous run of the
   429  // kubelet where no config state is visible). The contents of activeUpdate are used as the
   430  // source of truth for components downstream of the pod workers.
   431  func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) {
   432  	opts := s.activeUpdate
   433  	if opts == nil {
   434  		opts = &UpdatePodOptions{}
   435  		s.activeUpdate = opts
   436  	}
   437  
   438  	// UpdatePodOptions states (and UpdatePod enforces) that either Pod or RunningPod
   439  	// is set, and we wish to preserve the most recent Pod we have observed, so only
   440  	// overwrite our Pod when we have no Pod or when RunningPod is nil.
   441  	if opts.Pod == nil || other.RunningPod == nil {
   442  		opts.Pod = other.Pod
   443  	}
   444  	// running pods will not persist but will be remembered for replay
   445  	opts.RunningPod = other.RunningPod
   446  	// if mirrorPod was not provided, remember the last one for replay
   447  	if other.MirrorPod != nil {
   448  		opts.MirrorPod = other.MirrorPod
   449  	}
   450  	// accumulate kill pod options
   451  	if other.KillPodOptions != nil {
   452  		opts.KillPodOptions = &KillPodOptions{}
   453  		if other.KillPodOptions.Evict {
   454  			opts.KillPodOptions.Evict = true
   455  		}
   456  		if override := other.KillPodOptions.PodTerminationGracePeriodSecondsOverride; override != nil {
   457  			value := *override
   458  			opts.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &value
   459  		}
   460  	}
   461  	// StartTime is not copied - that is purely for tracking latency of config propagation
   462  	// from kubelet to pod worker.
   463  }
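
A test-style sketch of these semantics (pod, mirrorPod, and updatedPod are placeholders): a later update that omits MirrorPod keeps the previously observed one, while Pod advances to the newest version.

    // Sketch: the mirror pod is remembered for replay when a later update omits it.
    s := &podSyncStatus{}
    s.mergeLastUpdate(UpdatePodOptions{Pod: pod, MirrorPod: mirrorPod})
    s.mergeLastUpdate(UpdatePodOptions{Pod: updatedPod}) // MirrorPod not set
    // Now: s.activeUpdate.Pod == updatedPod, s.activeUpdate.MirrorPod == mirrorPod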
   464  
   465  // podWorkers keeps track of operations on pods and ensures each pod is
   466  // reconciled with the container runtime and other subsystems. The worker
   467  // also tracks which pods are in flight for starting, which pods are
   468  // shutting down but still have running containers, and which pods have
   469  // terminated recently and are guaranteed to have no running containers.
   470  //
   471  // podWorkers is the source of truth for what pods should be active on a
   472  // node at any time, and is kept up to date with the desired state of the
   473  // node (tracked by the kubelet pod config loops and the state in the
   474  // kubelet's podManager) via the UpdatePod method. Components that act
   475  // upon running pods should look to the pod worker for state instead of the
   476  // kubelet podManager. The pod worker is periodically reconciled with the
   477  // state of the podManager via SyncKnownPods() and is responsible for
   478  // ensuring the completion of all observed pods no longer present in
   479  // the podManager (no longer part of the node's desired config).
   480  //
   481  // A pod passed to a pod worker is either being synced (expected to be
   482  // running), terminating (has running containers but no new containers are
   483  // expected to start), terminated (has no running containers but may still
   484  // have resources being consumed), or cleaned up (no resources remaining).
   485  // Once a pod is set to be "torn down" it cannot be started again for that
   486  // UID (corresponding to a delete or eviction) until:
   487  //
   488  //  1. The pod worker is finalized (syncTerminatingPod and
   489  //     syncTerminatedPod exit without error sequentially)
   490  //  2. The SyncKnownPods method is invoked by kubelet housekeeping and the pod
   491  //     is not part of the known config.
   492  //
   493  // Pod workers provide a consistent source of information to other kubelet
   494  // loops about the status of the pod and whether containers can be
   495  // running. The ShouldPodContentBeRemoved() method tracks whether a pod's
   496  // contents should still exist, which includes non-existent pods after
   497  // SyncKnownPods() has been called once (as per the contract, all existing
   498  // pods should be provided via UpdatePod before SyncKnownPods is invoked).
   499  // Generally other sync loops are expected to separate "setup" and
   500  // "teardown" responsibilities and the information methods here assist in
   501  // each by centralizing that state. A simple visualization of the time
   502  // intervals involved might look like:
   503  //
    504  // ---|                                         = kubelet config has synced at least once
    505  // -------|                                  |- = pod exists in apiserver config
    506  // --------|                  |---------------- = CouldHaveRunningContainers() is true
    507  //
    508  //          ^- pod is observed by pod worker  .
    509  //          .                                 .
    510  //
    511  // ----------|       |------------------------- = syncPod is running
    512  //
    513  //            . ^- pod worker loop sees change and invokes syncPod
    514  //            . .                              .
    515  //
    516  // --------------|                     |------- = ShouldPodContainersBeTerminating() returns true
    517  // --------------|                     |------- = IsPodTerminationRequested() returns true (pod is known)
    518  //
    519  //                . .   ^- Kubelet evicts pod .
    520  //                . .                         .
    521  //
    522  // -------------------|       |---------------- = syncTerminatingPod runs then exits without error
    523  //
    524  //                    . .     ^ pod worker loop exits syncPod, sees pod is terminating,
    525  //                    . .       invokes syncTerminatingPod
    526  //                    . .                     .
    527  //
    528  // ---|    |------------------|              .  = ShouldPodRuntimeBeRemoved() returns true (post-sync)
    529  //
    530  //                            ^ syncTerminatingPod has exited successfully
    531  //                            .               .
    532  //
    533  // ----------------------------|       |------- = syncTerminatedPod runs then exits without error
    534  //
    535  //                                     ^ other loops can tear down
    536  //                                     .       .
    537  //
    538  // ------------------------------------|  |---- = status manager is waiting for SyncTerminatedPod() finished
    539  //
    540  //                                      ^      .
    541  //
    542  // ----------|                               |- = status manager can be writing pod status
    543  //
    544  //            ^ status manager deletes pod because no longer exists in config
   545  //
   546  // Other components in the Kubelet can request a termination of the pod
   547  // via the UpdatePod method or the killPodNow wrapper - this will ensure
   548  // the components of the pod are stopped until the kubelet is restarted
   549  // or permanently (if the phase of the pod is set to a terminal phase
   550  // in the pod status change).
   551  type podWorkers struct {
   552  	// Protects all per worker fields.
   553  	podLock sync.Mutex
   554  	// podsSynced is true once the pod worker has been synced at least once,
   555  	// which means that all working pods have been started via UpdatePod().
   556  	podsSynced bool
   557  
   558  	// Tracks all running per-pod goroutines - per-pod goroutine will be
   559  	// processing updates received through its corresponding channel. Sending
   560  	// a message on this channel will signal the corresponding goroutine to
   561  	// consume podSyncStatuses[uid].pendingUpdate if set.
   562  	podUpdates map[types.UID]chan struct{}
   563  	// Tracks by UID the termination status of a pod - syncing, terminating,
   564  	// terminated, and evicted.
   565  	podSyncStatuses map[types.UID]*podSyncStatus
   566  
   567  	// Tracks all uids for started static pods by full name
   568  	startedStaticPodsByFullname map[string]types.UID
   569  	// Tracks all uids for static pods that are waiting to start by full name
   570  	waitingToStartStaticPodsByFullname map[string][]types.UID
   571  
   572  	workQueue queue.WorkQueue
   573  
   574  	// This function is run to sync the desired state of pod.
   575  	// NOTE: This function has to be thread-safe - it can be called for
   576  	// different pods at the same time.
   577  	podSyncer podSyncer
   578  
   579  	// workerChannelFn is exposed for testing to allow unit tests to impose delays
   580  	// in channel communication. The function is invoked once each time a new worker
   581  	// goroutine starts.
   582  	workerChannelFn func(uid types.UID, in chan struct{}) (out <-chan struct{})
   583  
   584  	// The EventRecorder to use
   585  	recorder record.EventRecorder
   586  
   587  	// backOffPeriod is the duration to back off when there is a sync error.
   588  	backOffPeriod time.Duration
   589  
   590  	// resyncInterval is the duration to wait until the next sync.
   591  	resyncInterval time.Duration
   592  
   593  	// podCache stores kubecontainer.PodStatus for all pods.
   594  	podCache kubecontainer.Cache
   595  
   596  	// clock is used for testing timing
   597  	clock clock.PassiveClock
   598  }
   599  
   600  func newPodWorkers(
   601  	podSyncer podSyncer,
   602  	recorder record.EventRecorder,
   603  	workQueue queue.WorkQueue,
   604  	resyncInterval, backOffPeriod time.Duration,
   605  	podCache kubecontainer.Cache,
   606  ) PodWorkers {
   607  	return &podWorkers{
   608  		podSyncStatuses:                    map[types.UID]*podSyncStatus{},
   609  		podUpdates:                         map[types.UID]chan struct{}{},
   610  		startedStaticPodsByFullname:        map[string]types.UID{},
   611  		waitingToStartStaticPodsByFullname: map[string][]types.UID{},
   612  		podSyncer:                          podSyncer,
   613  		recorder:                           recorder,
   614  		workQueue:                          workQueue,
   615  		resyncInterval:                     resyncInterval,
   616  		backOffPeriod:                      backOffPeriod,
   617  		podCache:                           podCache,
   618  		clock:                              clock.RealClock{},
   619  	}
   620  }
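
A hedged construction sketch, roughly how a test might wire this up with fakes (the intervals are arbitrary and mySyncer is an assumed podSyncer implementation):

    // Sketch: pod workers with a fake recorder and an in-memory status cache.
    workers := newPodWorkers(
        newPodSyncerFuncs(mySyncer),                // full podSyncer implementation
        record.NewFakeRecorder(100),                // buffered fake event recorder
        queue.NewBasicWorkQueue(clock.RealClock{}), // retry queue for failed syncs
        time.Minute,                                // resyncInterval (arbitrary)
        10*time.Second,                             // backOffPeriod (arbitrary)
        kubecontainer.NewCache(),                   // runtime pod status cache
    )
    _ = workers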
   621  
   622  func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool {
   623  	p.podLock.Lock()
   624  	defer p.podLock.Unlock()
   625  	if status, ok := p.podSyncStatuses[uid]; ok {
   626  		return status.IsTerminated()
   627  	}
   628  	// if the pod is not known, we return false (pod worker is not aware of it)
   629  	return false
   630  }
   631  
   632  func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool {
   633  	p.podLock.Lock()
   634  	defer p.podLock.Unlock()
   635  	if status, ok := p.podSyncStatuses[uid]; ok {
   636  		return !status.IsTerminated()
   637  	}
   638  	// once all pods are synced, any pod without sync status is known to not be running.
   639  	return !p.podsSynced
   640  }
   641  
   642  func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool {
   643  	p.podLock.Lock()
   644  	defer p.podLock.Unlock()
   645  	if status, ok := p.podSyncStatuses[uid]; ok {
   646  		return status.IsFinished()
   647  	}
   648  	// once all pods are synced, any pod without sync status is assumed to
   649  	// have SyncTerminatedPod finished.
   650  	return p.podsSynced
   651  }
   652  
   653  func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool {
   654  	p.podLock.Lock()
   655  	defer p.podLock.Unlock()
   656  	if status, ok := p.podSyncStatuses[uid]; ok {
   657  		// the pod may still be setting up at this point.
   658  		return status.IsTerminationRequested()
   659  	}
   660  	// an unknown pod is considered not to be terminating (use ShouldPodContainersBeTerminating in
   661  	// cleanup loops to avoid failing to cleanup pods that have already been removed from config)
   662  	return false
   663  }
   664  
   665  func (p *podWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool {
   666  	p.podLock.Lock()
   667  	defer p.podLock.Unlock()
   668  	if status, ok := p.podSyncStatuses[uid]; ok {
   669  		// we wait until the pod worker goroutine observes the termination, which means syncPod will not
   670  		// be executed again, which means no new containers can be started
   671  		return status.IsTerminationStarted()
   672  	}
   673  	// once we've synced, if the pod isn't known to the workers we should be tearing them
   674  	// down
   675  	return p.podsSynced
   676  }
   677  
   678  func (p *podWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool {
   679  	p.podLock.Lock()
   680  	defer p.podLock.Unlock()
   681  	if status, ok := p.podSyncStatuses[uid]; ok {
   682  		return status.IsTerminated()
   683  	}
   684  	// a pod that hasn't been sent to the pod worker yet should have no runtime components once we have
   685  	// synced all content.
   686  	return p.podsSynced
   687  }
   688  
   689  func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
   690  	p.podLock.Lock()
   691  	defer p.podLock.Unlock()
   692  	if status, ok := p.podSyncStatuses[uid]; ok {
   693  		return status.IsEvicted() || (status.IsDeleted() && status.IsTerminated())
   694  	}
   695  	// a pod that hasn't been sent to the pod worker yet should have no content on disk once we have
   696  	// synced all content.
   697  	return p.podsSynced
   698  }
   699  
   700  func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
   701  	p.podLock.Lock()
   702  	defer p.podLock.Unlock()
   703  	uid, started := p.startedStaticPodsByFullname[podFullName]
   704  	if !started {
   705  		return false
   706  	}
   707  	status, exists := p.podSyncStatuses[uid]
   708  	if !exists {
   709  		return false
   710  	}
   711  	if !status.IsTerminationRequested() || status.IsTerminated() {
   712  		return false
   713  	}
   714  
   715  	return true
   716  }
   717  
   718  func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
   719  	for _, container := range status.ContainerStatuses {
   720  		if container.State == kubecontainer.ContainerStateRunning {
   721  			return false
   722  		}
   723  	}
   724  	for _, sb := range status.SandboxStatuses {
   725  		if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY {
   726  			return false
   727  		}
   728  	}
   729  	return true
   730  }
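
A small sketch of the check (values invented): a cached status whose only container has exited and which has no ready sandbox is considered terminal.

    // Sketch: one exited container and no ready sandboxes => terminal.
    cached := &kubecontainer.PodStatus{
        ContainerStatuses: []*kubecontainer.Status{
            {Name: "app", State: kubecontainer.ContainerStateExited},
        },
    }
    fmt.Println(isPodStatusCacheTerminal(cached)) // true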
   731  
   732  // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
   733  // terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
   734  // discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
   735  func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
   736  	// Handle when the pod is an orphan (no config) and we only have runtime status by running only
   737  	// the terminating part of the lifecycle. A running pod contains only a minimal set of information
    738  	// about the pod.
   739  	var isRuntimePod bool
   740  	var uid types.UID
   741  	var name, ns string
   742  	if runningPod := options.RunningPod; runningPod != nil {
   743  		if options.Pod == nil {
    744  			// the synthetic pod created here is used only as a placeholder and not tracked
   745  			if options.UpdateType != kubetypes.SyncPodKill {
   746  				klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
   747  				return
   748  			}
   749  			uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name
   750  			isRuntimePod = true
   751  		} else {
   752  			options.RunningPod = nil
   753  			uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
   754  			klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   755  		}
   756  	} else {
   757  		uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
   758  	}
   759  
   760  	p.podLock.Lock()
   761  	defer p.podLock.Unlock()
   762  
   763  	// decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
   764  	var firstTime bool
   765  	now := p.clock.Now()
   766  	status, ok := p.podSyncStatuses[uid]
   767  	if !ok {
   768  		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   769  		firstTime = true
   770  		status = &podSyncStatus{
   771  			syncedAt: now,
   772  			fullname: kubecontainer.BuildPodFullName(name, ns),
   773  		}
   774  		// if this pod is being synced for the first time, we need to make sure it is an active pod
   775  		if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
   776  			// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
   777  			// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
    778  			// However, `filterOutInactivePods` considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set.
   779  			if statusCache, err := p.podCache.Get(uid); err == nil {
   780  				if isPodStatusCacheTerminal(statusCache) {
   781  					// At this point we know:
   782  					// (1) The pod is terminal based on the config source.
   783  					// (2) The pod is terminal based on the runtime cache.
   784  					// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
   785  					// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
   786  					// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
   787  					status = &podSyncStatus{
   788  						terminatedAt:       now,
   789  						terminatingAt:      now,
   790  						syncedAt:           now,
   791  						startedTerminating: true,
   792  						finished:           false,
   793  						fullname:           kubecontainer.BuildPodFullName(name, ns),
   794  					}
   795  				}
   796  			}
   797  		}
   798  		p.podSyncStatuses[uid] = status
   799  	}
   800  
   801  	// RunningPods represent an unknown pod execution and don't contain pod spec information
   802  	// sufficient to perform any action other than termination. If we received a RunningPod
   803  	// after a real pod has already been provided, use the most recent spec instead. Also,
   804  	// once we observe a runtime pod we must drive it to completion, even if we weren't the
   805  	// ones who started it.
   806  	pod := options.Pod
   807  	if isRuntimePod {
   808  		status.observedRuntime = true
   809  		switch {
   810  		case status.pendingUpdate != nil && status.pendingUpdate.Pod != nil:
   811  			pod = status.pendingUpdate.Pod
   812  			options.Pod = pod
   813  			options.RunningPod = nil
   814  		case status.activeUpdate != nil && status.activeUpdate.Pod != nil:
   815  			pod = status.activeUpdate.Pod
   816  			options.Pod = pod
   817  			options.RunningPod = nil
   818  		default:
   819  			// we will continue to use RunningPod.ToAPIPod() as pod here, but
   820  			// options.Pod will be nil and other methods must handle that appropriately.
   821  			pod = options.RunningPod.ToAPIPod()
   822  		}
   823  	}
   824  
   825  	// When we see a create update on an already terminating pod, that implies two pods with the same UID were created in
    826  	// close temporal proximity (usually a static pod, but in extremely rare cases an apiserver may do something
   827  	// similar) - flag the sync status to indicate that after the pod terminates it should be reset to "not running" to
   828  	// allow a subsequent add/update to start the pod worker again. This does not apply to the first time we see a pod,
   829  	// such as when the kubelet restarts and we see already terminated pods for the first time.
   830  	if !firstTime && status.IsTerminationRequested() {
   831  		if options.UpdateType == kubetypes.SyncPodCreate {
   832  			status.restartRequested = true
   833  			klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   834  			return
   835  		}
   836  	}
   837  
   838  	// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
   839  	if status.IsFinished() {
   840  		klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   841  		return
   842  	}
   843  
   844  	// check for a transition to terminating
   845  	var becameTerminating bool
   846  	if !status.IsTerminationRequested() {
   847  		switch {
   848  		case isRuntimePod:
   849  			klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   850  			status.deleted = true
   851  			status.terminatingAt = now
   852  			becameTerminating = true
   853  		case pod.DeletionTimestamp != nil:
   854  			klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   855  			status.deleted = true
   856  			status.terminatingAt = now
   857  			becameTerminating = true
   858  		case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
   859  			klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   860  			status.terminatingAt = now
   861  			becameTerminating = true
   862  		case options.UpdateType == kubetypes.SyncPodKill:
   863  			if options.KillPodOptions != nil && options.KillPodOptions.Evict {
   864  				klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   865  				status.evicted = true
   866  			} else {
   867  				klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   868  			}
   869  			status.terminatingAt = now
   870  			becameTerminating = true
   871  		}
   872  	}
   873  
   874  	// once a pod is terminating, all updates are kills and the grace period can only decrease
   875  	var wasGracePeriodShortened bool
   876  	switch {
   877  	case status.IsTerminated():
   878  		// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
   879  		// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
   880  		// after the pod worker completes.
   881  		if isRuntimePod {
   882  			klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
   883  			return
   884  		}
   885  
   886  		if options.KillPodOptions != nil {
   887  			if ch := options.KillPodOptions.CompletedCh; ch != nil {
   888  				close(ch)
   889  			}
   890  		}
   891  		options.KillPodOptions = nil
   892  
   893  	case status.IsTerminationRequested():
   894  		if options.KillPodOptions == nil {
   895  			options.KillPodOptions = &KillPodOptions{}
   896  		}
   897  
   898  		if ch := options.KillPodOptions.CompletedCh; ch != nil {
   899  			status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
   900  		}
   901  		if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
   902  			status.statusPostTerminating = append(status.statusPostTerminating, fn)
   903  		}
   904  
   905  		gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)
   906  
   907  		wasGracePeriodShortened = gracePeriodShortened
   908  		status.gracePeriod = gracePeriod
    909  		// always set the grace period for syncTerminatingPod so we don't have to recalculate;
    910  		// it will never be zero.
   911  		options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
   912  
   913  	default:
   914  		// KillPodOptions is not valid for sync actions outside of the terminating phase
   915  		if options.KillPodOptions != nil {
   916  			if ch := options.KillPodOptions.CompletedCh; ch != nil {
   917  				close(ch)
   918  			}
   919  			options.KillPodOptions = nil
   920  		}
   921  	}
   922  
   923  	// start the pod worker goroutine if it doesn't exist
   924  	podUpdates, exists := p.podUpdates[uid]
   925  	if !exists {
   926  		// buffer the channel to avoid blocking this method
   927  		podUpdates = make(chan struct{}, 1)
   928  		p.podUpdates[uid] = podUpdates
   929  
   930  		// ensure that static pods start in the order they are received by UpdatePod
   931  		if kubetypes.IsStaticPod(pod) {
   932  			p.waitingToStartStaticPodsByFullname[status.fullname] =
   933  				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
   934  		}
   935  
   936  		// allow testing of delays in the pod update channel
   937  		var outCh <-chan struct{}
   938  		if p.workerChannelFn != nil {
   939  			outCh = p.workerChannelFn(uid, podUpdates)
   940  		} else {
   941  			outCh = podUpdates
   942  		}
   943  
   944  		// spawn a pod worker
   945  		go func() {
   946  			// TODO: this should be a wait.Until with backoff to handle panics, and
   947  			// accept a context for shutdown
   948  			defer runtime.HandleCrash()
   949  			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
   950  			p.podWorkerLoop(uid, outCh)
   951  		}()
   952  	}
   953  
   954  	// measure the maximum latency between a call to UpdatePod and when the pod worker reacts to it
   955  	// by preserving the oldest StartTime
   956  	if status.pendingUpdate != nil && !status.pendingUpdate.StartTime.IsZero() && status.pendingUpdate.StartTime.Before(options.StartTime) {
   957  		options.StartTime = status.pendingUpdate.StartTime
   958  	}
   959  
   960  	// notify the pod worker there is a pending update
   961  	status.pendingUpdate = &options
   962  	status.working = true
   963  	klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
   964  	select {
   965  	case podUpdates <- struct{}{}:
   966  	default:
   967  	}
   968  
   969  	if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
   970  		klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
   971  		status.cancelFn()
   972  		return
   973  	}
   974  }
   975  
   976  // calculateEffectiveGracePeriod sets the initial grace period for a newly terminating pod or allows a
   977  // shorter grace period to be provided, returning the desired value.
   978  func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) {
   979  	// enforce the restriction that a grace period can only decrease and track whatever our value is,
   980  	// then ensure a calculated value is passed down to lower levels
   981  	gracePeriod := status.gracePeriod
   982  	// this value is bedrock truth - the apiserver owns telling us this value calculated by apiserver
   983  	if override := pod.DeletionGracePeriodSeconds; override != nil {
   984  		if gracePeriod == 0 || *override < gracePeriod {
   985  			gracePeriod = *override
   986  		}
   987  	}
   988  	// we allow other parts of the kubelet (namely eviction) to request this pod be terminated faster
   989  	if options != nil {
   990  		if override := options.PodTerminationGracePeriodSecondsOverride; override != nil {
   991  			if gracePeriod == 0 || *override < gracePeriod {
   992  				gracePeriod = *override
   993  			}
   994  		}
   995  	}
   996  	// make a best effort to default this value to the pod's desired intent, in the event
   997  	// the kubelet provided no requested value (graceful termination?)
   998  	if gracePeriod == 0 && pod.Spec.TerminationGracePeriodSeconds != nil {
   999  		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  1000  	}
   1001  	// no matter what, we always supply a grace period of at least 1
  1002  	if gracePeriod < 1 {
  1003  		gracePeriod = 1
  1004  	}
  1005  	return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
  1006  }
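
A worked sketch of the precedence (numbers invented): with a pod spec asking for 30s, an apiserver deletion grace of 20s, and an eviction override of 5s, the smallest non-zero value wins.

    // Sketch: the effective grace period is the minimum of the apiserver's
    // deletion grace and any kubelet-side override, floored at 1 second.
    deletionGrace, evictionGrace, specGrace := int64(20), int64(5), int64(30)
    pod := &v1.Pod{}
    pod.DeletionGracePeriodSeconds = &deletionGrace
    pod.Spec.TerminationGracePeriodSeconds = &specGrace

    status := &podSyncStatus{} // nothing tracked yet
    gracePeriod, shortened := calculateEffectiveGracePeriod(status, pod,
        &KillPodOptions{PodTerminationGracePeriodSecondsOverride: &evictionGrace})
    // gracePeriod == 5; shortened == false because no prior period was tracked
    _, _ = gracePeriod, shortened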
  1007  
  1008  // allowPodStart tries to start the pod and returns true if allowed, otherwise
  1009  // it requeues the pod and returns false. If the pod will never be able to start
  1010  // because data is missing, or the pod was terminated before start, canEverStart
  1011  // is false. This method can only be called while holding the pod lock.
  1012  func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) {
  1013  	if !kubetypes.IsStaticPod(pod) {
  1014  		// TODO: Do we want to allow non-static pods with the same full name?
  1015  		// Note that it may disable the force deletion of pods.
  1016  		return true, true
  1017  	}
  1018  	status, ok := p.podSyncStatuses[pod.UID]
  1019  	if !ok {
  1020  		klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
  1021  		return false, false
  1022  	}
  1023  	if status.IsTerminationRequested() {
  1024  		return false, false
  1025  	}
  1026  	if !p.allowStaticPodStart(status.fullname, pod.UID) {
  1027  		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
  1028  		return false, true
  1029  	}
  1030  	return true, true
  1031  }
  1032  
  1033  // allowStaticPodStart tries to start the static pod and returns true if
  1034  // 1. there are no other started static pods with the same fullname
  1035  // 2. the uid matches that of the first valid static pod waiting to start
  1036  func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
  1037  	startedUID, started := p.startedStaticPodsByFullname[fullname]
  1038  	if started {
  1039  		return startedUID == uid
  1040  	}
  1041  
  1042  	waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
  1043  	// TODO: This is O(N) with respect to the number of updates to static pods
  1044  	// with overlapping full names, and ideally would be O(1).
  1045  	for i, waitingUID := range waitingPods {
  1046  		// has pod already terminated or been deleted?
  1047  		status, ok := p.podSyncStatuses[waitingUID]
  1048  		if !ok || status.IsTerminationRequested() || status.IsTerminated() {
  1049  			continue
  1050  		}
  1051  		// another pod is next in line
  1052  		if waitingUID != uid {
  1053  			p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
  1054  			return false
  1055  		}
  1056  		// we are up next, remove ourselves
  1057  		waitingPods = waitingPods[i+1:]
  1058  		break
  1059  	}
  1060  	if len(waitingPods) != 0 {
  1061  		p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
  1062  	} else {
  1063  		delete(p.waitingToStartStaticPodsByFullname, fullname)
  1064  	}
  1065  	p.startedStaticPodsByFullname[fullname] = uid
  1066  	return true
  1067  }
  1068  
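        // A hedged illustration of the bookkeeping above; the fullname and UIDs are
        // invented, and both pods are assumed to have live entries in podSyncStatuses:
        //
        //	// startedStaticPodsByFullname:        {}
        //	// waitingToStartStaticPodsByFullname: {"etcd-node1": ["uid-a", "uid-b"]}
        //	allowStaticPodStart("etcd-node1", "uid-b") // false: "uid-a" is first in line
        //	allowStaticPodStart("etcd-node1", "uid-a") // true: "uid-a" is recorded as started
        //	allowStaticPodStart("etcd-node1", "uid-b") // false until "uid-a" is removed from started
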
  1069  // cleanupUnstartedPod is invoked if a pod that has never been started receives a termination
  1070  // signal before it can be started. This method must be called holding the pod lock.
  1071  func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) {
  1072  	p.cleanupPodUpdates(pod.UID)
  1073  
  1074  	if status.terminatingAt.IsZero() {
  1075  		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
  1076  	}
  1077  	if !status.terminatedAt.IsZero() {
  1078  		klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
  1079  	}
  1080  	status.finished = true
  1081  	status.working = false
  1082  	status.terminatedAt = p.clock.Now()
  1083  
  1084  	if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
  1085  		delete(p.startedStaticPodsByFullname, status.fullname)
  1086  	}
  1087  }
  1088  
  1089  // startPodSync is invoked by each pod worker goroutine when a message arrives on the pod update channel.
  1090  // This method consumes a pending update, initializes a context, decides whether the pod is already started
  1091  // or can be started, and updates the cached pod state so that downstream components can observe what the
  1092  // pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any
  1093  // of the boolean values is false, ensure the appropriate cleanup happens before returning.
  1094  //
  1095  // This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
  1096  // or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
  1097  // should never have an activeUpdate because that is exposed to downstream components on started pods.
  1098  func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
  1099  	p.podLock.Lock()
  1100  	defer p.podLock.Unlock()
  1101  
  1102  	// verify we are known to the pod worker still
  1103  	status, ok := p.podSyncStatuses[podUID]
  1104  	if !ok {
  1105  		// pod status has disappeared, the worker should exit
  1106  		klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID)
  1107  		return nil, update, false, false, false
  1108  	}
  1109  	if !status.working {
  1110  		// working is used by unit tests to observe whether a worker is currently acting on this pod
  1111  		klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
  1112  	}
  1113  	if status.pendingUpdate == nil {
  1114  		// no update available, this means we were queued without work being added or there is a
  1115  		// race condition, both of which are unexpected
  1116  		status.working = false
  1117  		klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID)
  1118  		return nil, update, false, false, false
  1119  	}
  1120  
  1121  	// consume the pending update
  1122  	update.WorkType = status.WorkType()
  1123  	update.Options = *status.pendingUpdate
  1124  	status.pendingUpdate = nil
  1125  	select {
  1126  	case <-p.podUpdates[podUID]:
  1127  		// ensure the pod update channel is empty (it is only ever written to under lock)
  1128  	default:
  1129  	}
  1130  
  1131  	// initialize a context for the worker if one does not exist
  1132  	if status.ctx == nil || status.ctx.Err() == context.Canceled {
  1133  		status.ctx, status.cancelFn = context.WithCancel(context.Background())
  1134  	}
  1135  	ctx = status.ctx
  1136  
  1137  	// if we are already started, make our state visible to downstream components
  1138  	if status.IsStarted() {
  1139  		status.mergeLastUpdate(update.Options)
  1140  		return ctx, update, true, true, true
  1141  	}
  1142  
  1143  	// if we are already terminating and we only have a running pod, allow the worker
  1144  	// to "start" since we are immediately moving to terminating
  1145  	if update.Options.RunningPod != nil && update.WorkType == TerminatingPod {
  1146  		status.mergeLastUpdate(update.Options)
  1147  		return ctx, update, true, true, true
  1148  	}
  1149  
  1150  	// If we receive an update where Pod is nil (running pod is set) but haven't
  1151  	// started yet, we can only terminate the pod, not start it. We should not be
  1152  	// asked to start such a pod, but guard here just in case an accident occurs.
  1153  	if update.Options.Pod == nil {
  1154  		status.mergeLastUpdate(update.Options)
  1155  		klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
  1156  		return ctx, update, false, false, true
  1157  	}
  1158  
  1159  	// verify we can start
  1160  	canStart, canEverStart = p.allowPodStart(update.Options.Pod)
  1161  	switch {
  1162  	case !canEverStart:
  1163  		p.cleanupUnstartedPod(update.Options.Pod, status)
  1164  		status.working = false
  1165  		if start := update.Options.StartTime; !start.IsZero() {
  1166  			metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
  1167  		}
  1168  		klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
  1169  		return ctx, update, canStart, canEverStart, true
  1170  	case !canStart:
  1171  		// this is the only path in which we don't start the pod, so we need to put the change back in pendingUpdate
  1172  		status.pendingUpdate = &update.Options
  1173  		status.working = false
  1174  		klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
  1175  		return ctx, update, canStart, canEverStart, true
  1176  	}
  1177  
  1178  	// mark the pod as started
  1179  	status.startedAt = p.clock.Now()
  1180  	status.mergeLastUpdate(update.Options)
  1181  
  1182  	// If we are admitting the pod and it is new, record the count of containers
  1183  	// TODO: We should probably move this into syncPod and add an execution count
  1184  	// to the syncPod arguments, and this should be recorded on the first sync.
  1185  	// Leaving it here complicates a particularly important loop.
  1186  	metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers)))
  1187  
  1188  	return ctx, update, true, true, true
  1189  }
  1190  
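        // How the tuple returned by startPodSync is interpreted by podWorkerLoop below:
        //
        //	ok == false           -> no event was available; wait for the next notification
        //	canEverStart == false -> the pod was cleaned up and the worker goroutine exits
        //	canStart == false     -> the update was requeued; wait for the next notification
        //	all true              -> proceed to run the appropriate sync method
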
  1191  func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef) {
  1192  	if update.RunningPod != nil {
  1193  		return update.RunningPod.ID, klog.KObj(update.RunningPod.ToAPIPod())
  1194  	}
  1195  	return update.Pod.UID, klog.KObj(update.Pod)
  1196  }
  1197  
  1198  // podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
  1199  // state is reached. The loop is responsible for driving the pod through four main phases:
  1200  //
  1201  // 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
  1202  // 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
  1203  // 3. Terminating, ensuring all running containers in the pod are stopped
  1204  // 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
  1205  //
  1206  // The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
  1207  // sync method fails, p.workQueue is updated with backoff but it is the responsibility of the kubelet
  1208  // to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
  1209  // caller. When a pod transitions working->terminating or terminating->terminated, the next update is
  1210  // queued immediately and no kubelet action is required.
  1211  func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
  1212  	var lastSyncTime time.Time
  1213  	for range podUpdates {
  1214  		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
  1215  		// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
  1216  		if !ok {
  1217  			continue
  1218  		}
  1219  		// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
  1220  		if !canEverStart {
  1221  			return
  1222  		}
  1223  		// If the pod is not yet ready to start, continue and wait for more updates.
  1224  		if !canStart {
  1225  			continue
  1226  		}
  1227  
  1228  		podUID, podRef := podUIDAndRefForUpdate(update.Options)
  1229  
  1230  		klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1231  		var isTerminal bool
  1232  		err := func() error {
  1233  			// The worker is responsible for ensuring the sync method sees the appropriate
  1234  			// status updates on resyncs (the result of the last sync), transitions to
  1235  			// terminating (no wait), or on terminated (whatever the most recent state is).
  1236  			// Only syncing and terminating can generate pod status changes, while terminated
  1237  			// pods ensure the most recent status makes it to the api server.
  1238  			var status *kubecontainer.PodStatus
  1239  			var err error
  1240  			switch {
  1241  			case update.Options.RunningPod != nil:
  1242  				// when we receive a running pod, we don't need status at all because we are
  1243  				// guaranteed to be terminating and we skip updates to the pod
  1244  			default:
  1245  				// wait until we see the next refresh from the PLEG via the cache (max 2s)
  1246  				// TODO: this adds ~1s of latency on all transitions from sync to terminating
  1247  				//  to terminated, and on all termination retries (including evictions). We should
  1248  				//  improve latency by making the pleg continuous and by allowing pod status
  1249  				//  changes to be refreshed when key events happen (killPod, sync->terminating).
  1250  				//  Improving this latency also reduces the possibility that a terminated
  1251  				//  container's status is garbage collected before we have a chance to update the
  1252  				//  API server (thus losing the exit code).
  1253  				status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)
  1254  
  1255  				if err != nil {
  1256  					// This is the legacy event thrown by the manage pod loop; all other events are now
  1257  					// dispatched from syncPodFn
  1258  					p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
  1259  					return err
  1260  				}
  1261  			}
  1262  
  1263  			// Take the appropriate action (illegal phases are prevented by UpdatePod)
  1264  			switch {
  1265  			case update.WorkType == TerminatedPod:
  1266  				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)
  1267  
  1268  			case update.WorkType == TerminatingPod:
  1269  				var gracePeriod *int64
  1270  				if opt := update.Options.KillPodOptions; opt != nil {
  1271  					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
  1272  				}
  1273  				podStatusFn := p.acknowledgeTerminating(podUID)
  1274  
  1275  				// if we only have a running pod, terminate it directly
  1276  				if update.Options.RunningPod != nil {
  1277  					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
  1278  				} else {
  1279  					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
  1280  				}
  1281  
  1282  			default:
  1283  				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
  1284  			}
  1285  
  1286  			lastSyncTime = p.clock.Now()
  1287  			return err
  1288  		}()
  1289  
  1290  		var phaseTransition bool
  1291  		switch {
  1292  		case err == context.Canceled:
  1293  			// when the context is cancelled we expect an update to already be queued
  1294  			klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1295  
  1296  		case err != nil:
  1297  			// we will queue a retry
  1298  			klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)
  1299  
  1300  		case update.WorkType == TerminatedPod:
  1301  			// we can shut down the worker
  1302  			p.completeTerminated(podUID)
  1303  			if start := update.Options.StartTime; !start.IsZero() {
  1304  				metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
  1305  			}
  1306  			klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1307  			return
  1308  
  1309  		case update.WorkType == TerminatingPod:
  1310  			// pods that don't exist in config don't need to be terminated, other loops will clean them up
  1311  			if update.Options.RunningPod != nil {
  1312  				p.completeTerminatingRuntimePod(podUID)
  1313  				if start := update.Options.StartTime; !start.IsZero() {
  1314  					metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
  1315  				}
  1316  				klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1317  				return
  1318  			}
  1319  			// otherwise we move to the terminating phase
  1320  			p.completeTerminating(podUID)
  1321  			phaseTransition = true
  1322  
  1323  		case isTerminal:
  1324  			// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
  1325  			klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1326  			p.completeSync(podUID)
  1327  			phaseTransition = true
  1328  		}
  1329  
  1330  		// queue a retry if necessary, then put the next event in the channel if any
  1331  		p.completeWork(podUID, phaseTransition, err)
  1332  		if start := update.Options.StartTime; !start.IsZero() {
  1333  			metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
  1334  		}
  1335  		klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
  1336  	}
  1337  }
  1338  
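        // Summarizing the dispatch in the loop above, each work type maps to one sync method:
        //
        //	SyncPod (syncing)         -> podSyncer.SyncPod(ctx, updateType, pod, mirrorPod, status)
        //	TerminatingPod            -> podSyncer.SyncTerminatingPod(ctx, pod, status, gracePeriod, statusFn)
        //	TerminatingPod (orphaned) -> podSyncer.SyncTerminatingRuntimePod(ctx, runningPod)
        //	TerminatedPod             -> podSyncer.SyncTerminatedPod(ctx, pod, status)
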
  1339  // acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees
  1340  // the termination state so that other components know no new containers will be started in this
  1341  // pod. It then returns the status function, if any, that applies to this pod.
  1342  func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc {
  1343  	p.podLock.Lock()
  1344  	defer p.podLock.Unlock()
  1345  
  1346  	status, ok := p.podSyncStatuses[podUID]
  1347  	if !ok {
  1348  		return nil
  1349  	}
  1350  
  1351  	if !status.terminatingAt.IsZero() && !status.startedTerminating {
  1352  		klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID)
  1353  		status.startedTerminating = true
  1354  	}
  1355  
  1356  	if l := len(status.statusPostTerminating); l > 0 {
  1357  		return status.statusPostTerminating[l-1]
  1358  	}
  1359  	return nil
  1360  }
  1361  
  1362  // completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should
  1363  // be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways
  1364  // exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by
  1365  // UpdatePod.
  1366  func (p *podWorkers) completeSync(podUID types.UID) {
  1367  	p.podLock.Lock()
  1368  	defer p.podLock.Unlock()
  1369  
  1370  	klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID)
  1371  
  1372  	status, ok := p.podSyncStatuses[podUID]
  1373  	if !ok {
  1374  		klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID)
  1375  		return
  1376  	}
  1377  
  1378  	// update the status of the pod
  1379  	if status.terminatingAt.IsZero() {
  1380  		status.terminatingAt = p.clock.Now()
  1381  	} else {
  1382  		klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID)
  1383  	}
  1384  	status.startedTerminating = true
  1385  
  1386  	// the pod has now transitioned to terminating and we want to run syncTerminatingPod
  1387  	// as soon as possible, so if no update is already waiting queue a synthetic update
  1388  	p.requeueLastPodUpdate(podUID, status)
  1389  }
  1390  
  1391  // completeTerminating is invoked when syncTerminatingPod completes successfully, which means
  1392  // no container is running, no container will be started in the future, and we are ready for
  1393  // cleanup.  This updates the termination state which prevents future syncs and will ensure
  1394  // other kubelet loops know this pod is not running any containers.
  1395  func (p *podWorkers) completeTerminating(podUID types.UID) {
  1396  	p.podLock.Lock()
  1397  	defer p.podLock.Unlock()
  1398  
  1399  	klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID)
  1400  
  1401  	status, ok := p.podSyncStatuses[podUID]
  1402  	if !ok {
  1403  		return
  1404  	}
  1405  
  1406  	// update the status of the pod
  1407  	if status.terminatingAt.IsZero() {
  1408  		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
  1409  	}
  1410  	status.terminatedAt = p.clock.Now()
  1411  	for _, ch := range status.notifyPostTerminating {
  1412  		close(ch)
  1413  	}
  1414  	status.notifyPostTerminating = nil
  1415  	status.statusPostTerminating = nil
  1416  
  1417  	// the pod has now transitioned to terminated and we want to run syncTerminatedPod
  1418  	// as soon as possible, so if no update is already waiting queue a synthetic update
  1419  	p.requeueLastPodUpdate(podUID, status)
  1420  }
  1421  
  1422  // completeTerminatingRuntimePod is invoked when syncTerminatingPod completes successfully,
  1423  // which means an orphaned pod (no config) is terminated and we can exit. Since orphaned
  1424  // pods have no API representation, we want to exit the loop at this point and ensure no
  1425  // status is present afterwards - the running pod is truly terminated when this is invoked.
  1426  func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
  1427  	p.podLock.Lock()
  1428  	defer p.podLock.Unlock()
  1429  
  1430  	klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID)
  1431  
  1432  	p.cleanupPodUpdates(podUID)
  1433  
  1434  	status, ok := p.podSyncStatuses[podUID]
  1435  	if !ok {
  1436  		return
  1437  	}
  1438  	if status.terminatingAt.IsZero() {
  1439  		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
  1440  	}
  1441  	status.terminatedAt = p.clock.Now()
  1442  	status.finished = true
  1443  	status.working = false
  1444  
  1445  	if p.startedStaticPodsByFullname[status.fullname] == podUID {
  1446  		delete(p.startedStaticPodsByFullname, status.fullname)
  1447  	}
  1448  
  1449  	// A runtime pod is transient and not part of the desired state - once it has reached
  1450  	// terminated we can abandon tracking it.
  1451  	delete(p.podSyncStatuses, podUID)
  1452  }
  1453  
  1454  // completeTerminated is invoked after syncTerminatedPod completes successfully and means we
  1455  // can stop the pod worker. The pod is finalized at this point.
  1456  func (p *podWorkers) completeTerminated(podUID types.UID) {
  1457  	p.podLock.Lock()
  1458  	defer p.podLock.Unlock()
  1459  
  1460  	klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID)
  1461  
  1462  	p.cleanupPodUpdates(podUID)
  1463  
  1464  	status, ok := p.podSyncStatuses[podUID]
  1465  	if !ok {
  1466  		return
  1467  	}
  1468  	if status.terminatingAt.IsZero() {
  1469  		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID)
  1470  	}
  1471  	if status.terminatedAt.IsZero() {
  1472  		klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID)
  1473  	}
  1474  	status.finished = true
  1475  	status.working = false
  1476  
  1477  	if p.startedStaticPodsByFullname[status.fullname] == podUID {
  1478  		delete(p.startedStaticPodsByFullname, status.fullname)
  1479  	}
  1480  }
  1481  
  1482  // completeWork requeues the pod on error or at the next sync interval, and then
  1483  // immediately executes any pending work.
  1484  func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) {
  1485  	// Requeue the last update if the last sync returned error.
  1486  	switch {
  1487  	case phaseTransition:
  1488  		p.workQueue.Enqueue(podUID, 0)
  1489  	case syncErr == nil:
  1490  		// No error; requeue at the regular resync interval.
  1491  		p.workQueue.Enqueue(podUID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
  1492  	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
  1493  		// Network is not ready; back off for a short period of time and retry as the network might be ready soon.
  1494  		p.workQueue.Enqueue(podUID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
  1495  	default:
  1496  		// Error occurred during the sync; back off and then retry.
  1497  		p.workQueue.Enqueue(podUID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
  1498  	}
  1499  
  1500  	// if there is a pending update for this worker, requeue immediately, otherwise
  1501  	// clear working status
  1502  	p.podLock.Lock()
  1503  	defer p.podLock.Unlock()
  1504  	if status, ok := p.podSyncStatuses[podUID]; ok {
  1505  		if status.pendingUpdate != nil {
  1506  			select {
  1507  			case p.podUpdates[podUID] <- struct{}{}:
  1508  				klog.V(4).InfoS("Requeueing pod due to pending update", "podUID", podUID)
  1509  			default:
  1510  				klog.V(4).InfoS("Pending update already queued", "podUID", podUID)
  1511  			}
  1512  		} else {
  1513  			status.working = false
  1514  		}
  1515  	}
  1516  }
  1517  
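        // The requeue policy in completeWork, summarized (intervals are jittered as shown above):
        //
        //	phaseTransition         -> Enqueue(uid, 0), i.e. retry immediately
        //	syncErr == nil          -> requeue at the regular resyncInterval
        //	network-not-ready error -> requeue after backOffOnTransientErrorPeriod
        //	any other sync error    -> requeue after backOffPeriod
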
  1518  // SyncKnownPods will purge any fully terminated pods that are not in the desiredPods
  1519  // list, which means SyncKnownPods must be called in a threadsafe manner from calls
  1520  // to UpdatePod for new pods. Because the pod worker is dependent on UpdatePod being
  1521  // invoked to drive a pod's state machine, if a pod is missing in the desired list the
  1522  // pod worker must be responsible for delivering that update. The method returns a map
  1523  // of known workers that are not finished, with a PodWorkerSync state of TerminatedPod,
  1524  // TerminatingPod, or SyncPod depending on whether the pod is terminated, terminating,
  1525  // or syncing.
  1526  func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
  1527  	workers := make(map[types.UID]PodWorkerSync)
  1528  	known := make(map[types.UID]struct{})
  1529  	for _, pod := range desiredPods {
  1530  		known[pod.UID] = struct{}{}
  1531  	}
  1532  
  1533  	p.podLock.Lock()
  1534  	defer p.podLock.Unlock()
  1535  
  1536  	p.podsSynced = true
  1537  	for uid, status := range p.podSyncStatuses {
  1538  		// We retain the worker history of any pod that is still desired according to
  1539  		// its UID. However, there are two scenarios during a sync that result in us
  1540  		// needing to purge the history:
  1541  		//
  1542  		// 1. The pod is no longer desired (the local version is orphaned)
  1543  		// 2. The pod received a kill update and then a subsequent create, which means
  1544  		//    the UID was reused in the source config (vanishingly rare for API servers,
  1545  		//    common for static pods that have specified a fixed UID)
  1546  		//
  1547  		// In the former case we wish to bound the amount of information we store for
  1548  		// deleted pods. In the latter case we wish to minimize the amount of time before
  1549  		// we restart the static pod. If we succeed at removing the worker, then we
  1550  		// omit it from the returned map of known workers, and the caller of SyncKnownPods
  1551  		// is expected to send a new UpdatePod({UpdateType: Create}).
  1552  		_, knownPod := known[uid]
  1553  		orphan := !knownPod
  1554  		if status.restartRequested || orphan {
  1555  			if p.removeTerminatedWorker(uid, status, orphan) {
  1556  				// no worker running, we won't return it
  1557  				continue
  1558  			}
  1559  		}
  1560  
  1561  		sync := PodWorkerSync{
  1562  			State:  status.WorkType(),
  1563  			Orphan: orphan,
  1564  		}
  1565  		switch {
  1566  		case status.activeUpdate != nil:
  1567  			if status.activeUpdate.Pod != nil {
  1568  				sync.HasConfig = true
  1569  				sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
  1570  			}
  1571  		case status.pendingUpdate != nil:
  1572  			if status.pendingUpdate.Pod != nil {
  1573  				sync.HasConfig = true
  1574  				sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod)
  1575  			}
  1576  		}
  1577  		workers[uid] = sync
  1578  	}
  1579  	return workers
  1580  }
  1581  
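        // A minimal caller sketch for the contract described above; the variables are
        // hypothetical stand-ins, not the kubelet's actual housekeeping code:
        //
        //	knownWorkers := kl.podWorkers.SyncKnownPods(desiredPods)
        //	for _, pod := range desiredPods {
        //		if _, ok := knownWorkers[pod.UID]; !ok {
        //			// the worker history was purged; the caller restarts the pod
        //			kl.podWorkers.UpdatePod(UpdatePodOptions{UpdateType: kubetypes.SyncPodCreate, Pod: pod})
        //		}
        //	}
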
  1582  // removeTerminatedWorker cleans up and removes the worker status for a worker
  1583  // that has reached a terminal state of "finished" - has successfully exited
  1584  // syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be
  1585  // recreated with the same UID. The kubelet preserves state about recently
  1586  // terminated pods to prevent accidentally restarting a terminal pod; the amount
  1587  // of retained state is proportional to the number of pods in the pod config. The method
  1588  // returns true if the worker was completely removed.
  1589  func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool {
  1590  	if !status.finished {
  1591  		// If the pod worker has not reached terminal state and the pod is still known, we wait.
  1592  		if !orphaned {
  1593  			klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
  1594  			return false
  1595  		}
  1596  
  1597  		// all orphaned pods are considered deleted
  1598  		status.deleted = true
  1599  
  1600  		// When a pod is no longer in the desired set, the pod is considered orphaned and
  1601  		// the pod worker becomes responsible for driving the pod to completion (there is no
  1602  		// guarantee another component will notify us of updates).
  1603  		switch {
  1604  		case !status.IsStarted() && !status.observedRuntime:
  1605  			// The pod has not been started, which means we can safely clean up the pod - the
  1606  			// pod worker will shutdown as a result of this change without executing a sync.
  1607  			klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid)
  1608  		case !status.IsTerminationRequested():
  1609  			// The pod has been started but termination has not been requested - set the appropriate
  1610  			// timestamp and notify the pod worker. Because the pod has been synced at least once,
  1611  			// the value of status.activeUpdate will be the fallback for the next sync.
  1612  			status.terminatingAt = p.clock.Now()
  1613  			if status.activeUpdate != nil && status.activeUpdate.Pod != nil {
  1614  				status.gracePeriod, _ = calculateEffectiveGracePeriod(status, status.activeUpdate.Pod, nil)
  1615  			} else {
  1616  				status.gracePeriod = 1
  1617  			}
  1618  			p.requeueLastPodUpdate(uid, status)
  1619  			klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid)
  1620  			return false
  1621  		default:
  1622  			// The pod is already moving towards termination, notify the pod worker. Because the pod
  1623  			// has been synced at least once, the value of status.activeUpdate will be the fallback for
  1624  			// the next sync.
  1625  			p.requeueLastPodUpdate(uid, status)
  1626  			klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid)
  1627  			return false
  1628  		}
  1629  	}
  1630  
  1631  	if status.restartRequested {
  1632  		klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
  1633  	} else {
  1634  		klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
  1635  	}
  1636  	delete(p.podSyncStatuses, uid)
  1637  	p.cleanupPodUpdates(uid)
  1638  
  1639  	if p.startedStaticPodsByFullname[status.fullname] == uid {
  1640  		delete(p.startedStaticPodsByFullname, status.fullname)
  1641  	}
  1642  	return true
  1643  }
  1644  
  1645  // killPodNow returns a KillPodFunc that can be used to kill a pod.
  1646  // It is intended to be injected into other modules that need to kill a pod.
  1647  func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
  1648  	return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error {
  1649  		// determine the grace period to use when killing the pod
  1650  		gracePeriod := int64(0)
  1651  		if gracePeriodOverride != nil {
  1652  			gracePeriod = *gracePeriodOverride
  1653  		} else if pod.Spec.TerminationGracePeriodSeconds != nil {
  1654  			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
  1655  		}
  1656  
  1657  		// we time out and return an error if we don't get a callback within a reasonable time.
  1658  		// the default timeout is relative to the grace period (we settle on a 10s minimum to wait for kubelet->runtime traffic to complete during sigkill)
  1659  		timeout := gracePeriod + (gracePeriod / 2)
  1660  		minTimeout := int64(10)
  1661  		if timeout < minTimeout {
  1662  			timeout = minTimeout
  1663  		}
  1664  		timeoutDuration := time.Duration(timeout) * time.Second
  1665  
  1666  		// open a channel we block against until we get a result
  1667  		ch := make(chan struct{}, 1)
  1668  		podWorkers.UpdatePod(UpdatePodOptions{
  1669  			Pod:        pod,
  1670  			UpdateType: kubetypes.SyncPodKill,
  1671  			KillPodOptions: &KillPodOptions{
  1672  				CompletedCh:                              ch,
  1673  				Evict:                                    isEvicted,
  1674  				PodStatusFunc:                            statusFn,
  1675  				PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
  1676  			},
  1677  		})
  1678  
  1679  		// wait for either a response, or a timeout
  1680  		select {
  1681  		case <-ch:
  1682  			return nil
  1683  		case <-time.After(timeoutDuration):
  1684  			recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
  1685  			return fmt.Errorf("timeout waiting to kill pod")
  1686  		}
  1687  	}
  1688  }
  1689  
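        // A hedged usage sketch of killPodNow; podWorkers, recorder, pod and gracePeriod
        // are assumed to be in scope and the status mutation shown is illustrative only:
        //
        //	killFn := killPodNow(podWorkers, recorder)
        //	gracePeriod := int64(30) // waits up to 45s; a 5s grace period still waits the 10s minimum
        //	err := killFn(pod, true /* isEvicted */, &gracePeriod, func(status *v1.PodStatus) {
        //		status.Phase = v1.PodFailed
        //		status.Reason = "Evicted"
        //	})
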
  1690  // cleanupPodUpdates closes the podUpdates channel and removes it from
  1691  // podUpdates map so that the corresponding pod worker can stop. It also
  1692  // removes any undelivered work. This method must be called holding the
  1693  // pod lock.
  1694  func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
  1695  	if ch, ok := p.podUpdates[uid]; ok {
  1696  		close(ch)
  1697  	}
  1698  	delete(p.podUpdates, uid)
  1699  }
  1700  
  1701  // requeueLastPodUpdate creates a new pending pod update from the most recently
  1702  // executed update if no update is already queued, and then notifies the pod
  1703  // worker goroutine of the update. This method must be called while holding
  1704  // the pod lock.
  1705  func (p *podWorkers) requeueLastPodUpdate(podUID types.UID, status *podSyncStatus) {
  1706  	// if an update is already queued we can use that instead, and if we have no
  1707  	// previously executed update, there is nothing to replay.
  1708  	if status.pendingUpdate != nil || status.activeUpdate == nil {
  1709  		return
  1710  	}
  1711  	copied := *status.activeUpdate
  1712  	status.pendingUpdate = &copied
  1713  
  1714  	// notify the pod worker
  1715  	status.working = true
  1716  	select {
  1717  	case p.podUpdates[podUID] <- struct{}{}:
  1718  	default:
  1719  	}
  1720  }
  1721  
