/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/utils/clock"
)

// OnCompleteFunc is a function that is invoked when an operation completes.
// If err is non-nil, the operation did not complete successfully.
type OnCompleteFunc func(err error)

// PodStatusFunc is a function that is invoked to override the pod status when a pod is killed.
type PodStatusFunc func(podStatus *v1.PodStatus)

// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
	// CompletedCh is closed when the kill request completes (syncTerminatingPod has completed
	// without error) or if the pod does not exist, or if the pod has already terminated. This
	// could take an arbitrary amount of time to be closed, but is never left open once
	// CouldHaveRunningContainers() returns false.
	CompletedCh chan<- struct{}
	// Evict is true if this is a pod triggered eviction - once a pod is evicted some resources are
	// more aggressively reaped than during normal pod operation (stopped containers).
	Evict bool
	// PodStatusFunc is invoked (if set) and overrides the status of the pod at the time the pod is killed.
	// The provided status is populated from the latest state.
	PodStatusFunc PodStatusFunc
	// PodTerminationGracePeriodSecondsOverride is an optional override to use if a pod is being
	// killed as part of a kill operation.
	PodTerminationGracePeriodSecondsOverride *int64
}

// UpdatePodOptions is an options struct to pass to a UpdatePod operation.
type UpdatePodOptions struct {
	// The type of update (create, update, sync, kill).
	UpdateType kubetypes.SyncPodType
	// StartTime is an optional timestamp for when this update was created. If set,
	// when this update is fully realized by the pod worker it will be recorded in
	// the PodWorkerDuration metric.
	StartTime time.Time
	// Pod to update. Required.
	Pod *v1.Pod
	// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
	// is kill or terminated.
	MirrorPod *v1.Pod
	// RunningPod is a runtime pod that is no longer present in config. Required
	// if Pod is nil, ignored if Pod is set.
	RunningPod *kubecontainer.Pod
	// KillPodOptions is used to override the default termination behavior of the
	// pod or to update the pod status after an operation is completed. Since a
	// pod can be killed for multiple reasons, PodStatusFunc is invoked in order
	// and later kills have an opportunity to override the status (i.e. a preemption
	// may be later turned into an eviction).
	KillPodOptions *KillPodOptions
}

// PodWorkerState classifies the status of pod as seen by the pod worker - setup (sync),
// teardown of containers (terminating), or cleanup (terminated).
type PodWorkerState int

const (
	// SyncPod is when the pod is expected to be started and running.
	SyncPod PodWorkerState = iota
	// TerminatingPod is when the pod is no longer being set up, but some
	// containers may be running and are being torn down.
	TerminatingPod
	// TerminatedPod indicates the pod is stopped, can have no more running
	// containers, and any foreground cleanup can be executed.
	TerminatedPod
)

// String returns the lower-case name of the state ("sync", "terminating" or
// "terminated"). It panics if state is not one of the constants defined above.
func (state PodWorkerState) String() string {
	switch state {
	case SyncPod:
		return "sync"
	case TerminatingPod:
		return "terminating"
	case TerminatedPod:
		return "terminated"
	default:
		panic(fmt.Sprintf("the state %d is not defined", state))
	}
}

// PodWorkerSync is the summarization of a single pod worker for sync. Values
// besides state are used to provide metric counts for operators.
type PodWorkerSync struct {
	// State of the pod.
	State PodWorkerState
	// Orphan is true if the pod is no longer in the desired set passed to SyncKnownPods.
	Orphan bool
	// HasConfig is true if we have a historical pod spec for this pod.
	HasConfig bool
	// Static is true if we have config and the pod came from a static source.
	Static bool
}

// podWork is the internal description of a change: the type of work to
// perform together with the options that carry the data to sync.
type podWork struct {
	// WorkType is the type of sync to perform - sync (create), terminating (stop
	// containers), terminated (clean up and write status).
	WorkType PodWorkerState

	// Options contains the data to sync.
	Options UpdatePodOptions
}

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
	// UpdatePod notifies the pod worker of a change to a pod, which will then
	// be processed in FIFO order by a goroutine per pod UID. The state of the
	// pod will be passed to the syncPod method until either the pod is marked
	// as deleted, it reaches a terminal phase (Succeeded/Failed), or the pod
	// is evicted by the kubelet. Once that occurs the syncTerminatingPod method
	// will be called until it exits successfully, and after that all further
	// UpdatePod() calls will be ignored for that pod until it has been forgotten
	// due to significant time passing. A pod that is terminated will never be
	// restarted.
	UpdatePod(options UpdatePodOptions)
	// SyncKnownPods removes workers for pods that are not in the desiredPods set
	// and have been terminated for a significant period of time. Once this method
	// has been called once, the workers are assumed to be fully initialized and
	// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
	// true. It returns a map describing the state of each known pod worker. It
	// is the responsibility of the caller to re-add any desired pods that are not
	// returned as knownPods.
	SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)

	// IsPodKnownTerminated returns true once SyncTerminatingPod completes
	// successfully - i.e. the provided pod UID is known by the pod
	// worker to be terminated. If the pod has been force deleted and the pod worker
	// has completed termination this method will return false, so this method should
	// only be used to filter out pods from the desired set such as in admission.
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	IsPodKnownTerminated(uid types.UID) bool
	// CouldHaveRunningContainers returns true before the pod workers have synced,
	// once the pod workers see the pod (syncPod could be called), and returns false
	// after the pod has been terminated (running containers guaranteed stopped).
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	CouldHaveRunningContainers(uid types.UID) bool

	// ShouldPodBeFinished returns true once SyncTerminatedPod completes
	// successfully - i.e. the provided pod UID is known to the pod worker to
	// be terminated and have resources reclaimed. It returns false before the
	// pod workers have synced (syncPod could be called). Once the pod workers
	// have synced it returns false if the pod has a sync status until
	// SyncTerminatedPod completes successfully. If the pod workers have synced,
	// but the pod does not have a status it returns true.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodBeFinished(uid types.UID) bool
	// IsPodTerminationRequested returns true when pod termination has been requested
	// until the termination completes and the pod is removed from config. This should
	// not be used in cleanup loops because it will return false if the pod has already
	// been cleaned up - use ShouldPodContainersBeTerminating instead. Also, this method
	// may return true while containers are still being initialized by the pod worker.
	//
	// Intended for use by the kubelet sync* methods, but not subsystems, which should
	// use ShouldPod*().
	IsPodTerminationRequested(uid types.UID) bool

	// ShouldPodContainersBeTerminating returns false before pod workers have synced,
	// and true once the pod worker has started terminating a pod or, after the pod
	// workers have synced, when the pod is not known to them. This check is similar to
	// ShouldPodRuntimeBeRemoved but is also true while the pod is still terminating.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodContainersBeTerminating(uid types.UID) bool
	// ShouldPodRuntimeBeRemoved returns true if runtime managers within the Kubelet
	// should aggressively cleanup pod resources that are not containers or on disk
	// content, like attached volumes. This is true when a pod is not yet observed
	// by a worker after the first sync (meaning it can't be running yet) or after
	// all running containers are stopped.
	// TODO: Once pod logs are separated from running containers, this method should
	// be used to gate whether containers are kept.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by running containers. Callers should ensure that
	// runtime content they own is not required for post-termination - for instance
	// containers are required in docker to preserve pod logs until after the pod
	// is deleted.
	ShouldPodRuntimeBeRemoved(uid types.UID) bool
	// ShouldPodContentBeRemoved returns true if resource managers within the Kubelet
	// should aggressively cleanup all content related to the pod. This is true
	// during pod eviction (when we wish to remove that content to free resources)
	// as well as after the request to delete a pod has resulted in containers being
	// stopped (which is a more graceful action). Note that a deleting pod can still
	// be evicted.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by non-deleted pods. Content is generally preserved
	// until deletion+removal_from_etcd or eviction, although garbage collection
	// can free content when this method returns false.
	ShouldPodContentBeRemoved(uid types.UID) bool
	// IsPodForMirrorPodTerminatingByFullName returns true if a static pod with the
	// provided pod name is currently terminating and has yet to complete. It is
	// intended to be used only during orphan mirror pod cleanup to prevent us from
	// deleting a terminating static pod from the apiserver before the pod is shut
	// down.
	IsPodForMirrorPodTerminatingByFullName(podFullname string) bool
}

