/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/record"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/kubelet/util/queue"
	"k8s.io/utils/clock"
)

// OnCompleteFunc is a function that is invoked when an operation completes.
// If err is non-nil, the operation did not complete successfully.
type OnCompleteFunc func(err error)

// PodStatusFunc is a function that is invoked to override the pod status when a pod is killed.
type PodStatusFunc func(podStatus *v1.PodStatus)

// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
	// CompletedCh is closed when the kill request completes (syncTerminatingPod has completed
	// without error) or if the pod does not exist, or if the pod has already terminated. This
	// could take an arbitrary amount of time to be closed, but is never left open once
	// CouldHaveRunningContainers() returns false.
	CompletedCh chan<- struct{}
	// Evict is true if this is a pod triggered eviction - once a pod is evicted some resources are
	// more aggressively reaped than during normal pod operation (stopped containers).
	Evict bool
	// PodStatusFunc is invoked (if set) and overrides the status of the pod at the time the pod is killed.
	// The provided status is populated from the latest state.
	PodStatusFunc PodStatusFunc
	// PodTerminationGracePeriodSecondsOverride is an optional override to use if a pod is being killed as part of a kill operation.
	PodTerminationGracePeriodSecondsOverride *int64
}

// UpdatePodOptions is an options struct to pass to a UpdatePod operation.
type UpdatePodOptions struct {
	// The type of update (create, update, sync, kill).
	UpdateType kubetypes.SyncPodType
	// StartTime is an optional timestamp for when this update was created. If set,
	// when this update is fully realized by the pod worker it will be recorded in
	// the PodWorkerDuration metric.
	StartTime time.Time
	// Pod to update. Required.
	Pod *v1.Pod
	// MirrorPod is the mirror pod if Pod is a static pod. Optional when UpdateType
	// is kill or terminated.
	MirrorPod *v1.Pod
	// RunningPod is a runtime pod that is no longer present in config. Required
	// if Pod is nil, ignored if Pod is set.
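	// A runtime-only pod is only acted on when the update type is kill; UpdatePod
	// ignores other update types when RunningPod is the sole source of pod identity.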
	RunningPod *kubecontainer.Pod
	// KillPodOptions is used to override the default termination behavior of the
	// pod or to update the pod status after an operation is completed. Since a
	// pod can be killed for multiple reasons, PodStatusFunc is invoked in order
	// and later kills have an opportunity to override the status (i.e. a preemption
	// may be later turned into an eviction).
	KillPodOptions *KillPodOptions
}

// PodWorkerState classifies the status of a pod as seen by the pod worker - setup (sync),
// teardown of containers (terminating), or cleanup (terminated).
type PodWorkerState int

const (
	// SyncPod is when the pod is expected to be started and running.
	SyncPod PodWorkerState = iota
	// TerminatingPod is when the pod is no longer being set up, but some
	// containers may be running and are being torn down.
	TerminatingPod
	// TerminatedPod indicates the pod is stopped, can have no more running
	// containers, and any foreground cleanup can be executed.
	TerminatedPod
)

func (state PodWorkerState) String() string {
	switch state {
	case SyncPod:
		return "sync"
	case TerminatingPod:
		return "terminating"
	case TerminatedPod:
		return "terminated"
	default:
		panic(fmt.Sprintf("the state %d is not defined", state))
	}
}

// PodWorkerSync is the summarization of a single pod worker for sync. Values
// besides state are used to provide metric counts for operators.
type PodWorkerSync struct {
	// State of the pod.
	State PodWorkerState
	// Orphan is true if the pod is no longer in the desired set passed to SyncKnownPods.
	Orphan bool
	// HasConfig is true if we have a historical pod spec for this pod.
	HasConfig bool
	// Static is true if we have config and the pod came from a static source.
	Static bool
}

// podWork describes an internal change to be applied by a pod worker: the type
// of sync to perform and the options to perform it with.
type podWork struct {
	// WorkType is the type of sync to perform - sync (create), terminating (stop
	// containers), terminated (clean up and write status).
	WorkType PodWorkerState

	// Options contains the data to sync.
	Options UpdatePodOptions
}

// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
	// UpdatePod notifies the pod worker of a change to a pod, which will then
	// be processed in FIFO order by a goroutine per pod UID. The state of the
	// pod will be passed to the syncPod method until either the pod is marked
	// as deleted, it reaches a terminal phase (Succeeded/Failed), or the pod
	// is evicted by the kubelet. Once that occurs the syncTerminatingPod method
	// will be called until it exits successfully, and after that all further
	// UpdatePod() calls will be ignored for that pod until it has been forgotten
	// due to significant time passing. A pod that is terminated will never be
	// restarted.
	UpdatePod(options UpdatePodOptions)
	// SyncKnownPods removes workers for pods that are not in the desiredPods set
	// and have been terminated for a significant period of time. Once this method
	// has been called once, the workers are assumed to be fully initialized and
	// subsequent calls to ShouldPodContentBeRemoved on unknown pods will return
	// true. It returns a map describing the state of each known pod worker. It
	// is the responsibility of the caller to re-add any desired pods that are not
	// returned as knownPods.
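	//
	// A rough sketch of a housekeeping caller (illustrative only - the real kubelet
	// plumbing differs and the variable names below are assumed):
	//
	//	known := podWorkers.SyncKnownPods(desiredPods)
	//	for uid, sync := range known {
	//		if sync.Orphan && sync.State != TerminatedPod {
	//			// the caller would request termination for uid via UpdatePod
	//			_ = uid
	//		}
	//	}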
	SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync)

	// IsPodKnownTerminated returns true once SyncTerminatingPod completes
	// successfully - the provided pod UID is known by the pod
	// worker to be terminated. If the pod has been force deleted and the pod worker
	// has completed termination this method will return false, so this method should
	// only be used to filter out pods from the desired set such as in admission.
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	IsPodKnownTerminated(uid types.UID) bool
	// CouldHaveRunningContainers returns true before the pod workers have synced,
	// once the pod workers see the pod (syncPod could be called), and returns false
	// after the pod has been terminated (running containers guaranteed stopped).
	//
	// Intended for use by the kubelet config loops, but not subsystems, which should
	// use ShouldPod*().
	CouldHaveRunningContainers(uid types.UID) bool

	// ShouldPodBeFinished returns true once SyncTerminatedPod completes
	// successfully - the provided pod UID is known to the pod worker to
	// be terminated and have resources reclaimed. It returns false before the
	// pod workers have synced (syncPod could be called). Once the pod workers
	// have synced, it returns false if the pod has a sync status until
	// SyncTerminatedPod completes successfully. If the pod workers have synced,
	// but the pod does not have a status, it returns true.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodBeFinished(uid types.UID) bool
	// IsPodTerminationRequested returns true when pod termination has been requested
	// until the termination completes and the pod is removed from config. This should
	// not be used in cleanup loops because it will return false if the pod has already
	// been cleaned up - use ShouldPodContainersBeTerminating instead. Also, this method
	// may return true while containers are still being initialized by the pod worker.
	//
	// Intended for use by the kubelet sync* methods, but not subsystems, which should
	// use ShouldPod*().
	IsPodTerminationRequested(uid types.UID) bool

	// ShouldPodContainersBeTerminating returns false before pod workers have synced,
	// and true once a pod has started terminating. This check is similar to
	// ShouldPodRuntimeBeRemoved but is also true after pod termination is requested.
	//
	// Intended for use by subsystem sync loops to avoid performing background setup
	// after termination has been requested for a pod. Callers must ensure that the
	// syncPod method is non-blocking when their data is absent.
	ShouldPodContainersBeTerminating(uid types.UID) bool
	// ShouldPodRuntimeBeRemoved returns true if runtime managers within the Kubelet
	// should aggressively cleanup pod resources that are not containers or on disk
	// content, like attached volumes. This is true when a pod is not yet observed
	// by a worker after the first sync (meaning it can't be running yet) or after
	// all running containers are stopped.
	// TODO: Once pod logs are separated from running containers, this method should
	// be used to gate whether containers are kept.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by running containers. Callers should ensure that
	// runtime content they own is not required for post-termination - for instance
	// containers are required in docker to preserve pod logs until after the pod
	// is deleted.
	ShouldPodRuntimeBeRemoved(uid types.UID) bool
	// ShouldPodContentBeRemoved returns true if resource managers within the Kubelet
	// should aggressively cleanup all content related to the pod. This is true
	// during pod eviction (when we wish to remove that content to free resources)
	// as well as after the request to delete a pod has resulted in containers being
	// stopped (which is a more graceful action). Note that a deleting pod can still
	// be evicted.
	//
	// Intended for use by subsystem sync loops to know when to start tearing down
	// resources that are used by non-deleted pods. Content is generally preserved
	// until deletion+removal_from_etcd or eviction, although garbage collection
	// can free content when this method returns false.
	ShouldPodContentBeRemoved(uid types.UID) bool
	// IsPodForMirrorPodTerminatingByFullName returns true if a static pod with the
	// provided pod name is currently terminating and has yet to complete. It is
	// intended to be used only during orphan mirror pod cleanup to prevent us from
	// deleting a terminating static pod from the apiserver before the pod is shut
	// down.
	IsPodForMirrorPodTerminatingByFullName(podFullname string) bool
}

// podSyncer describes the core lifecycle operations of the pod state machine. A pod is first
// synced until it naturally reaches termination (true is returned) or an external agent decides
// the pod should be terminated. Once a pod should be terminating, SyncTerminatingPod is invoked
// until it returns no error. Then the SyncTerminatedPod method is invoked until it exits without
// error, and the pod is considered terminal. Implementations of this interface must be threadsafe
// for simultaneous invocation of these methods for multiple pods.
type podSyncer interface {
	// SyncPod configures the pod and starts and restarts all containers. If it returns true, the
	// pod has reached a terminal state and the presence of the error indicates succeeded or failed.
	// If an error is returned, the sync was not successful and should be rerun in the future. This
	// is a long running method and should exit early with context.Canceled if the context is canceled.
	SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
	// SyncTerminatingPod attempts to ensure the pod's containers are no longer running and to collect
	// any final status. This method is repeatedly invoked with diminishing grace periods until it exits
	// without error. Once this method exits with no error other components are allowed to tear down
	// supporting resources like volumes and devices. If the context is canceled, the method should
	// return context.Canceled unless it has successfully finished, which may occur when a shorter
	// grace period is detected.
	SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
	// SyncTerminatingRuntimePod is invoked when running containers are found for a pod
	// that is no longer known to the kubelet, in order to terminate those containers.
	// It should not exit without error unless all containers are known to be stopped.
	SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error
	// SyncTerminatedPod is invoked after all running containers are stopped and is responsible
	// for releasing resources that should be reclaimed right away rather than in the background.
	// Once it exits without error the pod is considered finished on the node.
	SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error
}

type syncPodFnType func(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error)
type syncTerminatingPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error
type syncTerminatingRuntimePodFnType func(ctx context.Context, runningPod *kubecontainer.Pod) error
type syncTerminatedPodFnType func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error

// podSyncerFuncs implements podSyncer and accepts functions for each method.
type podSyncerFuncs struct {
	syncPod                   syncPodFnType
	syncTerminatingPod        syncTerminatingPodFnType
	syncTerminatingRuntimePod syncTerminatingRuntimePodFnType
	syncTerminatedPod         syncTerminatedPodFnType
}

func newPodSyncerFuncs(s podSyncer) podSyncerFuncs {
	return podSyncerFuncs{
		syncPod:                   s.SyncPod,
		syncTerminatingPod:        s.SyncTerminatingPod,
		syncTerminatingRuntimePod: s.SyncTerminatingRuntimePod,
		syncTerminatedPod:         s.SyncTerminatedPod,
	}
}

var _ podSyncer = podSyncerFuncs{}

func (f podSyncerFuncs) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
	return f.syncPod(ctx, updateType, pod, mirrorPod, podStatus)
}
func (f podSyncerFuncs) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	return f.syncTerminatingPod(ctx, pod, podStatus, gracePeriod, podStatusFn)
}
func (f podSyncerFuncs) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
	return f.syncTerminatingRuntimePod(ctx, runningPod)
}
func (f podSyncerFuncs) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	return f.syncTerminatedPod(ctx, pod, podStatus)
}

const (
	// jitter factor for resyncInterval
	workerResyncIntervalJitterFactor = 0.5

	// jitter factor for backOffPeriod and backOffOnTransientErrorPeriod
	workerBackOffPeriodJitterFactor = 0.5

	// backoff period when a transient error occurs.
	backOffOnTransientErrorPeriod = time.Second
)

// podSyncStatus tracks per-pod transitions through the three phases of pod
// worker sync (setup, terminating, terminated).
type podSyncStatus struct {
	// ctx is the context that is associated with the current pod sync.
	// TODO: remove this from the struct by having the context initialized
	// in startPodSync, the cancelFn used by UpdatePod, and cancellation of
	// a parent context for tearing down workers (if needed) on shutdown
	ctx context.Context
	// cancelFn if set is expected to cancel the current podSyncer operation.
	cancelFn context.CancelFunc

	// fullname of the pod
	fullname string

	// working is true if an update is pending or being worked by a pod worker
	// goroutine.
	working bool
	// pendingUpdate is the updated state the pod worker should observe. It is
	// cleared and moved to activeUpdate when a pod worker reads it. A new update
	// may always replace a pending update as the pod worker does not guarantee
	// that all intermediate states are synced to a worker, only the most recent.
	// This state will not be visible to downstream components until a pod worker
	// has begun processing it.
	pendingUpdate *UpdatePodOptions
	// activeUpdate is the most recent version of the pod's state that will be
	// passed to a sync*Pod function. A pod becomes visible to downstream components
	// once a worker decides to start a pod (startedAt is set). The pod and mirror
	// pod fields are accumulated if they are missing on a particular call (the last
	// known version), and the value of KillPodOptions is accumulated as pods cannot
	// have their grace period shortened. This is the source of truth for the pod spec
	// the kubelet is reconciling towards for all components that act on running pods.
	activeUpdate *UpdatePodOptions

	// syncedAt is the time at which the pod worker first observed this pod.
	syncedAt time.Time
	// startedAt is the time at which the pod worker allowed the pod to start.
	startedAt time.Time
	// terminatingAt is set once the pod is requested to be killed - note that
	// this can be set before the pod worker starts terminating the pod, see
	// startedTerminating.
	terminatingAt time.Time
	// terminatedAt is set once the pod worker has completed a successful
	// syncTerminatingPod call and means all running containers are stopped.
	terminatedAt time.Time
	// gracePeriod is the requested gracePeriod once terminatingAt is nonzero.
	gracePeriod int64
	// notifyPostTerminating is a list of channels that are each closed once the
	// pod transitions to terminated. After the pod is in terminated state, nothing
	// should be added to this list.
	notifyPostTerminating []chan<- struct{}
	// statusPostTerminating is a list of the status changes associated
	// with kill pod requests. After the pod is in terminated state, nothing
	// should be added to this list. The worker will execute the last function
	// in this list on each termination attempt.
	statusPostTerminating []PodStatusFunc

	// startedTerminating is true once the pod worker has observed the request to
	// stop a pod (exited syncPod and observed a podWork with WorkType
	// TerminatingPod). Once this is set, it is safe for other components
	// of the kubelet to assume that no other containers may be started.
	startedTerminating bool
	// deleted is true if the pod has been marked for deletion on the apiserver
	// or has no configuration represented (was deleted before).
	deleted bool
	// evicted is true if the kill indicated this was an eviction (an evicted
	// pod can be more aggressively cleaned up).
	evicted bool
	// finished is true once the pod worker completes for a pod
	// (syncTerminatedPod exited with no errors) until SyncKnownPods is invoked
	// to remove the pod. A terminal pod (Succeeded/Failed) will have
	// termination status until the pod is deleted.
	finished bool
	// restartRequested is true if the pod worker was informed the pod is
	// expected to exist (update type of create, update, or sync) after
	// it has been killed. When known pods are synced, any pod that is
	// terminated and has restartRequested will have its history cleared.
	restartRequested bool
	// observedRuntime is true if the pod has been observed to be present in the
	// runtime. A pod that has been observed at runtime must go through either
	// SyncTerminatingRuntimePod or SyncTerminatingPod. Otherwise, we can avoid
	// invoking the terminating methods if the pod is deleted or orphaned before
	// it has been started.
	observedRuntime bool
}

func (s *podSyncStatus) IsWorking() bool              { return s.working }
func (s *podSyncStatus) IsTerminationRequested() bool { return !s.terminatingAt.IsZero() }
func (s *podSyncStatus) IsTerminationStarted() bool   { return s.startedTerminating }
func (s *podSyncStatus) IsTerminated() bool           { return !s.terminatedAt.IsZero() }
func (s *podSyncStatus) IsFinished() bool             { return s.finished }
func (s *podSyncStatus) IsEvicted() bool              { return s.evicted }
func (s *podSyncStatus) IsDeleted() bool              { return s.deleted }
func (s *podSyncStatus) IsStarted() bool              { return !s.startedAt.IsZero() }

// WorkType returns the current state of the pod in the pod lifecycle state machine.
func (s *podSyncStatus) WorkType() PodWorkerState {
	if s.IsTerminated() {
		return TerminatedPod
	}
	if s.IsTerminationRequested() {
		return TerminatingPod
	}
	return SyncPod
}

// mergeLastUpdate records the most recent state from a new update. Pod and MirrorPod are
// updated when newer values are observed. KillPodOptions is accumulated. If RunningPod is
// set, Pod is synthetic and will *not* be used as the last pod state unless no previous
// pod state exists (because the pod worker may be responsible for terminating a pod from
// a previous run of the kubelet where no config state is visible). The contents of
// activeUpdate are used as the source of truth for components downstream of the pod workers.
func (s *podSyncStatus) mergeLastUpdate(other UpdatePodOptions) {
	opts := s.activeUpdate
	if opts == nil {
		opts = &UpdatePodOptions{}
		s.activeUpdate = opts
	}

	// UpdatePodOptions states (and UpdatePod enforces) that either Pod or RunningPod
	// is set, and we wish to preserve the most recent Pod we have observed, so only
	// overwrite our Pod when we have no Pod or when RunningPod is nil.
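	// For example, if a previous update supplied a full Pod and a later kill arrives
	// carrying only RunningPod, the previously observed Pod remains the spec that is
	// reported downstream.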
	if opts.Pod == nil || other.RunningPod == nil {
		opts.Pod = other.Pod
	}
	// running pods will not persist but will be remembered for replay
	opts.RunningPod = other.RunningPod
	// if mirrorPod was not provided, remember the last one for replay
	if other.MirrorPod != nil {
		opts.MirrorPod = other.MirrorPod
	}
	// accumulate kill pod options
	if other.KillPodOptions != nil {
		opts.KillPodOptions = &KillPodOptions{}
		if other.KillPodOptions.Evict {
			opts.KillPodOptions.Evict = true
		}
		if override := other.KillPodOptions.PodTerminationGracePeriodSecondsOverride; override != nil {
			value := *override
			opts.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &value
		}
	}
	// StartTime is not copied - that is purely for tracking latency of config propagation
	// from kubelet to pod worker.
}

// podWorkers keeps track of operations on pods and ensures each pod is
// reconciled with the container runtime and other subsystems. The worker
// also tracks which pods are in flight for starting, which pods are
// shutting down but still have running containers, and which pods have
// terminated recently and are guaranteed to have no running containers.
//
// podWorkers is the source of truth for what pods should be active on a
// node at any time, and is kept up to date with the desired state of the
// node (tracked by the kubelet pod config loops and the state in the
// kubelet's podManager) via the UpdatePod method. Components that act
// upon running pods should look to the pod worker for state instead of the
// kubelet podManager. The pod worker is periodically reconciled with the
// state of the podManager via SyncKnownPods() and is responsible for
// ensuring the completion of all observed pods no longer present in
// the podManager (no longer part of the node's desired config).
//
// A pod passed to a pod worker is either being synced (expected to be
// running), terminating (has running containers but no new containers are
// expected to start), terminated (has no running containers but may still
// have resources being consumed), or cleaned up (no resources remaining).
// Once a pod is set to be "torn down" it cannot be started again for that
// UID (corresponding to a delete or eviction) until:
//
//  1. The pod worker is finalized (syncTerminatingPod and
//     syncTerminatedPod exit without error sequentially)
//  2. The SyncKnownPods method is invoked by kubelet housekeeping and the pod
//     is not part of the known config.
//
// Pod workers provide a consistent source of information to other kubelet
// loops about the status of the pod and whether containers can be
// running. The ShouldPodContentBeRemoved() method tracks whether a pod's
// contents should still exist, which includes non-existent pods after
// SyncKnownPods() has been called once (as per the contract, all existing
// pods should be provided via UpdatePod before SyncKnownPods is invoked).
// Generally other sync loops are expected to separate "setup" and
// "teardown" responsibilities and the information methods here assist in
// each by centralizing that state.
// A simple visualization of the time
// intervals involved might look like:
//
// ---|                                         = kubelet config has synced at least once
// -------|                                  |- = pod exists in apiserver config
// --------|                  |---------------- = CouldHaveRunningContainers() is true
//
//	^- pod is observed by pod worker  .
//	.                                 .
//
// ----------|       |------------------------- = syncPod is running
//
//	. ^- pod worker loop sees change and invokes syncPod
//	. .                               .
//
// --------------|                     |------- = ShouldPodContainersBeTerminating() returns true
// --------------|                     |------- = IsPodTerminationRequested() returns true (pod is known)
//
//	. .   ^- Kubelet evicts pod       .
//	. .                               .
//
// -------------------|       |---------------- = syncTerminatingPod runs then exits without error
//
//	. .        ^ pod worker loop exits syncPod, sees pod is terminating,
//	. .          invokes syncTerminatingPod
//	. .                               .
//
// ---|    |------------------|              .  = ShouldPodRuntimeBeRemoved() returns true (post-sync)
//
//	.                         ^ syncTerminatingPod has exited successfully
//	.                         .
//
// ----------------------------|       |------- = syncTerminatedPod runs then exits without error
//
//	.                          ^ other loops can tear down
//	.                          .
//
// ------------------------------------|  |---- = status manager is waiting for SyncTerminatedPod() finished
//
//	.                         ^
//
// ----------|                               |- = status manager can be writing pod status
//
//	^ status manager deletes pod because no longer exists in config
//
// Other components in the Kubelet can request a termination of the pod
// via the UpdatePod method or the killPodNow wrapper - this will ensure
// the components of the pod are stopped until the kubelet is restarted
// or permanently (if the phase of the pod is set to a terminal phase
// in the pod status change).
type podWorkers struct {
	// Protects all per worker fields.
	podLock sync.Mutex
	// podsSynced is true once the pod worker has been synced at least once,
	// which means that all working pods have been started via UpdatePod().
	podsSynced bool

	// Tracks all running per-pod goroutines - each per-pod goroutine processes
	// updates received through its corresponding channel. Sending
	// a message on this channel will signal the corresponding goroutine to
	// consume podSyncStatuses[uid].pendingUpdate if set.
	podUpdates map[types.UID]chan struct{}
	// Tracks by UID the termination status of a pod - syncing, terminating,
	// terminated, and evicted.
	podSyncStatuses map[types.UID]*podSyncStatus

	// Tracks all uids for started static pods by full name
	startedStaticPodsByFullname map[string]types.UID
	// Tracks all uids for static pods that are waiting to start by full name
	waitingToStartStaticPodsByFullname map[string][]types.UID

	workQueue queue.WorkQueue

	// This function is run to sync the desired state of the pod.
	// NOTE: This function has to be thread-safe - it can be called for
	// different pods at the same time.
	podSyncer podSyncer

	// workerChannelFn is exposed for testing to allow unit tests to impose delays
	// in channel communication. The function is invoked once each time a new worker
	// goroutine starts.
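	//
	// A minimal sketch of a test hook (illustrative only; the receiver name w is
	// assumed) might wrap the channel to observe or delay delivery:
	//
	//	w.workerChannelFn = func(uid types.UID, in chan struct{}) <-chan struct{} {
	//		out := make(chan struct{}, 1)
	//		go func() {
	//			for range in {
	//				out <- struct{}{} // a test could sleep here to simulate delay
	//			}
	//		}()
	//		return out
	//	}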
582 workerChannelFn func(uid types.UID, in chan struct{}) (out <-chan struct{}) 583 584 // The EventRecorder to use 585 recorder record.EventRecorder 586 587 // backOffPeriod is the duration to back off when there is a sync error. 588 backOffPeriod time.Duration 589 590 // resyncInterval is the duration to wait until the next sync. 591 resyncInterval time.Duration 592 593 // podCache stores kubecontainer.PodStatus for all pods. 594 podCache kubecontainer.Cache 595 596 // clock is used for testing timing 597 clock clock.PassiveClock 598 } 599 600 func newPodWorkers( 601 podSyncer podSyncer, 602 recorder record.EventRecorder, 603 workQueue queue.WorkQueue, 604 resyncInterval, backOffPeriod time.Duration, 605 podCache kubecontainer.Cache, 606 ) PodWorkers { 607 return &podWorkers{ 608 podSyncStatuses: map[types.UID]*podSyncStatus{}, 609 podUpdates: map[types.UID]chan struct{}{}, 610 startedStaticPodsByFullname: map[string]types.UID{}, 611 waitingToStartStaticPodsByFullname: map[string][]types.UID{}, 612 podSyncer: podSyncer, 613 recorder: recorder, 614 workQueue: workQueue, 615 resyncInterval: resyncInterval, 616 backOffPeriod: backOffPeriod, 617 podCache: podCache, 618 clock: clock.RealClock{}, 619 } 620 } 621 622 func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool { 623 p.podLock.Lock() 624 defer p.podLock.Unlock() 625 if status, ok := p.podSyncStatuses[uid]; ok { 626 return status.IsTerminated() 627 } 628 // if the pod is not known, we return false (pod worker is not aware of it) 629 return false 630 } 631 632 func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool { 633 p.podLock.Lock() 634 defer p.podLock.Unlock() 635 if status, ok := p.podSyncStatuses[uid]; ok { 636 return !status.IsTerminated() 637 } 638 // once all pods are synced, any pod without sync status is known to not be running. 639 return !p.podsSynced 640 } 641 642 func (p *podWorkers) ShouldPodBeFinished(uid types.UID) bool { 643 p.podLock.Lock() 644 defer p.podLock.Unlock() 645 if status, ok := p.podSyncStatuses[uid]; ok { 646 return status.IsFinished() 647 } 648 // once all pods are synced, any pod without sync status is assumed to 649 // have SyncTerminatedPod finished. 650 return p.podsSynced 651 } 652 653 func (p *podWorkers) IsPodTerminationRequested(uid types.UID) bool { 654 p.podLock.Lock() 655 defer p.podLock.Unlock() 656 if status, ok := p.podSyncStatuses[uid]; ok { 657 // the pod may still be setting up at this point. 
658 return status.IsTerminationRequested() 659 } 660 // an unknown pod is considered not to be terminating (use ShouldPodContainersBeTerminating in 661 // cleanup loops to avoid failing to cleanup pods that have already been removed from config) 662 return false 663 } 664 665 func (p *podWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool { 666 p.podLock.Lock() 667 defer p.podLock.Unlock() 668 if status, ok := p.podSyncStatuses[uid]; ok { 669 // we wait until the pod worker goroutine observes the termination, which means syncPod will not 670 // be executed again, which means no new containers can be started 671 return status.IsTerminationStarted() 672 } 673 // once we've synced, if the pod isn't known to the workers we should be tearing them 674 // down 675 return p.podsSynced 676 } 677 678 func (p *podWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool { 679 p.podLock.Lock() 680 defer p.podLock.Unlock() 681 if status, ok := p.podSyncStatuses[uid]; ok { 682 return status.IsTerminated() 683 } 684 // a pod that hasn't been sent to the pod worker yet should have no runtime components once we have 685 // synced all content. 686 return p.podsSynced 687 } 688 689 func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool { 690 p.podLock.Lock() 691 defer p.podLock.Unlock() 692 if status, ok := p.podSyncStatuses[uid]; ok { 693 return status.IsEvicted() || (status.IsDeleted() && status.IsTerminated()) 694 } 695 // a pod that hasn't been sent to the pod worker yet should have no content on disk once we have 696 // synced all content. 697 return p.podsSynced 698 } 699 700 func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool { 701 p.podLock.Lock() 702 defer p.podLock.Unlock() 703 uid, started := p.startedStaticPodsByFullname[podFullName] 704 if !started { 705 return false 706 } 707 status, exists := p.podSyncStatuses[uid] 708 if !exists { 709 return false 710 } 711 if !status.IsTerminationRequested() || status.IsTerminated() { 712 return false 713 } 714 715 return true 716 } 717 718 func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool { 719 for _, container := range status.ContainerStatuses { 720 if container.State == kubecontainer.ContainerStateRunning { 721 return false 722 } 723 } 724 for _, sb := range status.SandboxStatuses { 725 if sb.State == runtimeapi.PodSandboxState_SANDBOX_READY { 726 return false 727 } 728 } 729 return true 730 } 731 732 // UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable, 733 // terminating, or terminated, and will transition to terminating if: deleted on the apiserver, 734 // discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet. 735 func (p *podWorkers) UpdatePod(options UpdatePodOptions) { 736 // Handle when the pod is an orphan (no config) and we only have runtime status by running only 737 // the terminating part of the lifecycle. 
A running pod contains only a minimal set of information 738 // about the pod 739 var isRuntimePod bool 740 var uid types.UID 741 var name, ns string 742 if runningPod := options.RunningPod; runningPod != nil { 743 if options.Pod == nil { 744 // the sythetic pod created here is used only as a placeholder and not tracked 745 if options.UpdateType != kubetypes.SyncPodKill { 746 klog.InfoS("Pod update is ignored, runtime pods can only be killed", "pod", klog.KRef(runningPod.Namespace, runningPod.Name), "podUID", runningPod.ID, "updateType", options.UpdateType) 747 return 748 } 749 uid, ns, name = runningPod.ID, runningPod.Namespace, runningPod.Name 750 isRuntimePod = true 751 } else { 752 options.RunningPod = nil 753 uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name 754 klog.InfoS("Pod update included RunningPod which is only valid when Pod is not specified", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 755 } 756 } else { 757 uid, ns, name = options.Pod.UID, options.Pod.Namespace, options.Pod.Name 758 } 759 760 p.podLock.Lock() 761 defer p.podLock.Unlock() 762 763 // decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it 764 var firstTime bool 765 now := p.clock.Now() 766 status, ok := p.podSyncStatuses[uid] 767 if !ok { 768 klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 769 firstTime = true 770 status = &podSyncStatus{ 771 syncedAt: now, 772 fullname: kubecontainer.BuildPodFullName(name, ns), 773 } 774 // if this pod is being synced for the first time, we need to make sure it is an active pod 775 if options.Pod != nil && (options.Pod.Status.Phase == v1.PodFailed || options.Pod.Status.Phase == v1.PodSucceeded) { 776 // Check to see if the pod is not running and the pod is terminal; if this succeeds then record in the podWorker that it is terminated. 777 // This is needed because after a kubelet restart, we need to ensure terminal pods will NOT be considered active in Pod Admission. See http://issues.k8s.io/105523 778 // However, `filterOutInactivePods`, considers pods that are actively terminating as active. As a result, `IsPodKnownTerminated()` needs to return true and thus `terminatedAt` needs to be set. 779 if statusCache, err := p.podCache.Get(uid); err == nil { 780 if isPodStatusCacheTerminal(statusCache) { 781 // At this point we know: 782 // (1) The pod is terminal based on the config source. 783 // (2) The pod is terminal based on the runtime cache. 784 // This implies that this pod had already completed `SyncTerminatingPod` sometime in the past. The pod is likely being synced for the first time due to a kubelet restart. 785 // These pods need to complete SyncTerminatedPod to ensure that all resources are cleaned and that the status manager makes the final status updates for the pod. 786 // As a result, set finished: false, to ensure a Terminated event will be sent and `SyncTerminatedPod` will run. 787 status = &podSyncStatus{ 788 terminatedAt: now, 789 terminatingAt: now, 790 syncedAt: now, 791 startedTerminating: true, 792 finished: false, 793 fullname: kubecontainer.BuildPodFullName(name, ns), 794 } 795 } 796 } 797 } 798 p.podSyncStatuses[uid] = status 799 } 800 801 // RunningPods represent an unknown pod execution and don't contain pod spec information 802 // sufficient to perform any action other than termination. 
If we received a RunningPod 803 // after a real pod has already been provided, use the most recent spec instead. Also, 804 // once we observe a runtime pod we must drive it to completion, even if we weren't the 805 // ones who started it. 806 pod := options.Pod 807 if isRuntimePod { 808 status.observedRuntime = true 809 switch { 810 case status.pendingUpdate != nil && status.pendingUpdate.Pod != nil: 811 pod = status.pendingUpdate.Pod 812 options.Pod = pod 813 options.RunningPod = nil 814 case status.activeUpdate != nil && status.activeUpdate.Pod != nil: 815 pod = status.activeUpdate.Pod 816 options.Pod = pod 817 options.RunningPod = nil 818 default: 819 // we will continue to use RunningPod.ToAPIPod() as pod here, but 820 // options.Pod will be nil and other methods must handle that appropriately. 821 pod = options.RunningPod.ToAPIPod() 822 } 823 } 824 825 // When we see a create update on an already terminating pod, that implies two pods with the same UID were created in 826 // close temporal proximity (usually static pod but it's possible for an apiserver to extremely rarely do something 827 // similar) - flag the sync status to indicate that after the pod terminates it should be reset to "not running" to 828 // allow a subsequent add/update to start the pod worker again. This does not apply to the first time we see a pod, 829 // such as when the kubelet restarts and we see already terminated pods for the first time. 830 if !firstTime && status.IsTerminationRequested() { 831 if options.UpdateType == kubetypes.SyncPodCreate { 832 status.restartRequested = true 833 klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 834 return 835 } 836 } 837 838 // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping) 839 if status.IsFinished() { 840 klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 841 return 842 } 843 844 // check for a transition to terminating 845 var becameTerminating bool 846 if !status.IsTerminationRequested() { 847 switch { 848 case isRuntimePod: 849 klog.V(4).InfoS("Pod is orphaned and must be torn down", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 850 status.deleted = true 851 status.terminatingAt = now 852 becameTerminating = true 853 case pod.DeletionTimestamp != nil: 854 klog.V(4).InfoS("Pod is marked for graceful deletion, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 855 status.deleted = true 856 status.terminatingAt = now 857 becameTerminating = true 858 case pod.Status.Phase == v1.PodFailed, pod.Status.Phase == v1.PodSucceeded: 859 klog.V(4).InfoS("Pod is in a terminal phase (success/failed), begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 860 status.terminatingAt = now 861 becameTerminating = true 862 case options.UpdateType == kubetypes.SyncPodKill: 863 if options.KillPodOptions != nil && options.KillPodOptions.Evict { 864 klog.V(4).InfoS("Pod is being evicted by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 865 status.evicted = true 866 } else { 867 klog.V(4).InfoS("Pod is being removed by the kubelet, begin teardown", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 868 } 869 
status.terminatingAt = now 870 becameTerminating = true 871 } 872 } 873 874 // once a pod is terminating, all updates are kills and the grace period can only decrease 875 var wasGracePeriodShortened bool 876 switch { 877 case status.IsTerminated(): 878 // A terminated pod may still be waiting for cleanup - if we receive a runtime pod kill request 879 // due to housekeeping seeing an older cached version of the runtime pod simply ignore it until 880 // after the pod worker completes. 881 if isRuntimePod { 882 klog.V(3).InfoS("Pod is waiting for termination, ignoring runtime-only kill until after pod worker is fully terminated", "pod", klog.KRef(ns, name), "podUID", uid, "updateType", options.UpdateType) 883 return 884 } 885 886 if options.KillPodOptions != nil { 887 if ch := options.KillPodOptions.CompletedCh; ch != nil { 888 close(ch) 889 } 890 } 891 options.KillPodOptions = nil 892 893 case status.IsTerminationRequested(): 894 if options.KillPodOptions == nil { 895 options.KillPodOptions = &KillPodOptions{} 896 } 897 898 if ch := options.KillPodOptions.CompletedCh; ch != nil { 899 status.notifyPostTerminating = append(status.notifyPostTerminating, ch) 900 } 901 if fn := options.KillPodOptions.PodStatusFunc; fn != nil { 902 status.statusPostTerminating = append(status.statusPostTerminating, fn) 903 } 904 905 gracePeriod, gracePeriodShortened := calculateEffectiveGracePeriod(status, pod, options.KillPodOptions) 906 907 wasGracePeriodShortened = gracePeriodShortened 908 status.gracePeriod = gracePeriod 909 // always set the grace period for syncTerminatingPod so we don't have to recalculate, 910 // will never be zero. 911 options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod 912 913 default: 914 // KillPodOptions is not valid for sync actions outside of the terminating phase 915 if options.KillPodOptions != nil { 916 if ch := options.KillPodOptions.CompletedCh; ch != nil { 917 close(ch) 918 } 919 options.KillPodOptions = nil 920 } 921 } 922 923 // start the pod worker goroutine if it doesn't exist 924 podUpdates, exists := p.podUpdates[uid] 925 if !exists { 926 // buffer the channel to avoid blocking this method 927 podUpdates = make(chan struct{}, 1) 928 p.podUpdates[uid] = podUpdates 929 930 // ensure that static pods start in the order they are received by UpdatePod 931 if kubetypes.IsStaticPod(pod) { 932 p.waitingToStartStaticPodsByFullname[status.fullname] = 933 append(p.waitingToStartStaticPodsByFullname[status.fullname], uid) 934 } 935 936 // allow testing of delays in the pod update channel 937 var outCh <-chan struct{} 938 if p.workerChannelFn != nil { 939 outCh = p.workerChannelFn(uid, podUpdates) 940 } else { 941 outCh = podUpdates 942 } 943 944 // spawn a pod worker 945 go func() { 946 // TODO: this should be a wait.Until with backoff to handle panics, and 947 // accept a context for shutdown 948 defer runtime.HandleCrash() 949 defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid) 950 p.podWorkerLoop(uid, outCh) 951 }() 952 } 953 954 // measure the maximum latency between a call to UpdatePod and when the pod worker reacts to it 955 // by preserving the oldest StartTime 956 if status.pendingUpdate != nil && !status.pendingUpdate.StartTime.IsZero() && status.pendingUpdate.StartTime.Before(options.StartTime) { 957 options.StartTime = status.pendingUpdate.StartTime 958 } 959 960 // notify the pod worker there is a pending update 961 status.pendingUpdate = &options 962 status.working = true 963 klog.V(4).InfoS("Notifying pod of pending update", "pod", 
klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) 964 select { 965 case podUpdates <- struct{}{}: 966 default: 967 } 968 969 if (becameTerminating || wasGracePeriodShortened) && status.cancelFn != nil { 970 klog.V(3).InfoS("Cancelling current pod sync", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType()) 971 status.cancelFn() 972 return 973 } 974 } 975 976 // calculateEffectiveGracePeriod sets the initial grace period for a newly terminating pod or allows a 977 // shorter grace period to be provided, returning the desired value. 978 func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *KillPodOptions) (int64, bool) { 979 // enforce the restriction that a grace period can only decrease and track whatever our value is, 980 // then ensure a calculated value is passed down to lower levels 981 gracePeriod := status.gracePeriod 982 // this value is bedrock truth - the apiserver owns telling us this value calculated by apiserver 983 if override := pod.DeletionGracePeriodSeconds; override != nil { 984 if gracePeriod == 0 || *override < gracePeriod { 985 gracePeriod = *override 986 } 987 } 988 // we allow other parts of the kubelet (namely eviction) to request this pod be terminated faster 989 if options != nil { 990 if override := options.PodTerminationGracePeriodSecondsOverride; override != nil { 991 if gracePeriod == 0 || *override < gracePeriod { 992 gracePeriod = *override 993 } 994 } 995 } 996 // make a best effort to default this value to the pod's desired intent, in the event 997 // the kubelet provided no requested value (graceful termination?) 998 if gracePeriod == 0 && pod.Spec.TerminationGracePeriodSeconds != nil { 999 gracePeriod = *pod.Spec.TerminationGracePeriodSeconds 1000 } 1001 // no matter what, we always supply a grace period of 1 1002 if gracePeriod < 1 { 1003 gracePeriod = 1 1004 } 1005 return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod 1006 } 1007 1008 // allowPodStart tries to start the pod and returns true if allowed, otherwise 1009 // it requeues the pod and returns false. If the pod will never be able to start 1010 // because data is missing, or the pod was terminated before start, canEverStart 1011 // is false. This method can only be called while holding the pod lock. 1012 func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) { 1013 if !kubetypes.IsStaticPod(pod) { 1014 // TODO: Do we want to allow non-static pods with the same full name? 1015 // Note that it may disable the force deletion of pods. 1016 return true, true 1017 } 1018 status, ok := p.podSyncStatuses[pod.UID] 1019 if !ok { 1020 klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID) 1021 return false, false 1022 } 1023 if status.IsTerminationRequested() { 1024 return false, false 1025 } 1026 if !p.allowStaticPodStart(status.fullname, pod.UID) { 1027 p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor)) 1028 return false, true 1029 } 1030 return true, true 1031 } 1032 1033 // allowStaticPodStart tries to start the static pod and returns true if 1034 // 1. there are no other started static pods with the same fullname 1035 // 2. 
the uid matches that of the first valid static pod waiting to start 1036 func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool { 1037 startedUID, started := p.startedStaticPodsByFullname[fullname] 1038 if started { 1039 return startedUID == uid 1040 } 1041 1042 waitingPods := p.waitingToStartStaticPodsByFullname[fullname] 1043 // TODO: This is O(N) with respect to the number of updates to static pods 1044 // with overlapping full names, and ideally would be O(1). 1045 for i, waitingUID := range waitingPods { 1046 // has pod already terminated or been deleted? 1047 status, ok := p.podSyncStatuses[waitingUID] 1048 if !ok || status.IsTerminationRequested() || status.IsTerminated() { 1049 continue 1050 } 1051 // another pod is next in line 1052 if waitingUID != uid { 1053 p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:] 1054 return false 1055 } 1056 // we are up next, remove ourselves 1057 waitingPods = waitingPods[i+1:] 1058 break 1059 } 1060 if len(waitingPods) != 0 { 1061 p.waitingToStartStaticPodsByFullname[fullname] = waitingPods 1062 } else { 1063 delete(p.waitingToStartStaticPodsByFullname, fullname) 1064 } 1065 p.startedStaticPodsByFullname[fullname] = uid 1066 return true 1067 } 1068 1069 // cleanupUnstartedPod is invoked if a pod that has never been started receives a termination 1070 // signal before it can be started. This method must be called holding the pod lock. 1071 func (p *podWorkers) cleanupUnstartedPod(pod *v1.Pod, status *podSyncStatus) { 1072 p.cleanupPodUpdates(pod.UID) 1073 1074 if status.terminatingAt.IsZero() { 1075 klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) 1076 } 1077 if !status.terminatedAt.IsZero() { 1078 klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID) 1079 } 1080 status.finished = true 1081 status.working = false 1082 status.terminatedAt = p.clock.Now() 1083 1084 if p.startedStaticPodsByFullname[status.fullname] == pod.UID { 1085 delete(p.startedStaticPodsByFullname, status.fullname) 1086 } 1087 } 1088 1089 // startPodSync is invoked by each pod worker goroutine when a message arrives on the pod update channel. 1090 // This method consumes a pending update, initializes a context, decides whether the pod is already started 1091 // or can be started, and updates the cached pod state so that downstream components can observe what the 1092 // pod worker goroutine is currently attempting to do. If ok is false, there is no available event. If any 1093 // of the boolean values is false, ensure the appropriate cleanup happens before returning. 1094 // 1095 // This method should ensure that either status.pendingUpdate is cleared and merged into status.activeUpdate, 1096 // or when a pod cannot be started status.pendingUpdate remains the same. Pods that have not been started 1097 // should never have an activeUpdate because that is exposed to downstream components on started pods. 
1098 func (p *podWorkers) startPodSync(podUID types.UID) (ctx context.Context, update podWork, canStart, canEverStart, ok bool) { 1099 p.podLock.Lock() 1100 defer p.podLock.Unlock() 1101 1102 // verify we are known to the pod worker still 1103 status, ok := p.podSyncStatuses[podUID] 1104 if !ok { 1105 // pod status has disappeared, the worker should exit 1106 klog.V(4).InfoS("Pod worker no longer has status, worker should exit", "podUID", podUID) 1107 return nil, update, false, false, false 1108 } 1109 if !status.working { 1110 // working is used by unit tests to observe whether a worker is currently acting on this pod 1111 klog.V(4).InfoS("Pod should be marked as working by the pod worker, programmer error", "podUID", podUID) 1112 } 1113 if status.pendingUpdate == nil { 1114 // no update available, this means we were queued without work being added or there is a 1115 // race condition, both of which are unexpected 1116 status.working = false 1117 klog.V(4).InfoS("Pod worker received no pending work, programmer error?", "podUID", podUID) 1118 return nil, update, false, false, false 1119 } 1120 1121 // consume the pending update 1122 update.WorkType = status.WorkType() 1123 update.Options = *status.pendingUpdate 1124 status.pendingUpdate = nil 1125 select { 1126 case <-p.podUpdates[podUID]: 1127 // ensure the pod update channel is empty (it is only ever written to under lock) 1128 default: 1129 } 1130 1131 // initialize a context for the worker if one does not exist 1132 if status.ctx == nil || status.ctx.Err() == context.Canceled { 1133 status.ctx, status.cancelFn = context.WithCancel(context.Background()) 1134 } 1135 ctx = status.ctx 1136 1137 // if we are already started, make our state visible to downstream components 1138 if status.IsStarted() { 1139 status.mergeLastUpdate(update.Options) 1140 return ctx, update, true, true, true 1141 } 1142 1143 // if we are already terminating and we only have a running pod, allow the worker 1144 // to "start" since we are immediately moving to terminating 1145 if update.Options.RunningPod != nil && update.WorkType == TerminatingPod { 1146 status.mergeLastUpdate(update.Options) 1147 return ctx, update, true, true, true 1148 } 1149 1150 // If we receive an update where Pod is nil (running pod is set) but haven't 1151 // started yet, we can only terminate the pod, not start it. We should not be 1152 // asked to start such a pod, but guard here just in case an accident occurs. 
1153 if update.Options.Pod == nil { 1154 status.mergeLastUpdate(update.Options) 1155 klog.V(4).InfoS("Running pod cannot start ever, programmer error", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) 1156 return ctx, update, false, false, true 1157 } 1158 1159 // verify we can start 1160 canStart, canEverStart = p.allowPodStart(update.Options.Pod) 1161 switch { 1162 case !canEverStart: 1163 p.cleanupUnstartedPod(update.Options.Pod, status) 1164 status.working = false 1165 if start := update.Options.StartTime; !start.IsZero() { 1166 metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start)) 1167 } 1168 klog.V(4).InfoS("Pod cannot start ever", "pod", klog.KObj(update.Options.Pod), "podUID", podUID, "updateType", update.WorkType) 1169 return ctx, update, canStart, canEverStart, true 1170 case !canStart: 1171 // this is the only path we don't start the pod, so we need to put the change back in pendingUpdate 1172 status.pendingUpdate = &update.Options 1173 status.working = false 1174 klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(update.Options.Pod), "podUID", podUID) 1175 return ctx, update, canStart, canEverStart, true 1176 } 1177 1178 // mark the pod as started 1179 status.startedAt = p.clock.Now() 1180 status.mergeLastUpdate(update.Options) 1181 1182 // If we are admitting the pod and it is new, record the count of containers 1183 // TODO: We should probably move this into syncPod and add an execution count 1184 // to the syncPod arguments, and this should be recorded on the first sync. 1185 // Leaving it here complicates a particularly important loop. 1186 metrics.ContainersPerPodCount.Observe(float64(len(update.Options.Pod.Spec.Containers))) 1187 1188 return ctx, update, true, true, true 1189 } 1190 1191 func podUIDAndRefForUpdate(update UpdatePodOptions) (types.UID, klog.ObjectRef) { 1192 if update.RunningPod != nil { 1193 return update.RunningPod.ID, klog.KObj(update.RunningPod.ToAPIPod()) 1194 } 1195 return update.Pod.UID, klog.KObj(update.Pod) 1196 } 1197 1198 // podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final 1199 // state is reached. The loop is responsible for driving the pod through four main phases: 1200 // 1201 // 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time 1202 // 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod 1203 // 3. Terminating, ensuring all running containers in the pod are stopped 1204 // 4. Terminated, cleaning up any resources that must be released before the pod can be deleted 1205 // 1206 // The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular 1207 // sync method fails, p.workerQueue is updated with backoff but it is the responsibility of the kubelet 1208 // to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the 1209 // caller. When a pod transitions working->terminating or terminating->terminated, the next update is 1210 // queued immediately and no kubelet action is required. 1211 func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) { 1212 var lastSyncTime time.Time 1213 for range podUpdates { 1214 ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID) 1215 // If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate. 
		if !ok {
			continue
		}
		// If the pod was terminated prior to the pod being allowed to start, we exit the loop.
		if !canEverStart {
			return
		}
		// If the pod is not yet ready to start, continue and wait for more updates.
		if !canStart {
			continue
		}

		podUID, podRef := podUIDAndRefForUpdate(update.Options)

		klog.V(4).InfoS("Processing pod event", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
		var isTerminal bool
		err := func() error {
			// The worker is responsible for ensuring the sync method sees the appropriate
			// status updates on resyncs (the result of the last sync), transitions to
			// terminating (no wait), or on terminated (whatever the most recent state is).
			// Only syncing and terminating can generate pod status changes, while terminated
			// pods ensure the most recent status makes it to the api server.
			var status *kubecontainer.PodStatus
			var err error
			switch {
			case update.Options.RunningPod != nil:
				// when we receive a running pod, we don't need status at all because we are
				// guaranteed to be terminating and we skip updates to the pod
			default:
				// wait until we see the next refresh from the PLEG via the cache (max 2s)
				// TODO: this adds ~1s of latency on all transitions from sync to terminating
				// to terminated, and on all termination retries (including evictions). We should
				// improve latency by making the pleg continuous and by allowing pod status
				// changes to be refreshed when key events happen (killPod, sync->terminating).
				// Improving this latency also reduces the possibility that a terminated
				// container's status is garbage collected before we have a chance to update the
				// API server (thus losing the exit code).
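				// lastSyncTime is recorded after each sync returns, so this call will not
				// complete until the PLEG cache holds runtime state observed after the
				// previous sync of this pod.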
				status, err = p.podCache.GetNewerThan(update.Options.Pod.UID, lastSyncTime)

				if err != nil {
					// This is the legacy event thrown by the manage pod loop; all other events
					// are now dispatched from syncPodFn.
					p.recorder.Eventf(update.Options.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
					return err
				}
			}

			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)

				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

			lastSyncTime = p.clock.Now()
			return err
		}()

		var phaseTransition bool
		switch {
		case err == context.Canceled:
			// when the context is cancelled we expect an update to already be queued
			klog.V(2).InfoS("Sync exited with context cancellation error", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)

		case err != nil:
			// we will queue a retry
			klog.ErrorS(err, "Error syncing pod, skipping", "pod", podRef, "podUID", podUID)

		case update.WorkType == TerminatedPod:
			// we can shut down the worker
			p.completeTerminated(podUID)
			if start := update.Options.StartTime; !start.IsZero() {
				metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
			}
			klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			return

		case update.WorkType == TerminatingPod:
			// pods that don't exist in config don't need to be terminated, other loops will clean them up
			if update.Options.RunningPod != nil {
				p.completeTerminatingRuntimePod(podUID)
				if start := update.Options.StartTime; !start.IsZero() {
					metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
				}
				klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
				return
			}
			// otherwise we move to the terminating phase
			p.completeTerminating(podUID)
			phaseTransition = true

		case isTerminal:
			// if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
			klog.V(4).InfoS("Pod is terminal", "pod", podRef, "podUID", podUID, "updateType", update.WorkType)
			p.completeSync(podUID)
			phaseTransition = true
		}

		// queue a retry if necessary, then put the next event in the channel if any
		p.completeWork(podUID, phaseTransition, err)
		if start := update.Options.StartTime; !start.IsZero() {
			metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
		}
klog.V(4).InfoS("Processing pod event done", "pod", podRef, "podUID", podUID, "updateType", update.WorkType) 1336 } 1337 } 1338 1339 // acknowledgeTerminating sets the terminating flag on the pod status once the pod worker sees 1340 // the termination state so that other components know no new containers will be started in this 1341 // pod. It then returns the status function, if any, that applies to this pod. 1342 func (p *podWorkers) acknowledgeTerminating(podUID types.UID) PodStatusFunc { 1343 p.podLock.Lock() 1344 defer p.podLock.Unlock() 1345 1346 status, ok := p.podSyncStatuses[podUID] 1347 if !ok { 1348 return nil 1349 } 1350 1351 if !status.terminatingAt.IsZero() && !status.startedTerminating { 1352 klog.V(4).InfoS("Pod worker has observed request to terminate", "podUID", podUID) 1353 status.startedTerminating = true 1354 } 1355 1356 if l := len(status.statusPostTerminating); l > 0 { 1357 return status.statusPostTerminating[l-1] 1358 } 1359 return nil 1360 } 1361 1362 // completeSync is invoked when syncPod completes successfully and indicates the pod is now terminal and should 1363 // be terminated. This happens when the natural pod lifecycle completes - any pod which is not RestartAlways 1364 // exits. Unnatural completions, such as evictions, API driven deletion or phase transition, are handled by 1365 // UpdatePod. 1366 func (p *podWorkers) completeSync(podUID types.UID) { 1367 p.podLock.Lock() 1368 defer p.podLock.Unlock() 1369 1370 klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "podUID", podUID) 1371 1372 status, ok := p.podSyncStatuses[podUID] 1373 if !ok { 1374 klog.V(4).InfoS("Pod had no status in completeSync, programmer error?", "podUID", podUID) 1375 return 1376 } 1377 1378 // update the status of the pod 1379 if status.terminatingAt.IsZero() { 1380 status.terminatingAt = p.clock.Now() 1381 } else { 1382 klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "podUID", podUID) 1383 } 1384 status.startedTerminating = true 1385 1386 // the pod has now transitioned to terminating and we want to run syncTerminatingPod 1387 // as soon as possible, so if no update is already waiting queue a synthetic update 1388 p.requeueLastPodUpdate(podUID, status) 1389 } 1390 1391 // completeTerminating is invoked when syncTerminatingPod completes successfully, which means 1392 // no container is running, no container will be started in the future, and we are ready for 1393 // cleanup. This updates the termination state which prevents future syncs and will ensure 1394 // other kubelet loops know this pod is not running any containers. 
func (p *podWorkers) completeTerminating(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod terminated all containers successfully", "podUID", podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}

	// update the status of the pod
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	status.terminatedAt = p.clock.Now()
	for _, ch := range status.notifyPostTerminating {
		close(ch)
	}
	status.notifyPostTerminating = nil
	status.statusPostTerminating = nil

	// the pod has now transitioned to terminated and we want to run syncTerminatedPod
	// as soon as possible, so if no update is already waiting queue a synthetic update
	p.requeueLastPodUpdate(podUID, status)
}

// completeTerminatingRuntimePod is invoked when syncTerminatingPod completes successfully,
// which means an orphaned pod (no config) is terminated and we can exit. Since orphaned
// pods have no API representation, we want to exit the loop at this point and ensure no
// status is present afterwards - the running pod is truly terminated when this is invoked.
func (p *podWorkers) completeTerminatingRuntimePod(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "podUID", podUID)

	p.cleanupPodUpdates(podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	status.terminatedAt = p.clock.Now()
	status.finished = true
	status.working = false

	if p.startedStaticPodsByFullname[status.fullname] == podUID {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}

	// A runtime pod is transient and not part of the desired state - once it has reached
	// terminated we can abandon tracking it.
	delete(p.podSyncStatuses, podUID)
}

// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
// can stop the pod worker. The pod is finalized at this point.
func (p *podWorkers) completeTerminated(podUID types.UID) {
	p.podLock.Lock()
	defer p.podLock.Unlock()

	klog.V(4).InfoS("Pod is complete and the worker can now stop", "podUID", podUID)

	p.cleanupPodUpdates(podUID)

	status, ok := p.podSyncStatuses[podUID]
	if !ok {
		return
	}
	if status.terminatingAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "podUID", podUID)
	}
	if status.terminatedAt.IsZero() {
		klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "podUID", podUID)
	}
	status.finished = true
	status.working = false

	if p.startedStaticPodsByFullname[status.fullname] == podUID {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
}

// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
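// In rough terms the requeue delay chosen below is (constants and fields are defined
// elsewhere in this package):
//
//	phase transition        -> 0 (run the next work type immediately)
//	success                 -> resyncInterval, with jitter
//	network-not-ready error -> backOffOnTransientErrorPeriod, with jitter
//	any other error         -> backOffPeriod, with jitter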
func (p *podWorkers) completeWork(podUID types.UID, phaseTransition bool, syncErr error) {
	// Requeue the pod: immediately on a phase transition, at the regular resync interval
	// on success, or with backoff after an error.
	switch {
	case phaseTransition:
		p.workQueue.Enqueue(podUID, 0)
	case syncErr == nil:
		// No error; requeue at the regular resync interval.
		p.workQueue.Enqueue(podUID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
	case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
		// Network is not ready; back off for short period of time and retry as network might be ready soon.
		p.workQueue.Enqueue(podUID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
	default:
		// Error occurred during the sync; back off and then retry.
		p.workQueue.Enqueue(podUID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
	}

	// if there is a pending update for this worker, requeue immediately, otherwise
	// clear working status
	p.podLock.Lock()
	defer p.podLock.Unlock()
	if status, ok := p.podSyncStatuses[podUID]; ok {
		if status.pendingUpdate != nil {
			select {
			case p.podUpdates[podUID] <- struct{}{}:
				klog.V(4).InfoS("Requeueing pod due to pending update", "podUID", podUID)
			default:
				klog.V(4).InfoS("Pending update already queued", "podUID", podUID)
			}
		} else {
			status.working = false
		}
	}
}

// SyncKnownPods will purge any fully terminated pods that are not in the desiredPods
// list, which means SyncKnownPods must be called in a threadsafe manner with respect
// to calls to UpdatePod for new pods. Because the pod worker is dependent on UpdatePod
// being invoked to drive a pod's state machine, if a pod is missing in the desired list
// the pod worker must be responsible for delivering that update. The method returns a
// map of known workers that are not finished, with a PodWorkerSync value whose State is
// TerminatedPod, TerminatingPod, or SyncPod depending on whether the pod is terminated,
// terminating, or syncing.
func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
	workers := make(map[types.UID]PodWorkerSync)
	known := make(map[types.UID]struct{})
	for _, pod := range desiredPods {
		known[pod.UID] = struct{}{}
	}

	p.podLock.Lock()
	defer p.podLock.Unlock()

	p.podsSynced = true
	for uid, status := range p.podSyncStatuses {
		// We retain the worker history of any pod that is still desired according to
		// its UID. However, there are two scenarios during a sync that result in us
		// needing to purge the history:
		//
		// 1. The pod is no longer desired (the local version is orphaned)
		// 2. The pod received a kill update and then a subsequent create, which means
		// the UID was reused in the source config (vanishingly rare for API servers,
		// common for static pods that have specified a fixed UID)
		//
		// In the former case we wish to bound the amount of information we store for
		// deleted pods. In the latter case we wish to minimize the amount of time before
		// we restart the static pod. If we succeed at removing the worker, then we
		// omit it from the returned map of known workers, and the caller of SyncKnownPods
		// is expected to send a new UpdatePod({UpdateType: Create}).
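		// orphan is true when the UID is no longer present in desiredPods; restartRequested
		// is expected to have been set by UpdatePod when a create was observed for a UID that
		// is already terminating (typically a static pod reusing a fixed UID).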
		_, knownPod := known[uid]
		orphan := !knownPod
		if status.restartRequested || orphan {
			if p.removeTerminatedWorker(uid, status, orphan) {
				// no worker running, we won't return it
				continue
			}
		}

		sync := PodWorkerSync{
			State:  status.WorkType(),
			Orphan: orphan,
		}
		switch {
		case status.activeUpdate != nil:
			if status.activeUpdate.Pod != nil {
				sync.HasConfig = true
				sync.Static = kubetypes.IsStaticPod(status.activeUpdate.Pod)
			}
		case status.pendingUpdate != nil:
			if status.pendingUpdate.Pod != nil {
				sync.HasConfig = true
				sync.Static = kubetypes.IsStaticPod(status.pendingUpdate.Pod)
			}
		}
		workers[uid] = sync
	}
	return workers
}

// removeTerminatedWorker cleans up and removes the worker status for a worker
// that has reached a terminal state of "finished" - has successfully exited
// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be
// recreated with the same UID. The kubelet preserves state about recently
// terminated pods to prevent accidentally restarting a terminal pod; the amount
// of state retained is proportional to the number of pods described in the pod
// config. The method returns true if the worker was completely removed.
func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus, orphaned bool) bool {
	if !status.finished {
		// If the pod worker has not reached terminal state and the pod is still known, we wait.
		if !orphaned {
			klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
			return false
		}

		// all orphaned pods are considered deleted
		status.deleted = true

		// When a pod is no longer in the desired set, the pod is considered orphaned and
		// the pod worker becomes responsible for driving the pod to completion (there is no
		// guarantee another component will notify us of updates).
		switch {
		case !status.IsStarted() && !status.observedRuntime:
			// The pod has not been started, which means we can safely clean up the pod - the
			// pod worker will shutdown as a result of this change without executing a sync.
			klog.V(4).InfoS("Pod is orphaned and has not been started", "podUID", uid)
		case !status.IsTerminationRequested():
			// The pod has been started but termination has not been requested - set the appropriate
			// timestamp and notify the pod worker. Because the pod has been synced at least once,
			// the value of status.activeUpdate will be the fallback for the next sync.
			status.terminatingAt = p.clock.Now()
			if status.activeUpdate != nil && status.activeUpdate.Pod != nil {
				status.gracePeriod, _ = calculateEffectiveGracePeriod(status, status.activeUpdate.Pod, nil)
			} else {
				status.gracePeriod = 1
			}
			p.requeueLastPodUpdate(uid, status)
			klog.V(4).InfoS("Pod is orphaned and still running, began terminating", "podUID", uid)
			return false
		default:
			// The pod is already moving towards termination, notify the pod worker. Because the pod
			// has been synced at least once, the value of status.activeUpdate will be the fallback for
			// the next sync.
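			// requeueLastPodUpdate (defined at the end of this file) copies activeUpdate into
			// pendingUpdate when nothing newer is queued and wakes the worker goroutine.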
			p.requeueLastPodUpdate(uid, status)
			klog.V(4).InfoS("Pod is orphaned and still terminating, notified the pod worker", "podUID", uid)
			return false
		}
	}

	if status.restartRequested {
		klog.V(4).InfoS("Pod has been terminated but another pod with the same UID was created, remove history to allow restart", "podUID", uid)
	} else {
		klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
	}
	delete(p.podSyncStatuses, uid)
	p.cleanupPodUpdates(uid)

	if p.startedStaticPodsByFullname[status.fullname] == uid {
		delete(p.startedStaticPodsByFullname, status.fullname)
	}
	return true
}

// killPodNow returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFunc {
	return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, statusFn func(*v1.PodStatus)) error {
		// determine the grace period to use when killing the pod
		gracePeriod := int64(0)
		if gracePeriodOverride != nil {
			gracePeriod = *gracePeriodOverride
		} else if pod.Spec.TerminationGracePeriodSeconds != nil {
			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
		}

		// we timeout and return an error if we don't get a callback within a reasonable time.
		// the default timeout is relative to the grace period (we settle on 10s to wait for kubelet->runtime traffic to complete in sigkill)
		timeout := gracePeriod + (gracePeriod / 2)
		minTimeout := int64(10)
		if timeout < minTimeout {
			timeout = minTimeout
		}
		timeoutDuration := time.Duration(timeout) * time.Second

		// open a channel we block against until we get a result
		ch := make(chan struct{}, 1)
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodKill,
			KillPodOptions: &KillPodOptions{
				CompletedCh:   ch,
				Evict:         isEvicted,
				PodStatusFunc: statusFn,
				PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
			},
		})

		// wait for either a response, or a timeout
		select {
		case <-ch:
			return nil
		case <-time.After(timeoutDuration):
			recorder.Eventf(pod, v1.EventTypeWarning, events.ExceededGracePeriod, "Container runtime did not kill the pod within specified grace period.")
			return fmt.Errorf("timeout waiting to kill pod")
		}
	}
}

// cleanupPodUpdates closes the podUpdates channel and removes it from the
// podUpdates map so that the corresponding pod worker can stop. It also
// removes any undelivered work. This method must be called holding the
// pod lock.
func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
	if ch, ok := p.podUpdates[uid]; ok {
		close(ch)
	}
	delete(p.podUpdates, uid)
}

// requeueLastPodUpdate creates a new pending pod update from the most recently
// executed update if no update is already queued, and then notifies the pod
// worker goroutine of the update. This method must be called while holding
// the pod lock.
func (p *podWorkers) requeueLastPodUpdate(podUID types.UID, status *podSyncStatus) {
	// if there is already an update queued, we can use that instead, or if
	// we have no previously executed update, we cannot replay it.
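	// Otherwise the copy made below becomes the new pendingUpdate and the worker goroutine
	// is woken with a non-blocking send; if a wakeup signal is already pending, the existing
	// one is sufficient.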
	if status.pendingUpdate != nil || status.activeUpdate == nil {
		return
	}
	copied := *status.activeUpdate
	status.pendingUpdate = &copied

	// notify the pod worker
	status.working = true
	select {
	case p.podUpdates[podUID] <- struct{}{}:
	default:
	}
}
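// A minimal usage sketch for the KillPodFunc returned by killPodNow (hypothetical caller
// and illustrative values; the status mutation shown is an assumption about how an eviction
// might report itself, not something this package prescribes):
//
//	killFunc := killPodNow(kl.podWorkers, kl.recorder)
//	gracePeriod := int64(30)
//	// Blocks until the pod worker finishes terminating the pod, or ~45s (grace period * 1.5) elapses.
//	err := killFunc(pod, true, &gracePeriod, func(status *v1.PodStatus) {
//		status.Phase = v1.PodFailed
//		status.Reason = "Evicted"
//	})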