1
16
17 package kubelet
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/klog/v2"
27 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
28 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
29 "k8s.io/kubernetes/pkg/kubelet/util/format"
30 )
31
const (
	// runOnceManifestDelay is how long RunOnce waits for a pod update to
	// arrive on its channel before giving up with an error.
	runOnceManifestDelay = time.Second
	// runOnceMaxRetries is the retry count at which runPod stops re-syncing
	// a pod and reports a timeout error.
	runOnceMaxRetries = 10
	// runOnceRetryDelay is the initial wait between sync attempts that
	// RunOnce hands to runOnce.
	runOnceRetryDelay = time.Second
	// runOnceRetryDelayBackoff is the factor the wait is multiplied by after
	// every attempt in runPod.
	runOnceRetryDelayBackoff = 2
)
38
39
// RunPodResult pairs a pod with the outcome of attempting to run it during a
// run-once pass.
type RunPodResult struct {
	// Pod is the pod this result refers to.
	Pod *v1.Pod
	// Err is the error returned by runPod for this pod. It is nil when the
	// pod's containers were started, and also nil for pods that runOnce
	// rejected at admission.
	Err error
}
44
45
46 func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
47 ctx := context.Background()
48
49 if err := kl.setupDataDirs(); err != nil {
50 return nil, err
51 }
52
53
54 if _, err := os.Stat(ContainerLogsDir); err != nil {
55 if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
56 klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
57 }
58 }
59
60 select {
61 case u := <-updates:
62 klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
63 result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
64 klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
65 return result, err
66 case <-time.After(runOnceManifestDelay):
67 return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
68 }
69 }
70
71
72 func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
73 ch := make(chan RunPodResult)
74 admitted := []*v1.Pod{}
75 for _, pod := range pods {
76
77 if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
78 kl.rejectPod(pod, reason, message)
79 results = append(results, RunPodResult{pod, nil})
80 continue
81 }
82
83 admitted = append(admitted, pod)
84 go func(pod *v1.Pod) {
85 err := kl.runPod(ctx, pod, retryDelay)
86 ch <- RunPodResult{pod, err}
87 }(pod)
88 }
89
90 klog.InfoS("Waiting for pods", "numPods", len(admitted))
91 failedPods := []string{}
92 for i := 0; i < len(admitted); i++ {
93 res := <-ch
94 results = append(results, res)
95 if res.Err != nil {
96 failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
97 if err != nil {
98 klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
99 } else {
100 klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
101 }
102 failedPods = append(failedPods, format.Pod(res.Pod))
103 } else {
104 klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
105 }
106 }
107 if len(failedPods) > 0 {
108 return results, fmt.Errorf("error running pods: %v", failedPods)
109 }
110 klog.InfoS("Pods started", "numPods", len(pods))
111 return results, err
112 }
113
114
115 func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Duration) error {
116 var isTerminal bool
117 delay := retryDelay
118 retry := 0
119 for !isTerminal {
120 status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
121 if err != nil {
122 return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
123 }
124
125 if kl.isPodRunning(pod, status) {
126 klog.InfoS("Pod's containers running", "pod", klog.KObj(pod))
127 return nil
128 }
129 klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
130
131 klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
132 if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
133 klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
134 }
135 mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
136 if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
137 return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
138 }
139 if retry >= runOnceMaxRetries {
140 return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
141 }
142
143 klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
144 time.Sleep(delay)
145 retry++
146 delay *= runOnceRetryDelayBackoff
147 }
148 return nil
149 }
150
151
152 func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bool {
153 for _, c := range pod.Spec.Containers {
154 cs := status.FindContainerStatusByName(c.Name)
155 if cs == nil || cs.State != kubecontainer.ContainerStateRunning {
156 klog.InfoS("Container not running", "pod", klog.KObj(pod), "containerName", c.Name)
157 return false
158 }
159 }
160 return true
161 }
162
163
164 func (kl *Kubelet) getFailedContainers(ctx context.Context, pod *v1.Pod) ([]string, error) {
165 status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
166 if err != nil {
167 return nil, fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
168 }
169 var containerNames []string
170 for _, cs := range status.ContainerStatuses {
171 if cs.State != kubecontainer.ContainerStateRunning && cs.ExitCode != 0 {
172 containerNames = append(containerNames, cs.Name)
173 }
174 }
175 return containerNames, nil
176 }
177
View as plain text