// podSyncer describes the core lifecycle operations of the pod state machine. A pod is first
// synced until it naturally reaches termination (true is returned) or an external agent decides
// the pod should be terminated. Once a pod should be terminating, SyncTerminatingPod is invoked
// until it returns no error. Then the SyncTerminatedPod method is invoked until it exits without
// error, and the pod is considered terminal.
Implementations of this interface must be threadsafe // for simultaneous invocation of these methods for multiple pods. type podSyncer interface { // SyncPod configures the pod and starts and restarts all containers. If it returns true, the // pod has reached a terminal state and the presence of the error indicates succeeded or failed. // If an error is returned, the sync was not successful and should be rerun in the future. This // is a long running method and should exit early with context.Canceled if the context is canceled. SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) // SyncTerminatingPod attempts to ensure the pod's containers are no longer running and to collect // any final status. This method is repeatedly invoked with diminishing grace periods until it exits // without error. Once this method exits with no error other components are allowed to tear down // supporting resources like volumes and devices. If the context is canceled, the method should // return context.Canceled unless it has successfully finished, which may occur when a shorter // grace period is detected. SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error // SyncTerminatingRuntimePod is invoked when running containers are found that correspond to // a pod that is no longer known to the kubelet to terminate those containers. It should not // exit without error unless all containers are known to be stopped. SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error // SyncTerminatedPod is invoked after all running containers are stopped and is responsible // for releasing resources that should be executed right away rather than in the background. // Once it exits without error the pod is considered finished on the node. 
SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error } type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error type syncTerminatingRuntimePodFnType func(ctx context.Context, runningPod *kubecontainer.Pod) error type syncTerminatedPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error // podSyncerFuncs implements podSyncer and accepts functions for each method. type podSyncerFuncs struct { syncPod syncPodFnType syncTerminatingPod syncTerminatingPodFnType syncTerminatingRuntimePod syncTerminatingRuntimePodFnType syncTerminatedPod syncTerminatedPodFnType } func newPodSyncerFuncs(s podSyncer) podSyncerFuncs { return podSyncerFuncs{ syncPod: s.SyncPod, syncTerminatingPod: s.SyncTerminatingPod, syncTerminatingRuntimePod: s.SyncTerminatingRuntimePod, syncTerminatedPod: s.SyncTerminatedPod, } } var _ podSyncer = podSyncerFuncs{} func (f podSyncerFuncs) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) { return f.syncPod(ctx, updateType, pod, mirrorPod, podStatus) } func (f podSyncerFuncs) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { return f.syncTerminatingPod(ctx, pod, podStatus, gracePeriod, podStatusFn) } func (f podSyncerFuncs) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error { return f.syncTerminatingRuntimePod(ctx, runningPod) } func (f podSyncerFuncs) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error { return f.syncTerminatedPod(ctx, pod, podStatus) } 
const ( // jitter factor for resyncInterval workerResyncIntervalJitterFactor = 0.5 // jitter factor for backOffPeriod and backOffOnTransientErrorPeriod workerBackOffPeriodJitterFactor = 0.5 // backoff period when transient error occurred. backOffOnTransientErrorPeriod = time.Second ) // podSyncStatus tracks per-pod transitions through the three phases of pod // worker sync (setup, terminating, terminated). type podSyncStatus struct { // ctx is the context that is associated with the current pod sync. // TODO: remove this from the struct by having the context initialized // in startPodSync, the cancelFn used by UpdatePod, and cancellation of // a parent context for tearing down workers (if needed) on shutdown ctx context.Context // cancelFn if set is expected to cancel the current podSyncer operation. cancelFn context.CancelFunc // fullname of the pod fullname string // working is true if an update is pending or being worked by a pod worker // goroutine. working bool // pendingUpdate is the updated state the pod worker should observe. It is // cleared and moved to activeUpdate when a pod worker reads it. A new update // may always replace a pending update as the pod worker does not guarantee // that all intermediate states are synced to a worker, only the most recent. // This state will not be visible to downstream components until a pod worker // has begun processing it. pendingUpdate *UpdatePodOptions // activeUpdate is the most recent version of the pod's state that will be // passed to a sync*Pod function. A pod becomes visible to downstream components // once a worker decides to start a pod (startedAt is set). The pod and mirror // pod fields are accumulated if they are missing on a particular call (the last // known version), and the value of KillPodOptions is accumulated as pods cannot // have their grace period shortened. This is the source of truth for the pod spec // the kubelet is reconciling towards for all components that act on running pods. 
activeUpdate *UpdatePodOptions // syncedAt is the time at which the pod worker first observed this pod. syncedAt time.Time // startedAt is the time at which the pod worker allowed the pod to start. startedAt time.Time // terminatingAt is set once the pod is requested to be killed - note that // this can be set before the pod worker starts terminating the pod, see // terminating. terminatingAt time.Time // terminatedAt is set once the pod worker has completed a successful // syncTerminatingPod call and means all running containers are stopped. terminatedAt time.Time // gracePeriod is the requested gracePeriod once terminatingAt is nonzero. gracePeriod int64 // notifyPostTerminating will be closed once the pod transitions to // terminated. After the pod is in terminated state, nothing should be // added to this list. notifyPostTerminating []chan<- struct{} // statusPostTerminating is a list of the status changes associated // with kill pod requests. After the pod is in terminated state, nothing // should be added to this list. The worker will execute the last function // in this list on each termination attempt. statusPostTerminating []PodStatusFunc // startedTerminating is true once the pod worker has observed the request to // stop a pod (exited syncPod and observed a podWork with WorkType // TerminatingPod). Once this is set, it is safe for other components // of the kubelet to assume that no other containers may be started. startedTerminating bool // deleted is true if the pod has been marked for deletion on the apiserver // or has no configuration represented (was deleted before). deleted bool // evicted is true if the kill indicated this was an eviction (an evicted // pod can be more aggressively cleaned up). evicted bool // finished is true once the pod worker completes for a pod // (syncTerminatedPod exited with no errors) until SyncKnownPods is invoked // to remove the pod. 
A terminal pod (Succeeded/Failed) will have // termination status until the pod is deleted. finished bool // restartRequested is true if the pod worker was informed the pod is // expected to exist (update type of create, update, or sync) after // it has been killed. When known pods are synced, any pod that is // terminated and has restartRequested will have its history cleared. restartRequested bool // observedRuntime is true if the pod has been observed to be present in the // runtime. A pod that has been observed at runtime must go through either // SyncTerminatingRuntimePod or SyncTerminatingPod. Otherwise, we can avoid // invoking the terminating methods if the pod is deleted or orphaned before // it has been started. observedRuntime bool } func (s *podSyncStatus) IsWorking() bool { return s.working } func (s *podSyncStatus) IsTerminationRequested() bool { return !s.terminatingAt.IsZero() } func (s *podSyncStatus) IsTerminationStarted() bool { return s.startedTerminating } func (s *podSyncStatus) IsTerminated() bool { return !s.terminatedAt.IsZero() } func (s *podSyncStatus) IsFinished() bool { return s.finished } func (s *podSyncStatus) IsEvicted() bool { return s.evicted } func (s *podSyncStatus) IsDeleted() bool { return s.deleted } func (s *podSyncStatus) IsStarted() bool { return !s.startedAt.IsZero() } // WorkType returns this pods' current state of the pod in pod lifecycle state machine. func (s *podSyncStatus) WorkType() PodWorkerState { if s.IsTerminated() { return TerminatedPod } if s.IsTerminationRequested() { return TerminatingPod } return SyncPod } // mergeLastUpdate records the most recent state from a new update. Pod and MirrorPod are // incremented. KillPodOptions is accumulated. If RunningPod is set, Pod is synthetic and // will *not* be used as the last pod state unless no previous pod state exists (because // the pod worker may be responsible for terminating a pod from a previous run of the // kubelet where no config state is visible). 
The contents of activeUpdate are used as the // source of truth for components downstream of the pod workers. func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) { opts := s.activeUpdate if opts == nil { opts = &UpdatePodOptions{} s.activeUpdate = opts } // UpdatePodOptions states (and UpdatePod enforces) that either Pod or RunningPod // is set, and we wish to preserve the most recent Pod we have observed, so only // overwrite our Pod when we have no Pod or when RunningPod is nil. if opts.Pod == nil || other.RunningPod == nil { opts.Pod = other.Pod } // running pods will not persist but will be remembered for replay opts.RunningPod = other.RunningPod // if mirrorPod was not provided, remember the last one for replay if other.MirrorPod != nil { opts.MirrorPod = other.MirrorPod } // accumulate kill pod options if other.KillPodOptions != nil { opts.KillPodOptions = &KillPodOptions{} if other.KillPodOptions.Evict { opts.KillPodOptions.Evict = true } if override := other.KillPodOptions.PodTerminationGracePeriodSecondsOverride; override != nil { value := *override opts.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &value } } // StartTime is not copied - that is purely for tracking latency of config propagation // from kubelet to pod worker. } // podWorkers keeps track of operations on pods and ensures each pod is // reconciled with the container runtime and other subsystems. The worker // also tracks which pods are in flight for starting, which pods are // shutting down but still have running containers, and which pods have // terminated recently and are guaranteed to have no running containers. // // podWorkers is the source of truth for what pods should be active on a // node at any time, and is kept up to date with the desired state of the // node (tracked by the kubelet pod config loops and the state in the // kubelet's podManager) via the UpdatePod method. 
Components that act // upon running pods should look to the pod worker for state instead of the // kubelet podManager. The pod worker is periodically reconciled with the // state of the podManager via SyncKnownPods() and is responsible for // ensuring the completion of all observed pods no longer present in // the podManager (no longer part of the node's desired config). // // A pod passed to a pod worker is either being synced (expected to be // running), terminating (has running containers but no new containers are // expected to start), terminated (has no running containers but may still // have resources being consumed), or cleaned up (no resources remaining). // Once a pod is set to be "torn down" it cannot be started again for that // UID (corresponding to a delete or eviction) until: // // 1. The pod worker is finalized (syncTerminatingPod and // syncTerminatedPod exit without error sequentially) // 2. The SyncKnownPods method is invoked by kubelet housekeeping and the pod // is not part of the known config. // // Pod workers provide a consistent source of information to other kubelet // loops about the status of the pod and whether containers can be // running. The ShouldPodContentBeRemoved() method tracks whether a pod's // contents should still exist, which includes non-existent pods after // SyncKnownPods() has been called once (as per the contract, all existing // pods should be provided via UpdatePod before SyncKnownPods is invoked). // Generally other sync loops are expected to separate "setup" and // "teardown" responsibilities and the information methods here assist in // each by centralizing that state. A simple visualization of the time // intervals involved might look like: // // ---| = kubelet config has synced at least once // -------| |- = pod exists in apiserver config // --------| |---------------- = CouldHaveRunningContainers() is true // // ^- pod is observed by pod worker . // . . 
// // ----------| |------------------------- = syncPod is running // // . ^- pod worker loop sees change and invokes syncPod // . . . // // --------------| |------- = ShouldPodContainersBeTerminating() returns true // --------------| |------- = IsPodTerminationRequested() returns true (pod is known) // // . . ^- Kubelet evicts pod . // . . . // // -------------------| |---------------- = syncTerminatingPod runs then exits without error // // . . ^ pod worker loop exits syncPod, sees pod is terminating, // . . invokes syncTerminatingPod // . . . // // ---| |------------------| . = ShouldPodRuntimeBeRemoved() returns true (post-sync) // // . ^ syncTerminatingPod has exited successfully // . . // // ----------------------------| |------- = syncTerminatedPod runs then exits without error // // . ^ other loops can tear down // . . // // ------------------------------------| |---- = status manager is waiting for SyncTerminatedPod() finished // // . ^ . // // ----------| |- = status manager can be writing pod status // // ^ status manager deletes pod because no longer exists in config // // Other components in the Kubelet can request a termination of the pod // via the UpdatePod method or the killPodNow wrapper - this will ensure // the components of the pod are stopped until the kubelet is restarted // or permanently (if the phase of the pod is set to a terminal phase // in the pod status change). type podWorkers struct { // Protects all per worker fields. podLock sync.Mutex // podsSynced is true once the pod worker has been synced at least once, // which means that all working pods have been started via UpdatePod(). podsSynced bool // Tracks all running per-pod goroutines - per-pod goroutine will be // processing updates received through its corresponding channel. Sending // a message on this channel will signal the corresponding goroutine to // consume podSyncStatuses[uid].pendingUpdate if set. 
podUpdates map[types.UID]chan struct{} // Tracks by UID the termination status of a pod - syncing, terminating, // terminated, and evicted. podSyncStatuses map[types.UID]*podSyncStatus // Tracks all uids for started static pods by full name startedStaticPodsByFullname map[string]types.UID // Tracks all uids for static pods that are waiting to start by full name waitingToStartStaticPodsByFullname map[string][]types.UID workQueue queue.WorkQueue // This function is run to sync the desired state of pod. // NOTE: This function has to be thread-safe - it can be called for // different pods at the same time. podSyncer podSyncer // workerChannelFn is exposed for testing to allow unit tests to impose delays // in channel communication. The function is invoked once each time a new worker // goroutine starts. workerChannelFn func(uid types.UID, in chan struct{}) (out <-chan struct{}) // The EventRecorder to use recorder record.EventRecorder // backOffPeriod is the duration to back off when there is a sync error. backOffPeriod time.Duration // resyncInterval is the duration to wait until the next sync. resyncInterval time.Duration // podCache stores kubecontainer.PodStatus for all pods. 
podCache kubecontainer.Cache // clock is used for testing timing clock clock.PassiveClock } func newPodWorkers( podSyncer podSyncer, recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache, ) PodWorkers { return &podWorkers{ podSyncStatuses: map[types.UID]*podSyncStatus{}, podUpdates: map[types.UID]chan struct{}{}, startedStaticPodsByFullname: map[string]types.UID{}, waitingToStartStaticPodsByFullname: map[string][]types.UID{}, podSyncer: podSyncer, recorder: recorder, workQueue: workQueue, resyncInterval: resyncInterval, backOffPeriod: backOffPeriod, podCache: podCache, clock: clock.RealClock{}, } } func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() if status, ok := p.podSyncStatuses[uid]; ok { return status.IsTerminated() } // if the pod is not known, we return false (pod worker is not aware of it) return false } func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() if status, ok := p.podSyncStatuses[uid]; ok { return !status.IsTerminated() } // once all pods are synced, any pod without sync status is known to not be running. return !p.podsSynced } func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() if status, ok := p.podSyncStatuses[uid]; ok { return status.IsFinished() } // once all pods are synced, any pod without sync status is assumed to // have SyncTerminatedPod finished. return p.podsSynced } func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() if status, ok := p.podSyncStatuses[uid]; ok { // the pod may still be setting up at this point. 
return status.IsTerminationRequested()
	}
	// an unknown pod is considered not to be terminating (use ShouldPodContainersBeTerminating in
	// cleanup loops to avoid failing to cleanup pods that have already been removed from config)
	return false
}

