1
16
17 package kuberuntime
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "os"
24 "path/filepath"
25 "sort"
26 "time"
27
28 cadvisorapi "github.com/google/cadvisor/info/v1"
29 "github.com/google/go-cmp/cmp"
30 "go.opentelemetry.io/otel/trace"
31 crierror "k8s.io/cri-api/pkg/errors"
32 "k8s.io/klog/v2"
33
34 v1 "k8s.io/api/core/v1"
35 "k8s.io/apimachinery/pkg/api/resource"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 kubetypes "k8s.io/apimachinery/pkg/types"
38 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
39 utilversion "k8s.io/apimachinery/pkg/util/version"
40 utilfeature "k8s.io/apiserver/pkg/util/feature"
41 "k8s.io/client-go/tools/record"
42 ref "k8s.io/client-go/tools/reference"
43 "k8s.io/client-go/util/flowcontrol"
44 "k8s.io/component-base/logs/logreduction"
45 internalapi "k8s.io/cri-api/pkg/apis"
46 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
47
48 "k8s.io/kubernetes/pkg/api/legacyscheme"
49 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
50 "k8s.io/kubernetes/pkg/credentialprovider"
51 "k8s.io/kubernetes/pkg/credentialprovider/plugin"
52 "k8s.io/kubernetes/pkg/features"
53 "k8s.io/kubernetes/pkg/kubelet/cm"
54 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
55 "k8s.io/kubernetes/pkg/kubelet/events"
56 "k8s.io/kubernetes/pkg/kubelet/images"
57 runtimeutil "k8s.io/kubernetes/pkg/kubelet/kuberuntime/util"
58 "k8s.io/kubernetes/pkg/kubelet/lifecycle"
59 "k8s.io/kubernetes/pkg/kubelet/logs"
60 "k8s.io/kubernetes/pkg/kubelet/metrics"
61 proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
62 "k8s.io/kubernetes/pkg/kubelet/runtimeclass"
63 "k8s.io/kubernetes/pkg/kubelet/sysctl"
64 "k8s.io/kubernetes/pkg/kubelet/types"
65 "k8s.io/kubernetes/pkg/kubelet/util/cache"
66 "k8s.io/kubernetes/pkg/kubelet/util/format"
67 sc "k8s.io/kubernetes/pkg/securitycontext"
68 )
69
const (
	// kubeRuntimeAPIVersion is the only CRI API version this manager accepts;
	// NewKubeGenericRuntimeManager fails with ErrVersionNotSupported when the
	// runtime reports anything else.
	kubeRuntimeAPIVersion = "0.1.0"

	// minimumGracePeriodInSeconds is presumably the floor applied to container
	// termination grace periods — not referenced in this chunk; confirm at use site.
	minimumGracePeriodInSeconds = 2

	// versionCacheTTL bounds how long a cached runtime Version response is
	// reused by versionCache before being refreshed.
	versionCacheTTL = 60 * time.Second

	// identicalErrorDelay is the window logReduction uses to suppress
	// repeated identical error log lines.
	identicalErrorDelay = 1 * time.Minute

	// instrumentationScope names the tracer obtained from the TracerProvider.
	instrumentationScope = "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
)
83
var (
	// ErrVersionNotSupported is returned when the container runtime's reported
	// CRI API version does not match kubeRuntimeAPIVersion.
	ErrVersionNotSupported = errors.New("runtime api version is not supported")
)
88
89
90
// podStateProvider answers pod-lifecycle questions, keyed by pod UID, that the
// runtime manager and container GC use to decide whether pod runtime state or
// on-disk content may be removed.
type podStateProvider interface {
	IsPodTerminationRequested(kubetypes.UID) bool
	ShouldPodContentBeRemoved(kubetypes.UID) bool
	ShouldPodRuntimeBeRemoved(kubetypes.UID) bool
}
96
// kubeGenericRuntimeManager manages pods and containers through the CRI
// runtime and image services.
type kubeGenericRuntimeManager struct {
	// runtimeName is the name the runtime reports in its Version response.
	runtimeName string
	// recorder emits Kubernetes events.
	recorder record.EventRecorder
	// osInterface abstracts OS operations.
	osInterface kubecontainer.OSInterface

	// machineInfo is the cadvisor machine description; MachineID is used as
	// the versionCache key.
	machineInfo *cadvisorapi.MachineInfo

	// containerGC performs garbage collection of containers and sandboxes.
	containerGC *containerGC

	// keyring resolves credentials for image pulls.
	keyring credentialprovider.DockerKeyring

	// runner executes container lifecycle handlers (hooks).
	runner kubecontainer.HandlerRunner

	// runtimeHelper provides kubelet-side helper functions to the runtime.
	runtimeHelper kubecontainer.RuntimeHelper

	// Probe result managers consulted by computePodActions to decide restarts.
	livenessManager  proberesults.Manager
	readinessManager proberesults.Manager
	startupManager   proberesults.Manager

	// cpuCFSQuota indicates whether CPU CFS quota enforcement is enabled.
	cpuCFSQuota bool

	// cpuCFSQuotaPeriod is the CFS quota period used when building pod
	// cgroup resource configuration.
	cpuCFSQuotaPeriod metav1.Duration

	// imagePuller pulls images on demand, wrapping imageService.
	imagePuller images.ImageManager

	// CRI services; both are wrapped with instrumentation at construction.
	runtimeService internalapi.RuntimeService
	imageService   internalapi.ImageManagerService

	// versionCache caches the runtime's Version response for versionCacheTTL.
	versionCache *cache.ObjectCache

	// seccompProfileRoot is <rootDirectory>/seccomp.
	seccompProfileRoot string

	// containerManager manages node-level container/cgroup resources.
	containerManager cm.ContainerManager

	// internalLifecycle receives internal container lifecycle callbacks
	// (e.g. PostStopContainer).
	internalLifecycle cm.InternalContainerLifecycle

	// logManager manages container log files.
	logManager logs.ContainerLogManager

	// runtimeClassManager resolves RuntimeClass handlers.
	runtimeClassManager *runtimeclass.Manager

	// logReduction suppresses identical errors within identicalErrorDelay.
	logReduction *logreduction.LogReduction

	// podStateProvider answers pod termination/removal questions.
	podStateProvider podStateProvider

	// seccompDefault — presumably enables RuntimeDefault seccomp by default;
	// not referenced in this chunk, confirm at use site.
	seccompDefault bool

	// memorySwapBehavior configures container swap behavior (value semantics
	// defined where sandbox/container configs are built).
	memorySwapBehavior string

	// getNodeAllocatable returns the node's allocatable resource list.
	getNodeAllocatable func() v1.ResourceList

	// memoryThrottlingFactor — presumably scales memory throttling limits
	// relative to the memory limit; confirm at use site.
	memoryThrottlingFactor float64

	// podLogsDirectory is the root directory for pod log files.
	podLogsDirectory string
}
174
175
// KubeGenericRuntime is the combined runtime surface the kubelet consumes:
// pod/container lifecycle, streaming (exec/attach/port-forward), and direct
// command execution.
type KubeGenericRuntime interface {
	kubecontainer.Runtime
	kubecontainer.StreamingRuntime
	kubecontainer.CommandRunner
}
181
182
183 func NewKubeGenericRuntimeManager(
184 recorder record.EventRecorder,
185 livenessManager proberesults.Manager,
186 readinessManager proberesults.Manager,
187 startupManager proberesults.Manager,
188 rootDirectory string,
189 podLogsDirectory string,
190 machineInfo *cadvisorapi.MachineInfo,
191 podStateProvider podStateProvider,
192 osInterface kubecontainer.OSInterface,
193 runtimeHelper kubecontainer.RuntimeHelper,
194 insecureContainerLifecycleHTTPClient types.HTTPDoer,
195 imageBackOff *flowcontrol.Backoff,
196 serializeImagePulls bool,
197 maxParallelImagePulls *int32,
198 imagePullQPS float32,
199 imagePullBurst int,
200 imageCredentialProviderConfigFile string,
201 imageCredentialProviderBinDir string,
202 cpuCFSQuota bool,
203 cpuCFSQuotaPeriod metav1.Duration,
204 runtimeService internalapi.RuntimeService,
205 imageService internalapi.ImageManagerService,
206 containerManager cm.ContainerManager,
207 logManager logs.ContainerLogManager,
208 runtimeClassManager *runtimeclass.Manager,
209 seccompDefault bool,
210 memorySwapBehavior string,
211 getNodeAllocatable func() v1.ResourceList,
212 memoryThrottlingFactor float64,
213 podPullingTimeRecorder images.ImagePodPullingTimeRecorder,
214 tracerProvider trace.TracerProvider,
215 ) (KubeGenericRuntime, error) {
216 ctx := context.Background()
217 runtimeService = newInstrumentedRuntimeService(runtimeService)
218 imageService = newInstrumentedImageManagerService(imageService)
219 tracer := tracerProvider.Tracer(instrumentationScope)
220 kubeRuntimeManager := &kubeGenericRuntimeManager{
221 recorder: recorder,
222 cpuCFSQuota: cpuCFSQuota,
223 cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
224 seccompProfileRoot: filepath.Join(rootDirectory, "seccomp"),
225 livenessManager: livenessManager,
226 readinessManager: readinessManager,
227 startupManager: startupManager,
228 machineInfo: machineInfo,
229 osInterface: osInterface,
230 runtimeHelper: runtimeHelper,
231 runtimeService: runtimeService,
232 imageService: imageService,
233 containerManager: containerManager,
234 internalLifecycle: containerManager.InternalContainerLifecycle(),
235 logManager: logManager,
236 runtimeClassManager: runtimeClassManager,
237 logReduction: logreduction.NewLogReduction(identicalErrorDelay),
238 seccompDefault: seccompDefault,
239 memorySwapBehavior: memorySwapBehavior,
240 getNodeAllocatable: getNodeAllocatable,
241 memoryThrottlingFactor: memoryThrottlingFactor,
242 podLogsDirectory: podLogsDirectory,
243 }
244
245 typedVersion, err := kubeRuntimeManager.getTypedVersion(ctx)
246 if err != nil {
247 klog.ErrorS(err, "Get runtime version failed")
248 return nil, err
249 }
250
251
252
253 if typedVersion.Version != kubeRuntimeAPIVersion {
254 klog.ErrorS(err, "This runtime api version is not supported",
255 "apiVersion", typedVersion.Version,
256 "supportedAPIVersion", kubeRuntimeAPIVersion)
257 return nil, ErrVersionNotSupported
258 }
259
260 kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
261 klog.InfoS("Container runtime initialized",
262 "containerRuntime", typedVersion.RuntimeName,
263 "version", typedVersion.RuntimeVersion,
264 "apiVersion", typedVersion.RuntimeApiVersion)
265
266 if imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "" {
267 if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir); err != nil {
268 klog.ErrorS(err, "Failed to register CRI auth plugins")
269 os.Exit(1)
270 }
271 }
272 kubeRuntimeManager.keyring = credentialprovider.NewDockerKeyring()
273
274 kubeRuntimeManager.imagePuller = images.NewImageManager(
275 kubecontainer.FilterEventRecorder(recorder),
276 kubeRuntimeManager,
277 imageBackOff,
278 serializeImagePulls,
279 maxParallelImagePulls,
280 imagePullQPS,
281 imagePullBurst,
282 podPullingTimeRecorder)
283 kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(insecureContainerLifecycleHTTPClient, kubeRuntimeManager, kubeRuntimeManager, recorder)
284 kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager, tracer)
285 kubeRuntimeManager.podStateProvider = podStateProvider
286
287 kubeRuntimeManager.versionCache = cache.NewObjectCache(
288 func() (interface{}, error) {
289 return kubeRuntimeManager.getTypedVersion(ctx)
290 },
291 versionCacheTTL,
292 )
293
294 return kubeRuntimeManager, nil
295 }
296
297
// Type returns the container runtime's name, as reported by the runtime
// itself during manager construction.
func (m *kubeGenericRuntimeManager) Type() string {
	return m.runtimeName
}
301
302 func newRuntimeVersion(version string) (*utilversion.Version, error) {
303 if ver, err := utilversion.ParseSemantic(version); err == nil {
304 return ver, err
305 }
306 return utilversion.ParseGeneric(version)
307 }
308
309 func (m *kubeGenericRuntimeManager) getTypedVersion(ctx context.Context) (*runtimeapi.VersionResponse, error) {
310 typedVersion, err := m.runtimeService.Version(ctx, kubeRuntimeAPIVersion)
311 if err != nil {
312 return nil, fmt.Errorf("get remote runtime typed version failed: %v", err)
313 }
314 return typedVersion, nil
315 }
316
317
318 func (m *kubeGenericRuntimeManager) Version(ctx context.Context) (kubecontainer.Version, error) {
319 typedVersion, err := m.getTypedVersion(ctx)
320 if err != nil {
321 return nil, err
322 }
323
324 return newRuntimeVersion(typedVersion.RuntimeVersion)
325 }
326
327
328
329
330 func (m *kubeGenericRuntimeManager) APIVersion() (kubecontainer.Version, error) {
331 versionObject, err := m.versionCache.Get(m.machineInfo.MachineID)
332 if err != nil {
333 return nil, err
334 }
335 typedVersion := versionObject.(*runtimeapi.VersionResponse)
336
337 return newRuntimeVersion(typedVersion.RuntimeApiVersion)
338 }
339
340
341
342 func (m *kubeGenericRuntimeManager) Status(ctx context.Context) (*kubecontainer.RuntimeStatus, error) {
343 resp, err := m.runtimeService.Status(ctx, false)
344 if err != nil {
345 return nil, err
346 }
347 if resp.GetStatus() == nil {
348 return nil, errors.New("runtime status is nil")
349 }
350 return toKubeRuntimeStatus(resp.GetStatus(), resp.GetRuntimeHandlers()), nil
351 }
352
353
354
355
// GetPods returns the pods known to the runtime, assembled by merging sandbox
// and container listings keyed by pod UID. If all is false, only running
// sandboxes/containers are considered (delegated to the getKubelet* helpers).
// Results are sorted newest-first by sandbox creation time.
func (m *kubeGenericRuntimeManager) GetPods(ctx context.Context, all bool) ([]*kubecontainer.Pod, error) {
	pods := make(map[kubetypes.UID]*kubecontainer.Pod)
	sandboxes, err := m.getKubeletSandboxes(ctx, all)
	if err != nil {
		return nil, err
	}
	// First pass: create one kubecontainer.Pod per sandbox pod UID and attach
	// the converted sandboxes.
	for i := range sandboxes {
		s := sandboxes[i]
		if s.Metadata == nil {
			// Without metadata we cannot attribute the sandbox to a pod.
			klog.V(4).InfoS("Sandbox does not have metadata", "sandbox", s)
			continue
		}
		podUID := kubetypes.UID(s.Metadata.Uid)
		if _, ok := pods[podUID]; !ok {
			pods[podUID] = &kubecontainer.Pod{
				ID:        podUID,
				Name:      s.Metadata.Name,
				Namespace: s.Metadata.Namespace,
			}
		}
		p := pods[podUID]
		converted, err := m.sandboxToKubeContainer(s)
		if err != nil {
			// Skip sandboxes that cannot be converted; best-effort listing.
			klog.V(4).InfoS("Convert sandbox of pod failed", "runtimeName", m.runtimeName, "sandbox", s, "podUID", podUID, "err", err)
			continue
		}
		p.Sandboxes = append(p.Sandboxes, converted)
		// NOTE: CreatedAt is overwritten per sandbox; the last converted
		// sandbox for the UID wins and drives the sort below.
		p.CreatedAt = uint64(s.GetCreatedAt())
	}

	// Second pass: attach containers to their pods (pod identity comes from
	// container labels; a pod entry is created if no sandbox was seen).
	containers, err := m.getKubeletContainers(ctx, all)
	if err != nil {
		return nil, err
	}
	for i := range containers {
		c := containers[i]
		if c.Metadata == nil {
			klog.V(4).InfoS("Container does not have metadata", "container", c)
			continue
		}

		labelledInfo := getContainerInfoFromLabels(c.Labels)
		pod, found := pods[labelledInfo.PodUID]
		if !found {
			pod = &kubecontainer.Pod{
				ID:        labelledInfo.PodUID,
				Name:      labelledInfo.PodName,
				Namespace: labelledInfo.PodNamespace,
			}
			pods[labelledInfo.PodUID] = pod
		}

		converted, err := m.toKubeContainer(c)
		if err != nil {
			klog.V(4).InfoS("Convert container of pod failed", "runtimeName", m.runtimeName, "container", c, "podUID", labelledInfo.PodUID, "err", err)
			continue
		}

		pod.Containers = append(pod.Containers, converted)
	}

	// Flatten the map into a slice.
	var result []*kubecontainer.Pod
	for _, pod := range pods {
		result = append(result, pod)
	}

	// Newest pods first; stable sort keeps a deterministic-ish order among
	// pods with equal creation times.
	sort.SliceStable(result, func(i, j int) bool {
		return result[i].CreatedAt > result[j].CreatedAt
	})
	klog.V(4).InfoS("Retrieved pods from runtime", "all", all)
	return result, nil
}
434
435
// containerKillReason categorizes why a container is being killed.
type containerKillReason string

