1
16
17 package prober
18
19 import (
20 "sync"
21 "time"
22
23 v1 "k8s.io/api/core/v1"
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/apimachinery/pkg/util/sets"
26 "k8s.io/client-go/tools/record"
27 "k8s.io/component-base/metrics"
28 "k8s.io/klog/v2"
29 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
30 "k8s.io/kubernetes/pkg/kubelet/prober/results"
31 "k8s.io/kubernetes/pkg/kubelet/status"
32 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
33 kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
34 "k8s.io/utils/clock"
35 )
36
37
38 var ProberResults = metrics.NewCounterVec(
39 &metrics.CounterOpts{
40 Subsystem: "prober",
41 Name: "probe_total",
42 Help: "Cumulative number of a liveness, readiness or startup probe for a container by result.",
43 StabilityLevel: metrics.ALPHA,
44 },
45 []string{"probe_type",
46 "result",
47 "container",
48 "pod",
49 "namespace",
50 "pod_uid"},
51 )
52
53
54 var ProberDuration = metrics.NewHistogramVec(
55 &metrics.HistogramOpts{
56 Subsystem: "prober",
57 Name: "probe_duration_seconds",
58 Help: "Duration in seconds for a probe response.",
59 StabilityLevel: metrics.ALPHA,
60 },
61 []string{"probe_type",
62 "container",
63 "pod",
64 "namespace"},
65 )
66
67
68
69
70
71 type Manager interface {
72
73
74 AddPod(pod *v1.Pod)
75
76
77 StopLivenessAndStartup(pod *v1.Pod)
78
79
80
81 RemovePod(pod *v1.Pod)
82
83
84
85 CleanupPods(desiredPods map[types.UID]sets.Empty)
86
87
88
89 UpdatePodStatus(*v1.Pod, *v1.PodStatus)
90 }
91
92 type manager struct {
93
94 workers map[probeKey]*worker
95
96 workerLock sync.RWMutex
97
98
99 statusManager status.Manager
100
101
102 readinessManager results.Manager
103
104
105 livenessManager results.Manager
106
107
108 startupManager results.Manager
109
110
111 prober *prober
112
113 start time.Time
114 }
115
116
117 func NewManager(
118 statusManager status.Manager,
119 livenessManager results.Manager,
120 readinessManager results.Manager,
121 startupManager results.Manager,
122 runner kubecontainer.CommandRunner,
123 recorder record.EventRecorder) Manager {
124
125 prober := newProber(runner, recorder)
126 return &manager{
127 statusManager: statusManager,
128 prober: prober,
129 readinessManager: readinessManager,
130 livenessManager: livenessManager,
131 startupManager: startupManager,
132 workers: make(map[probeKey]*worker),
133 start: clock.RealClock{}.Now(),
134 }
135 }
136
137
138 type probeKey struct {
139 podUID types.UID
140 containerName string
141 probeType probeType
142 }
143
144
// probeType enumerates the kinds of probe a worker can run.
type probeType int

// The supported probe kinds. The numeric values follow declaration order
// (liveness = 0, readiness = 1, startup = 2).
const (
	liveness probeType = iota
	readiness
	startup
)

// String values describing the outcome of a probe.
// NOTE(review): presumably used as the "result" label of ProberResults;
// they are not referenced in this part of the file - confirm.
const (
	probeResultSuccessful string = "successful"
	probeResultFailed     string = "failed"
	probeResultUnknown    string = "unknown"
)