// ShouldPodContainersBeTerminating reports whether the containers of the pod identified by uid
// should be terminating. For a pod that has a sync status this is the result of
// IsTerminationStarted on that status; for a pod unknown to the workers it is the value of
// podsSynced. The pod lock is held for the duration of the call.
func (p *podWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		// we wait until the pod worker goroutine observes the termination, which means syncPod will not
		// be executed again, which means no new containers can be started
		return status.IsTerminationStarted()
	}
	// once we've synced, if the pod isn't known to the workers we should be tearing them
	// down
	return p.podsSynced
}

// ShouldPodRuntimeBeRemoved reports whether the runtime components of the pod identified by uid
// should be removed. For a pod that has a sync status this is the result of IsTerminated on that
// status; for a pod unknown to the workers it is the value of podsSynced.
func (p *podWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsTerminated()
	}
	// a pod that hasn't been sent to the pod worker yet should have no runtime components once we have
	// synced all content.
	return p.podsSynced
}

// ShouldPodContentBeRemoved reports whether the on-disk content of the pod identified by uid
// should be removed. For a pod that has a sync status this is true when the status is evicted, or
// when it is both deleted and terminated; for a pod unknown to the workers it is the value of
// podsSynced.
func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[uid]; ok {
		return status.IsEvicted() || (status.IsDeleted() && status.IsTerminated())
	}
	// a pod that hasn't been sent to the pod worker yet should have no content on disk once we have
	// synced all content.
	return p.podsSynced
}