const (
	reasonStartupProbe        containerKillReason = "StartupProbe"
	reasonLivenessProbe       containerKillReason = "LivenessProbe"
	reasonFailedPostStartHook containerKillReason = "FailedPostStartHook"
	reasonUnknown             containerKillReason = "Unknown"
)
444
445
// containerToKillInfo carries the information needed to kill one container.
type containerToKillInfo struct {
	// container is the spec of the container to kill.
	container *v1.Container
	// name of the container.
	name string
	// message explains why the container is being killed (used in events/logs).
	message string
	// reason categorizes the kill (probe failure, hook failure, unknown, …).
	reason containerKillReason
}
457
458
// containerResources holds a container's resource requests and limits:
// memory values in bytes, CPU values in milliCPU (see computePodResizeAction,
// which fills these from Value()/MilliValue()).
type containerResources struct {
	memoryLimit   int64
	memoryRequest int64
	cpuLimit      int64
	cpuRequest    int64
}
465
466
// containerToUpdateInfo identifies a running container whose resources should
// be updated in place, with its desired and last-observed resource values.
type containerToUpdateInfo struct {
	// apiContainerIdx is the container's index in pod.Spec.Containers.
	apiContainerIdx int
	// kubeContainerID is the runtime ID of the container.
	kubeContainerID kubecontainer.ContainerID
	// desiredContainerResources are the target resources from the pod spec.
	desiredContainerResources containerResources
	// currentContainerResources are the last-known resources; mutated in
	// place by updatePodContainerResources after each successful update.
	currentContainerResources *containerResources
}
477
478
// podActions is the set of operations computePodActions decides must be taken
// to bring a pod to its desired state during a sync.
type podActions struct {
	// KillPod indicates the whole pod (all running containers and, when
	// CreateSandbox is set, the sandbox) should be killed.
	KillPod bool

	// CreateSandbox indicates a (new) sandbox must be created.
	CreateSandbox bool

	// SandboxID is the ID of the existing sandbox, if any.
	SandboxID string

	// Attempt is the sandbox creation attempt number.
	Attempt uint32

	// NextInitContainerToStart is the single init container to run next
	// (legacy path, SidecarContainers feature gate disabled).
	NextInitContainerToStart *v1.Container

	// InitContainersToStart are indexes into pod.Spec.InitContainers to start
	// (SidecarContainers feature gate enabled).
	InitContainersToStart []int

	// ContainersToStart are indexes into pod.Spec.Containers to start.
	ContainersToStart []int

	// ContainersToKill maps runtime container IDs to kill details.
	ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo

	// EphemeralContainersToStart are indexes into pod.Spec.EphemeralContainers
	// that have no status yet and should be started.
	EphemeralContainersToStart []int

	// ContainersToUpdate groups in-place resource updates by resource name
	// (in-place pod vertical scaling).
	ContainersToUpdate map[v1.ResourceName][]containerToUpdateInfo

	// UpdatePodResources requests pod-level cgroup resource updates even when
	// no individual container update is queued (e.g. resize with restart).
	UpdatePodResources bool
}
515
516 func (p podActions) String() string {
517 return fmt.Sprintf("KillPod: %t, CreateSandbox: %t, UpdatePodResources: %t, Attempt: %d, InitContainersToStart: %v, ContainersToStart: %v, EphemeralContainersToStart: %v,ContainersToUpdate: %v, ContainersToKill: %v",
518 p.KillPod, p.CreateSandbox, p.UpdatePodResources, p.Attempt, p.InitContainersToStart, p.ContainersToStart, p.EphemeralContainersToStart, p.ContainersToUpdate, p.ContainersToKill)
519 }
520
521 func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {
522 expectedHash := kubecontainer.HashContainer(container)
523 return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
524 }
525
// shouldRestartOnFailure returns true unless the pod's restart policy is Never
// (i.e. both Always and OnFailure allow restarting failed containers).
func shouldRestartOnFailure(pod *v1.Pod) bool {
	return pod.Spec.RestartPolicy != v1.RestartPolicyNever
}
529
530 func containerSucceeded(c *v1.Container, podStatus *kubecontainer.PodStatus) bool {
531 cStatus := podStatus.FindContainerStatusByName(c.Name)
532 if cStatus == nil || cStatus.State == kubecontainer.ContainerStateRunning {
533 return false
534 }
535 return cStatus.ExitCode == 0
536 }
537
538 func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
539 if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
540 return false
541 }
542 if types.IsStaticPod(pod) {
543 return false
544 }
545 return true
546 }
547
548 func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
549 container := pod.Spec.Containers[containerIdx]
550 if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 {
551 return true
552 }
553
554
555
556
557
558 apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
559 if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil ||
560 kubeContainerStatus.State != kubecontainer.ContainerStateRunning ||
561 kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID ||
562 !cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) {
563 return true
564 }
565
566 desiredMemoryLimit := container.Resources.Limits.Memory().Value()
567 desiredCPULimit := container.Resources.Limits.Cpu().MilliValue()
568 desiredCPURequest := container.Resources.Requests.Cpu().MilliValue()
569 currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value()
570 currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue()
571 currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue()
572
573 if kubeContainerStatus.Resources != nil {
574 if kubeContainerStatus.Resources.MemoryLimit != nil {
575 currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
576 }
577 if kubeContainerStatus.Resources.CPULimit != nil {
578 currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
579 }
580 if kubeContainerStatus.Resources.CPURequest != nil {
581 currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
582 }
583 }
584
585
586
587 if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest {
588 return true
589 }
590
591 desiredResources := containerResources{
592 memoryLimit: desiredMemoryLimit,
593 memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(),
594 cpuLimit: desiredCPULimit,
595 cpuRequest: desiredCPURequest,
596 }
597 currentResources := containerResources{
598 memoryLimit: currentMemoryLimit,
599 memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(),
600 cpuLimit: currentCPULimit,
601 cpuRequest: currentCPURequest,
602 }
603
604 resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy)
605 for _, pol := range container.ResizePolicy {
606 resizePolicy[pol.ResourceName] = pol.RestartPolicy
607 }
608 determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) {
609 if specValue == statusValue {
610 return false, false
611 }
612 if resizePolicy[rName] == v1.RestartContainer {
613 return true, true
614 }
615 return true, false
616 }
617 markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) {
618 cUpdateInfo := containerToUpdateInfo{
619 apiContainerIdx: containerIdx,
620 kubeContainerID: kubeContainerStatus.ID,
621 desiredContainerResources: desiredResources,
622 currentContainerResources: ¤tResources,
623 }
624
625 switch {
626 case specValue > statusValue:
627 changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo)
628 case specValue < statusValue:
629 changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{})
630 copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName])
631 changes.ContainersToUpdate[rName][0] = cUpdateInfo
632 }
633 }
634 resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
635 resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
636 resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
637 if restartCPULim || restartCPUReq || restartMemLim {
638
639 changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
640 name: kubeContainerStatus.Name,
641 container: &pod.Spec.Containers[containerIdx],
642 message: fmt.Sprintf("Container %s resize requires restart", container.Name),
643 }
644 changes.ContainersToStart = append(changes.ContainersToStart, containerIdx)
645 changes.UpdatePodResources = true
646 return false
647 } else {
648 if resizeMemLim {
649 markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
650 }
651 if resizeCPULim {
652 markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
653 } else if resizeCPUReq {
654 markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
655 }
656 }
657 return true
658 }
659
// doPodResizeAction applies queued in-place resource resizes: it adjusts the
// pod-level cgroup and then the individual containers, ordering operations so
// limits are never transiently below usage (raise pod limits before container
// updates, lower pod limits after).
//
// NOTE(review): result is passed by value; result.Fail mutates only this
// copy's fields — confirm callers actually observe failures recorded here.
func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) {
	pcm := m.containerManager.NewPodContainerManager()
	// Compute the desired pod-level cgroup configuration from the pod spec.
	podResources := cm.ResourceConfigForPod(pod, m.cpuCFSQuota, uint64((m.cpuCFSQuotaPeriod.Duration)/time.Microsecond), false)
	if podResources == nil {
		klog.ErrorS(nil, "Unable to get resource configuration", "pod", pod.Name)
		result.Fail(fmt.Errorf("Unable to get resource configuration processing resize for pod %s", pod.Name))
		return
	}
	// setPodCgroupConfig writes either the limit (quota) or the request
	// (shares) side of the pod cgroup for the given resource.
	setPodCgroupConfig := func(rName v1.ResourceName, setLimitValue bool) error {
		var err error
		switch rName {
		case v1.ResourceCPU:
			podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod}
			if setLimitValue {
				podCpuResources.CPUQuota = podResources.CPUQuota
			} else {
				podCpuResources.CPUShares = podResources.CPUShares
			}
			err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources)
		case v1.ResourceMemory:
			err = pcm.SetPodCgroupConfig(pod, rName, podResources)
		}
		if err != nil {
			klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name)
		}
		return err
	}

	// resizeContainers orders the operations: increase pod-level values first,
	// then update containers, then decrease pod-level values, so container
	// updates always fit inside the pod cgroup.
	resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error {
		var err error
		if newPodCgLimValue > currPodCgLimValue {
			if err = setPodCgroupConfig(rName, true); err != nil {
				return err
			}
		}
		if newPodCgReqValue > currPodCgReqValue {
			if err = setPodCgroupConfig(rName, false); err != nil {
				return err
			}
		}
		if len(podContainerChanges.ContainersToUpdate[rName]) > 0 {
			if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil {
				klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName)
				return err
			}
		}
		if newPodCgLimValue < currPodCgLimValue {
			err = setPodCgroupConfig(rName, true)
		}
		if newPodCgReqValue < currPodCgReqValue {
			if err = setPodCgroupConfig(rName, false); err != nil {
				return err
			}
		}
		return err
	}
	if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
		if podResources.Memory == nil {
			klog.ErrorS(nil, "podResources.Memory is nil", "pod", pod.Name)
			result.Fail(fmt.Errorf("podResources.Memory is nil for pod %s", pod.Name))
			return
		}
		currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory)
		if err != nil {
			klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name)
			result.Fail(err)
			return
		}
		currentPodMemoryUsage, err := pcm.GetPodCgroupMemoryUsage(pod)
		if err != nil {
			klog.ErrorS(err, "GetPodCgroupMemoryUsage failed", "pod", pod.Name)
			result.Fail(err)
			return
		}
		// Refuse to shrink the memory limit below current usage — that would
		// trigger the OOM killer rather than a clean resize.
		if currentPodMemoryUsage >= uint64(*podResources.Memory) {
			klog.ErrorS(nil, "Aborting attempt to set pod memory limit less than current memory usage", "pod", pod.Name)
			result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name))
			return
		}
		if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
			result.Fail(errResize)
			return
		}
	}
	if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
		if podResources.CPUQuota == nil || podResources.CPUShares == nil {
			klog.ErrorS(nil, "podResources.CPUQuota or podResources.CPUShares is nil", "pod", pod.Name)
			result.Fail(fmt.Errorf("podResources.CPUQuota or podResources.CPUShares is nil for pod %s", pod.Name))
			return
		}
		currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU)
		if err != nil {
			klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name)
			result.Fail(err)
			return
		}
		if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
			int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
			result.Fail(errResize)
			return
		}
	}
}
767
// updatePodContainerResources applies in-place resource updates to each listed
// container for a single resource name. For the resource being updated it
// writes the desired value; for the other resource it re-asserts the current
// value, so a CPU update never perturbs memory and vice versa. On success the
// shared currentContainerResources entry is mutated to the desired values so
// later calls (for the other resource) see the new state.
func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error {
	klog.V(5).InfoS("Updating container resources", "pod", klog.KObj(pod))

	for _, cInfo := range containersToUpdate {
		// Copy the spec container so we can overwrite Resources without
		// mutating the pod spec.
		container := pod.Spec.Containers[cInfo.apiContainerIdx].DeepCopy()

		// Build a resource list that updates only resourceName and keeps the
		// other resource at its current (already applied) value.
		switch resourceName {
		case v1.ResourceMemory:
			container.Resources.Limits = v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuLimit, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryLimit, resource.BinarySI),
			}
			container.Resources.Requests = v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuRequest, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryRequest, resource.BinarySI),
			}
		case v1.ResourceCPU:
			container.Resources.Limits = v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuLimit, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryLimit, resource.BinarySI),
			}
			container.Resources.Requests = v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuRequest, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryRequest, resource.BinarySI),
			}
		}
		if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil {
			// Stop on first failure; earlier successful updates have already
			// been recorded below.
			klog.ErrorS(err, "updateContainerResources failed", "container", container.Name, "cID", cInfo.kubeContainerID,
				"pod", format.Pod(pod), "resourceName", resourceName)
			return err
		}

		// Record the applied values in the shared current-resources struct
		// (pointer shared with other pending updates for this container).
		switch resourceName {
		case v1.ResourceMemory:
			cInfo.currentContainerResources.memoryLimit = cInfo.desiredContainerResources.memoryLimit
			cInfo.currentContainerResources.memoryRequest = cInfo.desiredContainerResources.memoryRequest
		case v1.ResourceCPU:
			cInfo.currentContainerResources.cpuLimit = cInfo.desiredContainerResources.cpuLimit
			cInfo.currentContainerResources.cpuRequest = cInfo.desiredContainerResources.cpuRequest
		}
	}
	return nil
}
816
817
// computePodActions checks whether the pod spec has changed and returns the
// changes (sandbox creation, containers to start/kill/update) required to
// reconcile the runtime state with the spec.
func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
	klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))

	createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)
	changes := podActions{
		KillPod:           createPodSandbox,
		CreateSandbox:     createPodSandbox,
		SandboxID:         sandboxID,
		Attempt:           attempt,
		ContainersToStart: []int{},
		ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
	}

	// A new sandbox is needed: decide whether to actually create one, and if
	// so, which containers (init or regular) to start in it.
	if createPodSandbox {
		if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 {
			// The pod already ran once and won't be restarted (RestartPolicy
			// Never); don't recreate the sandbox.
			changes.CreateSandbox = false
			return changes
		}

		// Collect the regular containers to start, skipping ones that already
		// succeeded under RestartPolicyOnFailure.
		var containersToStart []int
		for idx, c := range pod.Spec.Containers {
			if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
				continue
			}
			containersToStart = append(containersToStart, idx)
		}

		// If nothing would be started and the pod has already progressed past
		// initialization, a fresh sandbox is pointless.
		if len(containersToStart) == 0 {
			hasInitialized := false
			if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
				_, _, hasInitialized = findNextInitContainerToRun(pod, podStatus)
			} else {
				// With sidecars, init containers may stay running; use
				// whether any regular container has ever been created.
				hasInitialized = hasAnyRegularContainerCreated(pod, podStatus)
			}

			if hasInitialized {
				changes.CreateSandbox = false
				return changes
			}
		}

		// In a fresh sandbox, init containers run first; regular containers
		// are deferred until initialization completes.
		if len(pod.Spec.InitContainers) != 0 {
			if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
				changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
			} else {
				changes.InitContainersToStart = []int{0}
			}

			return changes
		}
		changes.ContainersToStart = containersToStart
		return changes
	}

	// Start any ephemeral container that has no status yet.
	for i := range pod.Spec.EphemeralContainers {
		c := (*v1.Container)(&pod.Spec.EphemeralContainers[i].EphemeralContainerCommon)

		if podStatus.FindContainerStatusByName(c.Name) == nil {
			changes.EphemeralContainersToStart = append(changes.EphemeralContainersToStart, i)
		}
	}

	// Handle init containers: legacy sequential path vs. sidecar-aware path.
	if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
		if !done {
			if next != nil {
				initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
				if initFailed && !shouldRestartOnFailure(pod) {
					// A failed init container with no restart kills the pod.
					changes.KillPod = true
				} else {
					// A container in Unknown state must be killed before it
					// can be safely restarted.
					if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
						changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
							name:      next.Name,
							container: next,
							message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
								initLastStatus.State),
							reason: reasonUnknown,
						}
					}
					changes.NextInitContainerToStart = next
				}
			}

			// Initialization incomplete: don't touch regular containers yet.
			return changes
		}
	} else {
		hasInitialized := m.computeInitContainerActions(pod, podStatus, &changes)
		if changes.KillPod || !hasInitialized {
			return changes
		}
	}

	// When in-place resize is possible, refresh the pod status so resource
	// comparisons use the latest runtime-reported values (best effort).
	if isInPlacePodVerticalScalingAllowed(pod) {
		changes.ContainersToUpdate = make(map[v1.ResourceName][]containerToUpdateInfo)
		latestPodStatus, err := m.GetPodStatus(ctx, podStatus.ID, pod.Name, pod.Namespace)
		if err == nil {
			podStatus = latestPodStatus
		}
	}

	// Number of running containers to keep.
	keepCount := 0
	// Decide, per regular container, whether to keep, start, kill/restart, or
	// update it in place.
	for idx, container := range pod.Spec.Containers {
		containerStatus := podStatus.FindContainerStatusByName(container.Name)

		// Run the internal post-stop hook for any container that is no longer
		// running (e.g. to release CPU pinning), regardless of restart policy.
		if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
			if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
				klog.ErrorS(err, "Internal container post-stop lifecycle hook failed for container in pod with error",
					"containerName", container.Name, "pod", klog.KObj(pod))
			}
		}

		// Container missing or not running: start it if policy allows.
		if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
			if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
				klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod))
				changes.ContainersToStart = append(changes.ContainersToStart, idx)
				if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {
					// Unknown state: kill it first so the restart starts from
					// a clean slate.
					changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
						name:      containerStatus.Name,
						container: &pod.Spec.Containers[idx],
						message: fmt.Sprintf("Container is in %q state, try killing it before restart",
							containerStatus.State),
						reason: reasonUnknown,
					}
				}
			}
			continue
		}

		// The container is running; decide whether it must be killed.
		var message string
		var reason containerKillReason
		restart := shouldRestartOnFailure(pod)

		// Spec changed (ignoring resources when in-place resize applies):
		// always restart regardless of restart policy.
		if _, _, changed := containerChanged(&container, containerStatus); changed &&
			(!isInPlacePodVerticalScalingAllowed(pod) ||
				kubecontainer.HashContainerWithoutResources(&container) != containerStatus.HashWithoutResources) {
			message = fmt.Sprintf("Container %s definition changed", container.Name)
			restart = true
		} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
			// Liveness probe failed: kill (restart only per pod policy).
			message = fmt.Sprintf("Container %s failed liveness probe", container.Name)
			reason = reasonLivenessProbe
		} else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {
			// Startup probe failed: kill (restart only per pod policy).
			message = fmt.Sprintf("Container %s failed startup probe", container.Name)
			reason = reasonStartupProbe
		} else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
			// Resize requires restart; computePodResizeAction already queued
			// the kill/start actions.
			continue
		} else {
			// Keep the container running as-is.
			keepCount++
			continue
		}

		if restart {
			message = fmt.Sprintf("%s, will be restarted", message)
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}

		changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
			name:      containerStatus.Name,
			container: &pod.Spec.Containers[idx],
			message:   message,
			reason:    reason,
		}
		klog.V(2).InfoS("Message for Container of pod", "containerName", container.Name, "containerStatusID", containerStatus.ID, "pod", klog.KObj(pod), "containerMessage", message)
	}

	// Nothing kept and nothing to start: the whole pod should be killed
	// (including any restartable init containers when sidecars are enabled).
	if keepCount == 0 && len(changes.ContainersToStart) == 0 {
		changes.KillPod = true
		if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
			changes.InitContainersToStart = nil
		}
	}

	return changes
}
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
// SyncPod syncs the running pod into the desired pod by executing the
// following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill the pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create the sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Resize running containers (if InPlacePodVerticalScaling is enabled).
//  8. Create normal containers.
//
// All per-step outcomes are accumulated into the returned PodSyncResult;
// fatal steps return early, leaving the partial result for the caller.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(ctx, pod, podStatus)
	klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
	if podContainerChanges.CreateSandbox {
		ref, err := ref.GetReference(legacyscheme.Scheme, pod)
		if err != nil {
			// A failed reference is only used for events; proceed without one.
			klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
		}
		if podContainerChanges.SandboxID != "" {
			// An existing sandbox ID plus CreateSandbox means the sandbox changed.
			m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
		} else {
			klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
		}
	}

	// Step 2: Kill the whole pod if requested (sandbox change, or nothing left to keep).
	if podContainerChanges.KillPod {
		if podContainerChanges.CreateSandbox {
			klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
		} else {
			klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
		}

		killResult := m.killPodWithSyncResult(ctx, pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
		result.AddPodSyncResult(killResult)
		if killResult.Error() != nil {
			klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
			return
		}

		if podContainerChanges.CreateSandbox {
			// A fresh sandbox restarts init containers from scratch; drop old ones.
			m.purgeInitContainers(ctx, pod, podStatus)
		}
	} else {
		// Step 3: kill only the individual containers flagged as unwanted.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(ctx, pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
				return
			}
		}
	}

	// Keep terminated init containers fairly aggressively pruned before
	// starting anything new; normally the container GC handles removals,
	// this is just an optimization.
	m.pruneInitContainersBeforeStart(ctx, pod, podStatus)

	// Capture the pod IPs before any sandbox work. They are passed down into
	// sandbox/container config generation (hosts file, downward API), which
	// avoids racing on the status manager when a container needs a restart
	// but the sandbox is unchanged. If the sandbox is (re)created below,
	// these values are overwritten from the fresh sandbox status.
	var podIPs []string
	if podStatus != nil {
		podIPs = podStatus.IPs
	}

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
		metrics.StartedPodsTotal.Inc()
		createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
		result.AddSyncResult(createSandboxResult)

		// Normalize sysctl names in the pod security context to the
		// dot-separated form before handing them to the runtime.
		sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)

		// Prepare dynamic resources before the sandbox exists, so the runtime
		// can consume them at sandbox/container creation time.
		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
			if err := m.runtimeHelper.PrepareDynamicResources(pod); err != nil {
				ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
				if referr != nil {
					klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
					return
				}
				m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedPrepareDynamicResources, "Failed to prepare dynamic resources: %v", err)
				klog.ErrorS(err, "Failed to prepare dynamic resources", "pod", klog.KObj(pod))
				return
			}
		}

		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
		if err != nil {
			// Sandbox creation can legitimately fail while the pod is being
			// torn down; in that case it is not an error worth reporting.
			if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
				klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
				return
			}
			metrics.StartedPodsErrorsTotal.Inc()
			createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
			klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
			return
		}
		klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

		// Re-query the sandbox status to learn its assigned IPs.
		resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
		if err != nil {
			ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
			if referr != nil {
				klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
			}
			m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
			klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
			result.Fail(err)
			return
		}
		if resp.GetStatus() == nil {
			result.Fail(errors.New("pod sandbox status is nil"))
			return
		}

		// Host-network pods keep the node's IPs; only overwrite for
		// non-host-network pods, using the freshly created sandbox.
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the IPs taken from the passed-in pod status.
			podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, resp.GetStatus())
			klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
		}
	}

	// The container-start routines want the primary pod IP as a scalar;
	// short-circuit the len(podIPs) check once here.
	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}

	// Generate the sandbox config used when starting containers below.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	// start is the shared boilerplate for launching any kind of container:
	// typeName is a human-readable label for log messages ("container",
	// "init container", "ephemeral container"); metricLabel is the
	// corresponding label used in monitoring metrics. It records a sync
	// result, honors crash-loop back-off, and bumps start/error counters.
	start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
		// Both the primary podIP and the full podIPs list are forwarded.
		if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			// startContainer returns well-defined error codes with reasonable
			// cardinality for metrics, useful to distinguish server-side from
			// user-caused failures.
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			// Errors already logged elsewhere (e.g. image pull back-off) are
			// logged at a higher verbosity here to avoid repetitive spam.
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

	// Step 5: start ephemeral containers. These are started before init
	// containers so that debugging remains possible even when an init
	// container is failing; their start errors are deliberately non-fatal.
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

	if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		// Step 6 (gate off): start at most the next init container; the rest
		// wait for subsequent sync iterations.
		if container := podContainerChanges.NextInitContainerToStart; container != nil {
			// A failed init container start aborts the whole sync.
			if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
				return
			}

			// Successfully started the init container.
			klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
		}
	} else {
		// Step 6 (gate on): start every init container listed in the changes.
		for _, idx := range podContainerChanges.InitContainersToStart {
			container := &pod.Spec.InitContainers[idx]
			// Restartable (sidecar-style) init containers may fail without
			// aborting the pod; regular init containers abort the sync.
			if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
				if types.IsRestartableInitContainer(container) {
					klog.V(4).InfoS("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))
					continue
				}
				klog.V(4).InfoS("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))
				return
			}

			// Successfully started the init container.
			klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
		}
	}

	// Step 7: apply in-place resource updates to running containers.
	if isInPlacePodVerticalScalingAllowed(pod) {
		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
			m.doPodResizeAction(pod, podStatus, podContainerChanges, result)
		}
	}

	// Step 8: start regular containers; individual failures are recorded in
	// result by start() but do not abort the remaining containers.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}
