1
16
17 package prober
18
19 import (
20 "context"
21 "math/rand"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/util/runtime"
26 "k8s.io/component-base/metrics"
27 "k8s.io/klog/v2"
28 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
29 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
30 "k8s.io/kubernetes/pkg/kubelet/prober/results"
31 )
32
33
34
35
36
// worker handles the periodic probing of one container of a pod for a single
// probe type (readiness, liveness or startup). Its run loop waits on a ticker,
// stop requests on stopCh and manual triggers on manualTriggerCh, calls
// doProbe each iteration, and publishes results to resultsManager.
// NOTE(review): run is presumably started in its own goroutine by the manager;
// that call site is not visible here.
type worker struct {
	// Channel for stopping the probe: stop() sends on it without blocking and
	// run() leaves its loop when it receives.
	stopCh chan struct{}

	// Channel for triggering the probe manually: a receive makes run() probe
	// again without waiting for the next tick.
	manualTriggerCh chan struct{}

	// The pod containing the probed container. Used for status lookup, metric
	// labels, restart policy and deletion timestamp checks.
	pod *v1.Pod

	// The container to probe.
	container v1.Container

	// The probe configuration (period, initial delay, thresholds), chosen from
	// container according to probeType in newWorker.
	spec *v1.Probe

	// The type of the worker (readiness, liveness or startup).
	probeType probeType

	// Result recorded whenever a new container ID is first observed. It stays in
	// effect until a probe result crosses its threshold (for example while
	// InitialDelaySeconds has not yet elapsed).
	initialValue results.Result

	// Where to store this worker's results, and the manager that owns the
	// worker (status manager, prober, start time, worker registry).
	resultsManager results.Manager
	probeManager   *manager

	// The last known container ID for this worker; empty until the container
	// is first observed.
	containerID kubecontainer.ContainerID

	// The last probe result for this worker.
	lastResult results.Result

	// How many consecutive probes have returned lastResult; compared against
	// the spec's success/failure thresholds.
	resultRun int

	// If set, skip probing. Set after a liveness/startup failure is published
	// and cleared when a new container ID is observed.
	onHold bool

	// Pre-computed label sets for the per-result probe counters. They are
	// reused on every probe and again to delete the series when run() exits.
	proberResultsSuccessfulMetricLabels metrics.Labels
	proberResultsFailedMetricLabels     metrics.Labels
	proberResultsUnknownMetricLabels    metrics.Labels

	// Pre-computed label sets for the probe-duration observations (only
	// success and unknown outcomes record a duration).
	proberDurationSuccessfulMetricLabels metrics.Labels
	proberDurationUnknownMetricLabels    metrics.Labels
}
83
84
85 func newWorker(
86 m *manager,
87 probeType probeType,
88 pod *v1.Pod,
89 container v1.Container) *worker {
90
91 w := &worker{
92 stopCh: make(chan struct{}, 1),
93 manualTriggerCh: make(chan struct{}, 1),
94 pod: pod,
95 container: container,
96 probeType: probeType,
97 probeManager: m,
98 }
99
100 switch probeType {
101 case readiness:
102 w.spec = container.ReadinessProbe
103 w.resultsManager = m.readinessManager
104 w.initialValue = results.Failure
105 case liveness:
106 w.spec = container.LivenessProbe
107 w.resultsManager = m.livenessManager
108 w.initialValue = results.Success
109 case startup:
110 w.spec = container.StartupProbe
111 w.resultsManager = m.startupManager
112 w.initialValue = results.Unknown
113 }
114
115 basicMetricLabels := metrics.Labels{
116 "probe_type": w.probeType.String(),
117 "container": w.container.Name,
118 "pod": w.pod.Name,
119 "namespace": w.pod.Namespace,
120 "pod_uid": string(w.pod.UID),
121 }
122
123 proberDurationLabels := metrics.Labels{
124 "probe_type": w.probeType.String(),
125 "container": w.container.Name,
126 "pod": w.pod.Name,
127 "namespace": w.pod.Namespace,
128 }
129
130 w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
131 w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful
132
133 w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
134 w.proberResultsFailedMetricLabels["result"] = probeResultFailed
135
136 w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
137 w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown
138
139 w.proberDurationSuccessfulMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)
140 w.proberDurationUnknownMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)
141
142 return w
143 }
144
145
146 func (w *worker) run() {
147 ctx := context.Background()
148 probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
149
150
151
152
153 if probeTickerPeriod > time.Since(w.probeManager.start) {
154 time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
155 }
156
157 probeTicker := time.NewTicker(probeTickerPeriod)
158
159 defer func() {
160
161 probeTicker.Stop()
162 if !w.containerID.IsEmpty() {
163 w.resultsManager.Remove(w.containerID)
164 }
165
166 w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
167 ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
168 ProberResults.Delete(w.proberResultsFailedMetricLabels)
169 ProberResults.Delete(w.proberResultsUnknownMetricLabels)
170 ProberDuration.Delete(w.proberDurationSuccessfulMetricLabels)
171 ProberDuration.Delete(w.proberDurationUnknownMetricLabels)
172 }()
173
174 probeLoop:
175 for w.doProbe(ctx) {
176
177 select {
178 case <-w.stopCh:
179 break probeLoop
180 case <-probeTicker.C:
181 case <-w.manualTriggerCh:
182
183 }
184 }
185 }
186
187
188
189 func (w *worker) stop() {
190 select {
191 case w.stopCh <- struct{}{}:
192 default:
193 }
194 }
195
196
197
// doProbe performs a single probe of the worker's container and records the
// outcome. It returns true if the worker should keep probing and false if the
// run loop should exit.
//
// In order, it:
//   - looks up the pod's current status and stops if the pod has finished;
//   - locates the container's status (regular containers first, then init
//     containers), resetting per-container state when the container ID changes;
//   - skips probing while on hold, for non-running containers, for
//     liveness/startup probes of a pod being deleted, before
//     InitialDelaySeconds has elapsed, or when the container's started state
//     says this probe type should not run;
//   - runs the probe, records metrics, and publishes the result once the
//     consecutive-result threshold is met.
func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
	// Deferred calls run last-in-first-out, so HandleCrash runs first. It
	// recovers a panic raised in this function, and its handler forces
	// keepGoing = true so a crash does not end the worker. HandleCrash may
	// re-panic (depending on apimachinery's runtime.ReallyCrash setting). The
	// first deferred func swallows that second panic so the worker's goroutine
	// survives.
	defer func() { recover() }()
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	startTime := time.Now()
	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		// No status is available for the pod; try again on the next tick.
		klog.V(3).InfoS("No status for pod", "pod", klog.KObj(w.pod))
		return true
	}

	// A pod in a terminal phase will not run again: exit the worker.
	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
		klog.V(3).InfoS("Pod is terminated, exiting probe worker",
			"pod", klog.KObj(w.pod), "phase", status.Phase)
		return false
	}

	// Look the container up among regular containers first, then among init
	// containers. A status without a ContainerID counts as not found.
	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		c, ok = podutil.GetContainerStatus(status.InitContainerStatuses, w.container.Name)
		if !ok || len(c.ContainerID) == 0 {
			// Container not created yet, or gone; wait for more information.
			klog.V(3).InfoS("Probe target container not found",
				"pod", klog.KObj(w.pod), "containerName", w.container.Name)
			return true
		}
	}

	// A different container ID means a new container instance. Drop the old
	// entry, seed the new one with initialValue, and lift any hold.
	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)

		w.onHold = false
	}

	if w.onHold {
		// Probing is paused after a published liveness/startup failure, until
		// a new container ID is observed.
		return true
	}

	if c.State.Running == nil {
		klog.V(3).InfoS("Non-running container probed",
			"pod", klog.KObj(w.pod), "containerName", w.container.Name)
		if !w.containerID.IsEmpty() {
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}

		// Keep the worker only while the container may still (re)start: it
		// has not terminated, or the pod's restart policy is not Never.
		return c.State.Terminated == nil ||
			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
	}

	// The pod is being deleted. Report success for liveness/startup probes and
	// stop probing. NOTE(review): presumably this keeps a failing probe from
	// triggering a restart during graceful shutdown.
	if w.pod.ObjectMeta.DeletionTimestamp != nil && (w.probeType == liveness || w.probeType == startup) {
		klog.V(3).InfoS("Pod deletion requested, setting probe result to success",
			"probeType", w.probeType, "pod", klog.KObj(w.pod), "containerName", w.container.Name)
		if w.probeType == startup {
			klog.InfoS("Pod deletion requested before container has fully started",
				"pod", klog.KObj(w.pod), "containerName", w.container.Name)
		}

		w.resultsManager.Set(w.containerID, results.Success, w.pod)

		return false
	}

	// Don't probe until InitialDelaySeconds have passed since the container
	// started running (elapsed seconds are truncated to an int32).
	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}

	if c.Started != nil && *c.Started {
		// The container has started, so startup probing is done. The worker
		// keeps running so a later container ID change re-arms it.
		if w.probeType == startup {
			return true
		}
	} else {
		// The container has not started yet, so only the startup probe runs.
		if w.probeType != startup {
			return true
		}
	}

	result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		// Prober error: discard this attempt and retry on the next tick.
		return true
	}

	// Count every outcome. Duration is observed only for success and for
	// unknown/other results; failures record no duration.
	switch result {
	case results.Success:
		ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
		ProberDuration.With(w.proberDurationSuccessfulMetricLabels).Observe(time.Since(startTime).Seconds())
	case results.Failure:
		ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
	default:
		ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
		ProberDuration.With(w.proberDurationUnknownMetricLabels).Observe(time.Since(startTime).Seconds())
	}

	// Track how many consecutive probes returned the same result.
	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

	// While Failure/Success runs are below their thresholds, leave the
	// published state unchanged. Any other result is published immediately.
	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {

		return true
	}

	w.resultsManager.Set(w.containerID, result, w.pod)

	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
		// A liveness/startup failure was just published. Put the worker on hold
		// until a new container ID appears, and restart the run count.
		w.onHold = true
		w.resultRun = 0
	}

	return true
}
330
331 func deepCopyPrometheusLabels(m metrics.Labels) metrics.Labels {
332 ret := make(metrics.Labels, len(m))
333 for k, v := range m {
334 ret[k] = v
335 }
336 return ret
337 }
338
View as plain text