// IsPodForMirrorPodTerminatingByFullName reports whether the static pod recorded as started under
// podFullName has had termination requested but has not yet terminated. It returns false when no
// static pod is recorded as started for that full name, or when the started UID has no sync status.
func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
	p.podLock.Lock()
	defer p.podLock.Unlock()
	uid, started := p.startedStaticPodsByFullname[podFullName]
	if !started {
		return false
	}
	status, exists := p.podSyncStatuses[uid]
	if !exists {
		return false
	}
	// only the window between "termination requested" and "terminated" counts as terminating
	if !status.IsTerminationRequested() || status.IsTerminated() {
		return false
	}
	return true
}

// isPodStatusCacheTerminal reports whether the cached runtime status shows no container in the
// running state and no sandbox in the ready state.
func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
	for _, container := range status.ContainerStatuses {
		if container.State == kubecontainer.ContainerStateRunning {
			return false
		}
	}
	for _, sb := range status.SandboxStatuses {
		if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			return false
		}
	}
	return true
}

// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
// discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
//
// The method takes the pod lock, records the update as the pod's pendingUpdate, creates the pod
// worker goroutine and its update channel on first use, and signals that channel without blocking.
// If the update caused the pod to become terminating or shortened its grace period, the context of
// the in-flight sync (if any) is cancelled.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// Handle when the pod is an orphan (no config) and we only have runtime status by running only
	// the terminating part of the lifecycle. A running pod contains only a minimal set of information
	// about the pod
	var isRuntimePod bool
	var uid types.UID
	var name, ns string
	if runningPod := options.RunningPod; runningPod != nil {
		if options.Pod == nil {
			// the synthetic pod created here is used only as a placeholder and not tracked
			if options.UpdateType != kubetypes.SyncPodKill {
				klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType)
				return
			}
			uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name
			isRuntimePod = true
		} else {
			// both were supplied: the config pod wins and the runtime pod is dropped
			options.RunningPod = nil
			uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
			klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		}
	} else {
		uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name
	}

	p.podLock.Lock()
	defer p.podLock.Unlock()

	// decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
	var firstTime bool
	now := p.clock.Now()
	status, ok := p.podSyncStatuses[uid]
	if !ok {
		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		firstTime = true
		status = &podSyncStatus{
			syncedAt: now,
			fullname: kubecontainer.BuildPodFullName(name, ns),
		}
		// if this pod is being synced for the first time, we need to make sure it is an active pod
		if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) {
			// Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated.
			// This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523
			// However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set.
			if statusCache, err := p.podCache.Get(uid); err == nil {
				if isPodStatusCacheTerminal(statusCache) {
					// At this point we know:
					// (1) The pod is terminal based on the config source.
					// (2) The pod is terminal based on the runtime cache.
					// This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart.
					// These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod.
					// As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run.
					status = &podSyncStatus{
						terminatedAt:       now,
						terminatingAt:      now,
						syncedAt:           now,
						startedTerminating: true,
						finished:           false,
						fullname:           kubecontainer.BuildPodFullName(name, ns),
					}
				}
			}
		}
		p.podSyncStatuses[uid] = status
	}

	// RunningPods represent an unknown pod execution and don't contain pod spec information
	// sufficient to perform any action other than termination. If we received a RunningPod
	// after a real pod has already been provided, use the most recent spec instead. Also,
	// once we observe a runtime pod we must drive it to completion, even if we weren't the
	// ones who started it.
	pod := options.Pod
	if isRuntimePod {
		status.observedRuntime = true
		switch {
		case status.pendingUpdate != nil && status.pendingUpdate.Pod != nil:
			pod = status.pendingUpdate.Pod
			options.Pod = pod
			options.RunningPod = nil
		case status.activeUpdate != nil && status.activeUpdate.Pod != nil:
			pod = status.activeUpdate.Pod
			options.Pod = pod
			options.RunningPod = nil
		default:
			// we will continue to use RunningPod.ToAPIPod() as pod here, but
			// options.Pod will be nil and other methods must handle that appropriately.
			pod = options.RunningPod.ToAPIPod()
		}
	}

	// When we see a create update on an already terminating pod, that implies two pods with the same UID were created in
	// close temporal proximity (usually static pod but it's possible for an apiserver to extremely rarely do something
	// similar) - flag the sync status to indicate that after the pod terminates it should be reset to "not running" to
	// allow a subsequent add/update to start the pod worker again. This does not apply to the first time we see a pod,
	// such as when the kubelet restarts and we see already terminated pods for the first time.
	if !firstTime && status.IsTerminationRequested() {
		if options.UpdateType == kubetypes.SyncPodCreate {
			status.restartRequested = true
			klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			return
		}
	}

	// once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
	if status.IsFinished() {
		klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
		return
	}

	// check for a transition to terminating
	var becameTerminating bool
	if !status.IsTerminationRequested() {
		// the cases are evaluated in order, so only the first matching reason is recorded
		switch {
		case isRuntimePod:
			klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.DeletionTimestamp != nil:
			klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.deleted = true
			status.terminatingAt = now
			becameTerminating = true
		case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded:
			klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			status.terminatingAt = now
			becameTerminating = true
		case options.UpdateType == kubetypes.SyncPodKill:
			if options.KillPodOptions != nil && options.KillPodOptions.Evict {
				klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
				status.evicted = true
			} else {
				klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			}
			status.terminatingAt = now
			becameTerminating = true
		}
	}

	// once a pod is terminating, all updates are kills and the grace period can only decrease
	var wasGracePeriodShortened bool
	switch {
	case status.IsTerminated():
		// A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request
		// due to housekeeping seeing an older cached version of the runtime pod simply ignore it until
		// after the pod worker completes.
		if isRuntimePod {
			klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType)
			return
		}

		// the kill has already completed, so release any waiter immediately
		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
		}
		options.KillPodOptions = nil

	case status.IsTerminationRequested():
		if options.KillPodOptions == nil {
			options.KillPodOptions = &KillPodOptions{}
		}

		// remember the completion channel and status override so they can be used once
		// the worker finishes terminating the pod
		if ch := options.KillPodOptions.CompletedCh; ch != nil {
			status.notifyPostTerminating = append(status.notifyPostTerminating, ch)
		}
		if fn := options.KillPodOptions.PodStatusFunc; fn != nil {
			status.statusPostTerminating = append(status.statusPostTerminating, fn)
		}

		gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions)

		wasGracePeriodShortened = gracePeriodShortened
		status.gracePeriod = gracePeriod
		// always set the grace period for syncTerminatingPod so we don't have to recalculate,
		// will never be zero.
		options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod

	default:
		// KillPodOptions is not valid for sync actions outside of the terminating phase
		if options.KillPodOptions != nil {
			if ch := options.KillPodOptions.CompletedCh; ch != nil {
				close(ch)
			}
			options.KillPodOptions = nil
		}
	}

	// start the pod worker goroutine if it doesn't exist
	podUpdates, exists := p.podUpdates[uid]
	if !exists {
		// buffer the channel to avoid blocking this method
		podUpdates = make(chan struct{}, 1)
		p.podUpdates[uid] = podUpdates

		// ensure that static pods start in the order they are received by UpdatePod
		if kubetypes.IsStaticPod(pod) {
			p.waitingToStartStaticPodsByFullname[status.fullname] = append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
		}

		// allow testing of delays in the pod update channel
		var outCh <-chan struct{}
		if p.workerChannelFn != nil {
			outCh = p.workerChannelFn(uid, podUpdates)
		} else {
			outCh = podUpdates
		}

		// spawn a pod worker
		go func() {
			// TODO: this should be a wait.Until with backoff to handle panics, and
			// accept a context for shutdown
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()
	}

	// measure the maximum latency between a call to UpdatePod and when the pod worker reacts to it
	// by preserving the oldest StartTime
	if status.pendingUpdate != nil && !status.pendingUpdate.StartTime.IsZero() && status.pendingUpdate.StartTime.Before(options.StartTime) {
		options.StartTime = status.pendingUpdate.StartTime
	}

	// notify the pod worker there is a pending update
	status.pendingUpdate = &options
	status.working = true
	klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
	// non-blocking send: the channel has a buffer of one, so when it is already full a
	// wake-up is pending and the worker will pick up the pendingUpdate stored above
	select {
	case podUpdates <- struct{}{}:
	default:
	}

	// interrupt the in-flight sync so the worker can react to the termination or shorter grace period
	if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil {
		klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
		status.cancelFn()
		return
	}
}