1316
1317
1318
1319 func (m *kubeGenericRuntimeManager) doBackOff(pod *v1.Pod, container *v1.Container, podStatus *kubecontainer.PodStatus, backOff *flowcontrol.Backoff) (bool, string, error) {
1320 var cStatus *kubecontainer.Status
1321 for _, c := range podStatus.ContainerStatuses {
1322 if c.Name == container.Name && c.State == kubecontainer.ContainerStateExited {
1323 cStatus = c
1324 break
1325 }
1326 }
1327
1328 if cStatus == nil {
1329 return false, "", nil
1330 }
1331
1332 klog.V(3).InfoS("Checking backoff for container in pod", "containerName", container.Name, "pod", klog.KObj(pod))
1333
1334 ts := cStatus.FinishedAt
1335
1336 key := getStableKey(pod, container)
1337 if backOff.IsInBackOffSince(key, ts) {
1338 if containerRef, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
1339 m.recorder.Eventf(containerRef, v1.EventTypeWarning, events.BackOffStartContainer,
1340 fmt.Sprintf("Back-off restarting failed container %s in pod %s", container.Name, format.Pod(pod)))
1341 }
1342 err := fmt.Errorf("back-off %s restarting failed container=%s pod=%s", backOff.Get(key), container.Name, format.Pod(pod))
1343 klog.V(3).InfoS("Back-off restarting failed container", "err", err.Error())
1344 return true, err.Error(), kubecontainer.ErrCrashLoopBackOff
1345 }
1346
1347 backOff.Next(key, ts)
1348 return false, "", nil
1349 }
1350
1351
1352
1353
1354
1355 func (m *kubeGenericRuntimeManager) KillPod(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
1356 err := m.killPodWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
1357 return err.Error()
1358 }
1359
1360
1361
1362 func (m *kubeGenericRuntimeManager) killPodWithSyncResult(ctx context.Context, pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
1363 killContainerResults := m.killContainersWithSyncResult(ctx, pod, runningPod, gracePeriodOverride)
1364 for _, containerResult := range killContainerResults {
1365 result.AddSyncResult(containerResult)
1366 }
1367
1368
1369 killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
1370 result.AddSyncResult(killSandboxResult)
1371
1372 for _, podSandbox := range runningPod.Sandboxes {
1373 if err := m.runtimeService.StopPodSandbox(ctx, podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {
1374 killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
1375 klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)
1376 }
1377 }
1378
1379 return
1380 }
1381
1382 func (m *kubeGenericRuntimeManager) GeneratePodStatus(event *runtimeapi.ContainerEventResponse) (*kubecontainer.PodStatus, error) {
1383 podIPs := m.determinePodSandboxIPs(event.PodSandboxStatus.Metadata.Namespace, event.PodSandboxStatus.Metadata.Name, event.PodSandboxStatus)
1384
1385 kubeContainerStatuses := []*kubecontainer.Status{}
1386 for _, status := range event.ContainersStatuses {
1387 kubeContainerStatuses = append(kubeContainerStatuses, m.convertToKubeContainerStatus(status))
1388 }
1389
1390 sort.Sort(containerStatusByCreated(kubeContainerStatuses))
1391
1392 return &kubecontainer.PodStatus{
1393 ID: kubetypes.UID(event.PodSandboxStatus.Metadata.Uid),
1394 Name: event.PodSandboxStatus.Metadata.Name,
1395 Namespace: event.PodSandboxStatus.Metadata.Namespace,
1396 IPs: podIPs,
1397 SandboxStatuses: []*runtimeapi.PodSandboxStatus{event.PodSandboxStatus},
1398 ContainerStatuses: kubeContainerStatuses,
1399 }, nil
1400 }
1401
1402
1403
// GetPodStatus retrieves the status of the pod, including the information of
// all containers in the pod that are visible in the runtime.
func (m *kubeGenericRuntimeManager) GetPodStatus(ctx context.Context, uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
	// All sandboxes for the pod UID are listed (not just the ready one), so
	// that statuses for containers attached to older sandboxes are visible
	// too. The sandbox at index 0 is treated as the most recent below.
	podSandboxIDs, err := m.getSandboxIDByPodUID(ctx, uid, nil)
	if err != nil {
		return nil, err
	}

	// Minimal pod object used only for log formatting and log reduction keys.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			UID:       uid,
		},
	}

	podFullName := format.Pod(pod)

	klog.V(4).InfoS("getSandboxIDByPodUID got sandbox IDs for pod", "podSandboxID", podSandboxIDs, "pod", klog.KObj(pod))

	sandboxStatuses := []*runtimeapi.PodSandboxStatus{}
	containerStatuses := []*kubecontainer.Status{}
	var timestamp time.Time

	podIPs := []string{}
	for idx, podSandboxID := range podSandboxIDs {
		resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
		// Between listing the sandboxes and querying their status, another
		// actor may have removed one; that is normal, so a not-found error
		// is skipped rather than treated as a failure.
		if crierror.IsNotFound(err) {
			continue
		}
		if err != nil {
			klog.ErrorS(err, "PodSandboxStatus of sandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))
			return nil, err
		}
		if resp.GetStatus() == nil {
			return nil, errors.New("pod sandbox status is nil")

		}
		sandboxStatuses = append(sandboxStatuses, resp.Status)

		// Only the most recent, ready sandbox contributes the pod IPs.
		if idx == 0 && resp.Status.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			podIPs = m.determinePodSandboxIPs(namespace, name, resp.Status)
		}

		// With Evented PLEG enabled, container statuses come from the sandbox
		// status response itself (when the runtime stamps it), avoiding a
		// separate list call.
		if idx == 0 && utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
			if resp.Timestamp == 0 {
				// The runtime did not set a timestamp (e.g. Evented PLEG is
				// enabled in the kubelet but the runtime does not support
				// it), so fall back to querying container statuses directly.
				klog.V(4).InfoS("Runtime does not set pod status timestamp", "pod", klog.KObj(pod))
				containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
				if err != nil {
					// logReduction suppresses repeats of the same error message.
					if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
						klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
					}
					return nil, err
				}
			} else {
				// Take the container statuses and timestamp embedded in the
				// sandbox status response.
				// NOTE(review): other CRI timestamps in this package are
				// nanoseconds and converted with time.Unix(0, ns);
				// time.Unix(resp.Timestamp, 0) treats this one as seconds —
				// confirm the unit of PodSandboxStatusResponse.Timestamp.
				timestamp = time.Unix(resp.Timestamp, 0)
				for _, cs := range resp.ContainersStatuses {
					cStatus := m.convertToKubeContainerStatus(cs)
					containerStatuses = append(containerStatuses, cStatus)
				}
			}
		}
	}

	// Without Evented PLEG, always fetch container statuses with a
	// dedicated query.
	if !utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
		containerStatuses, err = m.getPodContainerStatuses(ctx, uid, name, namespace)
		if err != nil {
			if m.logReduction.ShouldMessageBePrinted(err.Error(), podFullName) {
				klog.ErrorS(err, "getPodContainerStatuses for pod failed", "pod", klog.KObj(pod))
			}
			return nil, err
		}
	}

	// Success: reset the log-reduction state for this pod.
	m.logReduction.ClearID(podFullName)
	return &kubecontainer.PodStatus{
		ID:                uid,
		Name:              name,
		Namespace:         namespace,
		IPs:               podIPs,
		SandboxStatuses:   sandboxStatuses,
		ContainerStatuses: containerStatuses,
		TimeStamp:         timestamp,
	}, nil
}
1512
1513
// GarbageCollect removes dead containers (and their sandboxes) according to
// gcPolicy by delegating to the manager's container garbage collector.
func (m *kubeGenericRuntimeManager) GarbageCollect(ctx context.Context, gcPolicy kubecontainer.GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error {
	return m.containerGC.GarbageCollect(ctx, gcPolicy, allSourcesReady, evictNonDeletedPods)
}
1517
1518
1519
1520 func (m *kubeGenericRuntimeManager) UpdatePodCIDR(ctx context.Context, podCIDR string) error {
1521
1522
1523 klog.InfoS("Updating runtime config through cri with podcidr", "CIDR", podCIDR)
1524 return m.runtimeService.UpdateRuntimeConfig(ctx,
1525 &runtimeapi.RuntimeConfig{
1526 NetworkConfig: &runtimeapi.NetworkConfig{
1527 PodCidr: podCIDR,
1528 },
1529 })
1530 }
1531
// CheckpointContainer forwards a container checkpoint request to the CRI
// runtime service.
func (m *kubeGenericRuntimeManager) CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error {
	return m.runtimeService.CheckpointContainer(ctx, options)
}
1535
// ListMetricDescriptors returns the metric descriptors advertised by the CRI
// runtime service.
func (m *kubeGenericRuntimeManager) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
	return m.runtimeService.ListMetricDescriptors(ctx)
}
1539
// ListPodSandboxMetrics returns per-sandbox metrics reported by the CRI
// runtime service.
func (m *kubeGenericRuntimeManager) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
	return m.runtimeService.ListPodSandboxMetrics(ctx)
}
1543