// String returns the capitalised name of the probe type ("Liveness",
// "Readiness" or "Startup"), or "UNKNOWN" for any other value.
func (t probeType) String() string {
	names := [...]string{
		liveness:  "Liveness",
		readiness: "Readiness",
		startup:   "Startup",
	}
	if t < 0 || int(t) >= len(names) {
		return "UNKNOWN"
	}
	return names[t]
}
170
171 func getRestartableInitContainers(pod *v1.Pod) []v1.Container {
172 var restartableInitContainers []v1.Container
173 for _, c := range pod.Spec.InitContainers {
174 if kubetypes.IsRestartableInitContainer(&c) {
175 restartableInitContainers = append(restartableInitContainers, c)
176 }
177 }
178 return restartableInitContainers
179 }
180
181 func (m *manager) AddPod(pod *v1.Pod) {
182 m.workerLock.Lock()
183 defer m.workerLock.Unlock()
184
185 key := probeKey{podUID: pod.UID}
186 for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
187 key.containerName = c.Name
188
189 if c.StartupProbe != nil {
190 key.probeType = startup
191 if _, ok := m.workers[key]; ok {
192 klog.V(8).ErrorS(nil, "Startup probe already exists for container",
193 "pod", klog.KObj(pod), "containerName", c.Name)
194 return
195 }
196 w := newWorker(m, startup, pod, c)
197 m.workers[key] = w
198 go w.run()
199 }
200
201 if c.ReadinessProbe != nil {
202 key.probeType = readiness
203 if _, ok := m.workers[key]; ok {
204 klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
205 "pod", klog.KObj(pod), "containerName", c.Name)
206 return
207 }
208 w := newWorker(m, readiness, pod, c)
209 m.workers[key] = w
210 go w.run()
211 }
212
213 if c.LivenessProbe != nil {
214 key.probeType = liveness
215 if _, ok := m.workers[key]; ok {
216 klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
217 "pod", klog.KObj(pod), "containerName", c.Name)
218 return
219 }
220 w := newWorker(m, liveness, pod, c)
221 m.workers[key] = w
222 go w.run()
223 }
224 }
225 }
226
227 func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
228 m.workerLock.RLock()
229 defer m.workerLock.RUnlock()
230
231 key := probeKey{podUID: pod.UID}
232 for _, c := range pod.Spec.Containers {
233 key.containerName = c.Name
234 for _, probeType := range [...]probeType{liveness, startup} {
235 key.probeType = probeType
236 if worker, ok := m.workers[key]; ok {
237 worker.stop()
238 }
239 }
240 }
241 }
242
243 func (m *manager) RemovePod(pod *v1.Pod) {
244 m.workerLock.RLock()
245 defer m.workerLock.RUnlock()
246
247 key := probeKey{podUID: pod.UID}
248 for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
249 key.containerName = c.Name
250 for _, probeType := range [...]probeType{readiness, liveness, startup} {
251 key.probeType = probeType
252 if worker, ok := m.workers[key]; ok {
253 worker.stop()
254 }
255 }
256 }
257 }
258
259 func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
260 m.workerLock.RLock()
261 defer m.workerLock.RUnlock()
262
263 for key, worker := range m.workers {
264 if _, ok := desiredPods[key.podUID]; !ok {
265 worker.stop()
266 }
267 }
268 }
269
270 func (m *manager) isContainerStarted(pod *v1.Pod, containerStatus *v1.ContainerStatus) bool {
271 if containerStatus.State.Running == nil {
272 return false
273 }
274
275 if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(containerStatus.ContainerID)); ok {
276 return result == results.Success
277 }
278
279
280
281 if _, exists := m.getWorker(pod.UID, containerStatus.Name, startup); exists {
282 return false
283 }
284
285
286 return true
287 }
288
289 func (m *manager) UpdatePodStatus(pod *v1.Pod, podStatus *v1.PodStatus) {
290 for i, c := range podStatus.ContainerStatuses {
291 started := m.isContainerStarted(pod, &podStatus.ContainerStatuses[i])
292 podStatus.ContainerStatuses[i].Started = &started
293
294 if !started {
295 continue
296 }
297
298 var ready bool
299 if c.State.Running == nil {
300 ready = false
301 } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok && result == results.Success {
302 ready = true
303 } else {
304
305 w, exists := m.getWorker(pod.UID, c.Name, readiness)
306 ready = !exists
307 if exists {
308
309 select {
310 case w.manualTriggerCh <- struct{}{}:
311 default:
312 klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
313 }
314 }
315 }
316 podStatus.ContainerStatuses[i].Ready = ready
317 }
318
319 for i, c := range podStatus.InitContainerStatuses {
320 started := m.isContainerStarted(pod, &podStatus.InitContainerStatuses[i])
321 podStatus.InitContainerStatuses[i].Started = &started
322
323 initContainer, ok := kubeutil.GetContainerByIndex(pod.Spec.InitContainers, podStatus.InitContainerStatuses, i)
324 if !ok {
325 klog.V(4).InfoS("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", c.Name)
326 continue
327 }
328 if !kubetypes.IsRestartableInitContainer(&initContainer) {
329 if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
330 podStatus.InitContainerStatuses[i].Ready = true
331 }
332 continue
333 }
334
335 if !started {
336 continue
337 }
338
339 var ready bool
340 if c.State.Running == nil {
341 ready = false
342 } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok && result == results.Success {
343 ready = true
344 } else {
345
346 w, exists := m.getWorker(pod.UID, c.Name, readiness)
347 ready = !exists
348 if exists {
349
350 select {
351 case w.manualTriggerCh <- struct{}{}:
352 default:
353 klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
354 }
355 }
356 }
357 podStatus.InitContainerStatuses[i].Ready = ready
358 }
359 }
360
361 func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
362 m.workerLock.RLock()
363 defer m.workerLock.RUnlock()
364 worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
365 return worker, ok
366 }
367
368
369 func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
370 m.workerLock.Lock()
371 defer m.workerLock.Unlock()
372 delete(m.workers, probeKey{podUID, containerName, probeType})
373 }
374
375
376 func (m *manager) workerCount() int {
377 m.workerLock.RLock()
378 defer m.workerLock.RUnlock()
379 return len(m.workers)
380 }
381
View as plain text