// calculateEffectiveGracePeriod sets the initial grace period for a newly terminating pod or allows a
// shorter grace period to be provided, returning the desired value.
//
// The first result is the grace period to use and is never less than 1. The second result is true
// only when status already carried a non-zero grace period and the computed value differs from it.
// The function reads status.gracePeriod but does not write it; options may be nil.
func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) {
	// enforce the restriction that a grace period can only decrease and track whatever our value is,
	// then ensure a calculated value is passed down to lower levels
	gracePeriod := status.gracePeriod
	// this value is bedrock truth - the apiserver owns telling us this value calculated by apiserver
	if override := pod.DeletionGracePeriodSeconds; override != nil {
		if gracePeriod == 0 || *override < gracePeriod {
			gracePeriod = *override
		}
	}
	// we allow other parts of the kubelet (namely eviction) to request this pod be terminated faster
	if options != nil {
		if override := options.PodTerminationGracePeriodSecondsOverride; override != nil {
			if gracePeriod == 0 || *override < gracePeriod {
				gracePeriod = *override
			}
		}
	}
	// make a best effort to default this value to the pod's desired intent, in the event
	// the kubelet provided no requested value (graceful termination?)
	if gracePeriod == 0 && pod.Spec.TerminationGracePeriodSeconds != nil {
		gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
	}
	// no matter what, we always supply a grace period of 1
	if gracePeriod < 1 {
		gracePeriod = 1
	}
	return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
}

// allowPodStart tries to start the pod and returns true if allowed, otherwise
// it requeues the pod and returns false. If the pod will never be able to start
// because data is missing, or the pod was terminated before start, canEverStart
// is false. This method can only be called while holding the pod lock.
func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) { if !kubetypes.IsStaticPod(pod) { // TODO: Do we want to allow non-static pods with the same full name? // Note that it may disable the force deletion of pods. return true, true } status, ok := p.podSyncStatuses[pod.UID] if !ok { klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID) return false, false } if status.IsTerminationRequested() { return false, false } if !p.allowStaticPodStart(status.fullname, pod.UID) { p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) return false, true } return true, true } // allowStaticPodStart tries to start the static pod and returns true if // 1. there are no other started static pods with the same fullname // 2. the uid matches that of the first valid static pod waiting to start func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool { startedUID, started := p.startedStaticPodsByFullname[fullname] if started { return startedUID == uid } waitingPods := p.waitingToStartStaticPodsByFullname[fullname] // TODO: This is O(N) with respect to the number of updates to static pods // with overlapping full names, and ideally would be O(1). for i, waitingUID := range waitingPods { // has pod already terminated or been deleted? 
status, ok := p.podSyncStatuses[waitingUID] if !ok || status.IsTerminationRequested() || status.IsTerminated() { continue } // another pod is next in line if waitingUID != uid { p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:] return false } // we are up next, remove ourselves waitingPods = waitingPods[i+1:] break } if len(waitingPods) != 0 { p.waitingToStartStaticPodsByFullname[fullname] = waitingPods } else { delete(p.waitingToStartStaticPodsByFullname, fullname) } p.startedStaticPodsByFullname[fullname] = uid return true } // cleanupUnstartedPod is invoked if a pod that has never been started receives a termination // signal before it can be started. This method must be called holding the pod lock. func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) { p.cleanupPodUpdates(pod.UID) if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) } if !status.terminatedAt.IsZero() { klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) } status.finished = true status.working = false status.terminatedAt = p.clock.Now() if p.startedStaticPodsByFullname[status.fullname] == pod.UID { delete(p.startedStaticPodsByFullname, status.fullname) } } // startPodSync is invoked by each pod worker goroutine when a message arrives on the pod update channel. // This method consumes a pending update, initializes a context, decides whether the pod is already started // or can be started, and updates the cached pod state so that downstream components can observe what the // pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any // of the boolean values is false, ensure the appropriate cleanup happens before returning. 
//
// This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate,
// or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started
// should never have an activeUpdate because that is exposed to downstream components on started pods.
func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	// verify we are known to the pod worker still
	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		// pod status has disappeared, the worker should exit
		klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID)
		return nil, update, false, false, false
	}
	if !status.working {
		// working is used by unit tests to observe whether a worker is currently acting on this pod
		klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID)
	}
	if status.pendingUpdate == nil {
		// no update available, this means we were queued without work being added or there is a
		// race condition, both of which are unexpected
		status.working = false
		klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID)
		return nil, update, false, false, false
	}

	// consume the pending update
	update.WorkType = status.WorkType()
	update.Options = *status.pendingUpdate
	status.pendingUpdate = nil
	select {
	case <-p.podUpdates[podUID]:
		// ensure the pod update channel is empty (it is only ever written to under lock)
	default:
	}

	// initialize a context for the worker if one does not exist
	// (a context that has already been cancelled is replaced with a fresh one)
	if status.ctx == nil || status.ctx.Err() == context.Canceled {
		status.ctx, status.cancelFn = context.WithCancel(context.Background())
	}
	ctx = status.ctx

	// if we are already started, make our state visible to downstream components
	if status.IsStarted() {
		status.mergeLastUpdate(update.Options)
		return ctx, update, true, true, true
	}

	// if we are already terminating and we only have a running pod, allow the worker
	// to "start" since we are immediately moving to terminating
	if update.Options.RunningPod != nil && update.WorkType == TerminatingPod {
		status.mergeLastUpdate(update.Options)
		return ctx, update, true, true, true
	}

	// If we receive an update where Pod is nil (running pod is set) but haven't
	// started yet, we can only terminate the pod, not start it. We should not be
	// asked to start such a pod, but guard here just in case an accident occurs.
	if update.Options.Pod == nil {
		status.mergeLastUpdate(update.Options)
		klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
		return ctx, update, false, false, true
	}

	// verify we can start
	canStart, canEverStart = p.allowPodStart(update.Options.Pod)
	switch {
	case !canEverStart:
		p.cleanupUnstartedPod(update.Options.Pod, status)
		status.working = false
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType)
		return ctx, update, canStart, canEverStart, true
	case !canStart:
		// this is the only path we don't start the pod, so we need to put the change back in pendingUpdate
		status.pendingUpdate = &update.Options
		status.working = false
		klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID)
		return ctx, update, canStart, canEverStart, true
	}

	// mark the pod as started
	status.startedAt = p.clock.Now()
	status.mergeLastUpdate(update.Options)

	// If we are admitting the pod and it is new, record the count of containers
	// TODO: We should probably move this into syncPod and add an execution count
	// to the syncPod arguments, and this should be recorded on the first sync.
	// Leaving it here complicates a particularly important loop.
	metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers)))

	return ctx, update, true, true, true
}

// podUIDAndRefForUpdate returns the UID and the klog object reference to use when logging about
// an update. When RunningPod is set its ID and its ToAPIPod() conversion are used; otherwise the
// values come from Pod, which must then be non-nil.
func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef) {
	if update.RunningPod != nil {
		return update.RunningPod.ID, klog.KObj(update.RunningPod.ToAPIPod())
	}
	return update.Pod.UID, klog.KObj(update.Pod)
}

// podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
// state is reached. The loop is responsible for driving the pod through four main phases:
//
// 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
// 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
// 3. Terminating, ensuring all running containers in the pod are stopped
// 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
//
// The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
// sync method fails, p.workQueue is updated with backoff but it is the responsibility of the kubelet
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
	// lastSyncTime is passed to the pod cache so each sync waits for a status newer than the previous sync
	var lastSyncTime time.Time
	for range podUpdates {
		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
		// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
		if !ok {
			continue
		}
		// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
		if !canEverStart {
			return
		}
		// If the pod is not yet ready to start, continue and wait for more updates.
		if !canStart {
			continue
		}

		// NOTE: this declaration shadows the podUID parameter for the remainder of the iteration
		podUID, podRef := podUIDAndRefForUpdate(update.Options)

		klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
		var isTerminal bool
		err := func() error {
			// The worker is responsible for ensuring the sync method sees the appropriate
			// status updates on resyncs (the result of the last sync), transitions to
			// terminating (no wait), or on terminated (whatever the most recent state is).
			// Only syncing and terminating can generate pod status changes, while terminated
			// pods ensure the most recent status makes it to the api server.
			var status *kubecontainer.PodStatus
			var err error
			switch {
			case update.Options.RunningPod != nil:
				// when we receive a running pod, we don't need status at all because we are
				// guaranteed to be terminating and we skip updates to the pod
			default:
				// wait until we see the next refresh from the PLEG via the cache (max 2s)
				// TODO: this adds ~1s of latency on all transitions from sync to terminating
				// to terminated, and on all termination retries (including evictions). We should
				// improve latency by making the pleg continuous and by allowing pod status
				// changes to be refreshed when key events happen (killPod, sync->terminating).
				// Improving this latency also reduces the possibility that a terminated
				// container's status is garbage collected before we have a chance to update the
				// API server (thus losing the exit code).
				status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)

				if err != nil {
					// This is the legacy event thrown by manage pod loop all other events are now dispatched
					// from syncPodFn
					p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
					return err
				}
			}

			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)

				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

			// recorded whether or not the sync method returned an error
			lastSyncTime = p.clock.Now()
			return err
		}()

		var phaseTransition bool
		switch {
		case err == context.Canceled:
			// when the context is cancelled we expect an update to already be queued
			klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)

		case err != nil:
			// we will queue a retry
			klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)

		case update.WorkType == TerminatedPod:
			// we can shut down the worker
			p.completeTerminated(podUID)
			if start := update.Options.StartTime; !start.IsZero() {
				metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
			}
			klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			return

		case update.WorkType == TerminatingPod:
			// pods that don't exist in config don't need to be terminated, other loops will clean them up
			if update.Options.RunningPod != nil {
				p.completeTerminatingRuntimePod(podUID)
				if start := update.Options.StartTime; !start.IsZero() {
					metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
				}
				klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
				return
			}
			// otherwise we move to the terminating phase
			p.completeTerminating(podUID)
			phaseTransition = true

		case isTerminal:
			// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
			klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			p.completeSync(podUID)
			phaseTransition = true
		}

		// queue a retry if necessary, then put the next event in the channel if any
		p.completeWork(podUID, phaseTransition, err)
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
		}
		klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
	}
}

// acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees
// the termination state so that other components know no new containers will be started in this
// pod. It then returns the status function, if any, that applies to this pod.
func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc { p.podLock.Lock() defer p.podLock.Unlock() status, ok := p.podSyncStatuses[podUID] if !ok { return nil } if !status.terminatingAt.IsZero() && !status.startedTerminating { klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID) status.startedTerminating = true } if l := len(status.statusPostTerminating); l > 0 { return status.statusPostTerminating[l-1] } return nil } // completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should // be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways // exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by // UpdatePod. func (p *podWorkers) completeSync(podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID) status, ok := p.podSyncStatuses[podUID] if !ok { klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID) return } // update the status of the pod if status.terminatingAt.IsZero() { status.terminatingAt = p.clock.Now() } else { klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID) } status.startedTerminating = true // the pod has now transitioned to terminating and we want to run syncTerminatingPod // as soon as possible, so if no update is already waiting queue a synthetic update p.requeueLastPodUpdate(podUID, status) } // completeTerminating is invoked when syncTerminatingPod completes successfully, which means // no container is running, no container will be started in the future, and we are ready for // cleanup. This updates the termination state which prevents future syncs and will ensure // other kubelet loops know this pod is not running any containers. 
func (p *podWorkers) completeTerminating(podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID) status, ok := p.podSyncStatuses[podUID] if !ok { return } // update the status of the pod if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) } status.terminatedAt = p.clock.Now() for _, ch := range status.notifyPostTerminating { close(ch) } status.notifyPostTerminating = nil status.statusPostTerminating = nil // the pod has now transitioned to terminated and we want to run syncTerminatedPod // as soon as possible, so if no update is already waiting queue a synthetic update p.requeueLastPodUpdate(podUID, status) } // completeTerminatingRuntimePod is invoked when syncTerminatingPod completes successfully, // which means an orphaned pod (no config) is terminated and we can exit. Since orphaned // pods have no API representation, we want to exit the loop at this point and ensure no // status is present afterwards - the running pod is truly terminated when this is invoked. func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID) p.cleanupPodUpdates(podUID) status, ok := p.podSyncStatuses[podUID] if !ok { return } if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID) } status.terminatedAt = p.clock.Now() status.finished = true status.working = false if p.startedStaticPodsByFullname[status.fullname] == podUID { delete(p.startedStaticPodsByFullname, status.fullname) } // A runtime pod is transient and not part of the desired state - once it has reached // terminated we can abandon tracking it. 
delete(p.podSyncStatuses, podUID) } // completeTerminated is invoked after syncTerminatedPod completes successfully and means we // can stop the pod worker. The pod is finalized at this point. func (p *podWorkers) completeTerminated(podUID types.UID) { p.podLock.Lock() defer p.podLock.Unlock() klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID) p.cleanupPodUpdates(podUID) status, ok := p.podSyncStatuses[podUID] if !ok { return } if status.terminatingAt.IsZero() { klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID) } if status.terminatedAt.IsZero() { klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID) } status.finished = true status.working = false if p.startedStaticPodsByFullname[status.fullname] == podUID { delete(p.startedStaticPodsByFullname, status.fullname) } } // completeWork requeues on error or the next sync interval and then immediately executes any pending // work. func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) { // Requeue the last update if the last sync returned error. switch { case phaseTransition: p.workQueue.Enqueue(podUID, 0) case syncErr == nil: // No error; requeue at the regular resync interval. p.workQueue.Enqueue(podUID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor)) case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg): // Network is not ready; back off for short period of time and retry as network might be ready soon. p.workQueue.Enqueue(podUID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor)) default: // Error occurred during the sync; back off and then retry. 
p.workQueue.Enqueue(podUID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) } // if there is a pending update for this worker, requeue immediately, otherwise // clear working status p.podLock.Lock() defer p.podLock.Unlock() if status, ok := p.podSyncStatuses[podUID]; ok { if status.pendingUpdate != nil { select { case p.podUpdates[podUID] <- struct{}{}: klog.V(4).InfoS("Requeueing pod due to pending update", "podUID", podUID) default: klog.V(4).InfoS("Pending update already queued", "podUID", podUID) } } else { status.working = false } } } // SyncKnownPods will purge any fully terminated pods that are not in the desiredPods // list, which means SyncKnownPods must be called in a threadsafe manner from calls // to UpdatePods for new pods. Because the podworker is dependent on UpdatePod being // invoked to drive a pod's state machine, if a pod is missing in the desired list the // pod worker must be responsible for delivering that update. The method returns a map // of known workers that are not finished with a value of SyncPodTerminated, // SyncPodKill, or SyncPodSync depending on whether the pod is terminated, terminating, // or syncing. func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync { workers := make(map[types.UID]PodWorkerSync) known := make(map[types.UID]struct{}) for _, pod := range desiredPods { known[pod.UID] = struct{}{} } p.podLock.Lock() defer p.podLock.Unlock() p.podsSynced = true for uid, status := range p.podSyncStatuses { // We retain the worker history of any pod that is still desired according to // its UID. However, there are two scenarios during a sync that result in us // needing to purge the history: // // 1. The pod is no longer desired (the local version is orphaned) // 2. 
The pod received a kill update and then a subsequent create, which means // the UID was reused in the source config (vanishingly rare for API servers, // common for static pods that have specified a fixed UID) // // In the former case we wish to bound the amount of information we store for // deleted pods. In the latter case we wish to minimize the amount of time before // we restart the static pod. If we succeed at removing the worker, then we // omit it from the returned map of known workers, and the caller of SyncKnownPods // is expected to send a new UpdatePod({UpdateType: Create}). _, knownPod := known[uid] orphan := !knownPod if status.restartRequested || orphan { if p.removeTerminatedWorker(uid, status, orphan) { // no worker running, we won't return it continue } } sync := PodWorkerSync{ State: status.WorkType(), Orphan: orphan, } switch { case status.activeUpdate != nil: if status.activeUpdate.Pod != nil { sync.HasConfig = true sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod) } case status.pendingUpdate != nil: if status.pendingUpdate.Pod != nil { sync.HasConfig = true sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod) } } workers[uid] = sync } return workers } // removeTerminatedWorker cleans up and removes the worker status for a worker // that has reached a terminal state of "finished" - has successfully exited // syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be // recreated with the same UID. The kubelet preserves state about recently // terminated pods to prevent accidentally restarting a terminal pod, which is // proportional to the number of pods described in the pod config. The method // returns true if the worker was completely removed. func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool { if !status.finished { // If the pod worker has not reached terminal state and the pod is still known, we wait. 
if !orphaned { klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid) return false } // all orphaned pods are considered deleted status.deleted = true // When a pod is no longer in the desired set, the pod is considered orphaned and the // the pod worker becomes responsible for driving the pod to completion (there is no // guarantee another component will notify us of updates). switch { case !status.IsStarted() && !status.observedRuntime: // The pod has not been started, which means we can safely clean up the pod - the // pod worker will shutdown as a result of this change without executing a sync. klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid) case !status.IsTerminationRequested(): // The pod has been started but termination has not been requested - set the appropriate // timestamp and notify the pod worker. Because the pod has been synced at least once, // the value of status.activeUpdate will be the fallback for the next sync. status.terminatingAt = p.clock.Now() if status.activeUpdate != nil && status.activeUpdate.Pod != nil { status.gracePeriod, _ = calculateEffectiveGracePeriod(status, status.activeUpdate.Pod, nil) } else { status.gracePeriod = 1 } p.requeueLastPodUpdate(uid, status) klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid) return false default: // The pod is already moving towards termination, notify the pod worker. Because the pod // has been synced at least once, the value of status.activeUpdate will be the fallback for // the next sync. 
p.requeueLastPodUpdate(uid, status) klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid) return false } } if status.restartRequested { klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid) } else { klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid) } delete(p.podSyncStatuses, uid) p.cleanupPodUpdates(uid) if p.startedStaticPodsByFullname[status.fullname] == uid { delete(p.startedStaticPodsByFullname, status.fullname) } return true } // killPodNow returns a KillPodFunc that can be used to kill a pod. // It is intended to be injected into other modules that need to kill a pod. func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc { return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error { // determine the grace period to use when killing the pod gracePeriod := int64(0) if gracePeriodOverride != nil { gracePeriod = *gracePeriodOverride } else if pod.Spec.TerminationGracePeriodSeconds != nil { gracePeriod = *pod.Spec.TerminationGracePeriodSeconds } // we timeout and return an error if we don't get a callback within a reasonable time. 
// the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill) timeout := gracePeriod + (gracePeriod / 2) minTimeout := int64(10) if timeout < minTimeout { timeout = minTimeout } timeoutDuration := time.Duration(timeout) * time.Second // open a channel we block against until we get a result ch := make(chan struct{}, 1) podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, UpdateType: kubetypes.SyncPodKill, KillPodOptions: &KillPodOptions{ CompletedCh: ch, Evict: isEvicted, PodStatusFunc: statusFn, PodTerminationGracePeriodSecondsOverride: gracePeriodOverride, }, }) // wait for either a response, or a timeout select { case <-ch: return nil case <-time.After(timeoutDuration): recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.") return fmt.Errorf("timeout waiting to kill pod") } } } // cleanupPodUpdates closes the podUpdates channel and removes it from // podUpdates map so that the corresponding pod worker can stop. It also // removes any undelivered work. This method must be called holding the // pod lock. func (p *podWorkers) cleanupPodUpdates(uid types.UID) { if ch, ok := p.podUpdates[uid]; ok { close(ch) } delete(p.podUpdates, uid) } // requeueLastPodUpdate creates a new pending pod update from the most recently // executed update if no update is already queued, and then notifies the pod // worker goroutine of the update. This method must be called while holding // the pod lock. func (p *podWorkers) requeueLastPodUpdate(podUID types.UID, status *podSyncStatus) { // if there is already an update queued, we can use that instead, or if // we have no previously executed update, we cannot replay it. 
if status.pendingUpdate != nil || status.activeUpdate == nil { return } copied := *status.activeUpdate status.pendingUpdate = &copied // notify the pod worker status.working = true select { case p.podUpdates[podUID] <- struct{}{}: default: } }