1
16
17 package prober
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/client-go/tools/record"
27 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
28 "k8s.io/kubernetes/pkg/kubelet/events"
29 "k8s.io/kubernetes/pkg/kubelet/prober/results"
30 "k8s.io/kubernetes/pkg/kubelet/util/format"
31 "k8s.io/kubernetes/pkg/probe"
32 execprobe "k8s.io/kubernetes/pkg/probe/exec"
33 grpcprobe "k8s.io/kubernetes/pkg/probe/grpc"
34 httpprobe "k8s.io/kubernetes/pkg/probe/http"
35 tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
36 "k8s.io/utils/exec"
37
38 "k8s.io/klog/v2"
39 )
40
// maxProbeRetries is the attempt count that probe hands to
// runProbeWithRetries. A probe is re-run only when an attempt returns a
// non-nil error; an attempt that completes without error ends the loop.
const maxProbeRetries = 3
42
43
// prober bundles one prober implementation per supported probe mechanism
// together with the collaborators needed to run probes and report on them.
type prober struct {
	// exec, http, tcp and grpc are the per-mechanism probers that runProbe
	// dispatches to; runner executes exec-probe commands in a container.
	exec   execprobe.Prober
	http   httpprobe.Prober
	tcp    tcpprobe.Prober
	grpc   grpcprobe.Prober
	runner kubecontainer.CommandRunner

	// recorder receives the container events emitted for probe errors,
	// failures and warnings.
	recorder record.EventRecorder
}
53
54
55
56 func newProber(
57 runner kubecontainer.CommandRunner,
58 recorder record.EventRecorder) *prober {
59
60 const followNonLocalRedirects = false
61 return &prober{
62 exec: execprobe.New(),
63 http: httpprobe.New(followNonLocalRedirects),
64 tcp: tcpprobe.New(),
65 grpc: grpcprobe.New(),
66 runner: runner,
67 recorder: recorder,
68 }
69 }
70
71
72 func (pb *prober) recordContainerEvent(pod *v1.Pod, container *v1.Container, eventType, reason, message string, args ...interface{}) {
73 ref, err := kubecontainer.GenerateContainerRef(pod, container)
74 if err != nil {
75 klog.ErrorS(err, "Can't make a ref to pod and container", "pod", klog.KObj(pod), "containerName", container.Name)
76 return
77 }
78 pb.recorder.Eventf(ref, eventType, reason, message, args...)
79 }
80
81
82 func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
83 var probeSpec *v1.Probe
84 switch probeType {
85 case readiness:
86 probeSpec = container.ReadinessProbe
87 case liveness:
88 probeSpec = container.LivenessProbe
89 case startup:
90 probeSpec = container.StartupProbe
91 default:
92 return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
93 }
94
95 if probeSpec == nil {
96 klog.InfoS("Probe is nil", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
97 return results.Success, nil
98 }
99
100 result, output, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
101 if err != nil || (result != probe.Success && result != probe.Warning) {
102
103 if err != nil {
104 klog.V(1).ErrorS(err, "Probe errored", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
105 pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
106 } else {
107 klog.V(1).InfoS("Probe failed", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "probeResult", result, "output", output)
108 pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
109 }
110 return results.Failure, err
111 }
112 if result == probe.Warning {
113 pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
114 klog.V(3).InfoS("Probe succeeded with a warning", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "output", output)
115 } else {
116 klog.V(3).InfoS("Probe succeeded", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
117 }
118 return results.Success, nil
119 }
120
121
122
123 func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
124 var err error
125 var result probe.Result
126 var output string
127 for i := 0; i < retries; i++ {
128 result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
129 if err == nil {
130 return result, output, nil
131 }
132 }
133 return result, output, err
134 }
135
136 func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
137 timeout := time.Duration(p.TimeoutSeconds) * time.Second
138 switch {
139 case p.Exec != nil:
140 klog.V(4).InfoS("Exec-Probe runProbe", "pod", klog.KObj(pod), "containerName", container.Name, "execCommand", p.Exec.Command)
141 command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
142 return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
143
144 case p.HTTPGet != nil:
145 req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
146 if err != nil {
147 return probe.Unknown, "", err
148 }
149 if klogV4 := klog.V(4); klogV4.Enabled() {
150 port := req.URL.Port()
151 host := req.URL.Hostname()
152 path := req.URL.Path
153 scheme := req.URL.Scheme
154 headers := p.HTTPGet.HTTPHeaders
155 klogV4.InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout, "headers", headers)
156 }
157 return pb.http.Probe(req, timeout)
158
159 case p.TCPSocket != nil:
160 port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
161 if err != nil {
162 return probe.Unknown, "", err
163 }
164 host := p.TCPSocket.Host
165 if host == "" {
166 host = status.PodIP
167 }
168 klog.V(4).InfoS("TCP-Probe", "host", host, "port", port, "timeout", timeout)
169 return pb.tcp.Probe(host, port, timeout)
170
171 case p.GRPC != nil:
172 host := status.PodIP
173 service := ""
174 if p.GRPC.Service != nil {
175 service = *p.GRPC.Service
176 }
177 klog.V(4).InfoS("GRPC-Probe", "host", host, "service", service, "port", p.GRPC.Port, "timeout", timeout)
178 return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
179
180 default:
181 klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
182 return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
183 }
184 }
185
// execInContainer is the value newExecInContainer returns as an exec.Cmd.
// Only CombinedOutput, Start, SetStdout and SetStderr act on its fields; the
// remaining methods are no-ops or return an "unimplemented" error.
type execInContainer struct {
	// run executes the wrapped command and returns the bytes it produced
	// along with any error.
	// NOTE(review): presumably the bytes are combined stdout and stderr, since
	// CombinedOutput returns them unchanged — confirm against
	// CommandRunner.RunInContainer.
	run func() ([]byte, error)
	// writer, when non-nil, receives the bytes returned by run in Start. It
	// is set by both SetStdout and SetStderr.
	writer io.Writer
}
192
193 func (pb *prober) newExecInContainer(ctx context.Context, container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
194 return &execInContainer{run: func() ([]byte, error) {
195 return pb.runner.RunInContainer(ctx, containerID, cmd, timeout)
196 }}
197 }
198
199 func (eic *execInContainer) Run() error {
200 return nil
201 }
202
203 func (eic *execInContainer) CombinedOutput() ([]byte, error) {
204 return eic.run()
205 }
206
207 func (eic *execInContainer) Output() ([]byte, error) {
208 return nil, fmt.Errorf("unimplemented")
209 }
210
211 func (eic *execInContainer) SetDir(dir string) {
212
213 }
214
215 func (eic *execInContainer) SetStdin(in io.Reader) {
216
217 }
218
219 func (eic *execInContainer) SetStdout(out io.Writer) {
220 eic.writer = out
221 }
222
223 func (eic *execInContainer) SetStderr(out io.Writer) {
224 eic.writer = out
225 }
226
227 func (eic *execInContainer) SetEnv(env []string) {
228
229 }
230
231 func (eic *execInContainer) Stop() {
232
233 }
234
235 func (eic *execInContainer) Start() error {
236 data, err := eic.run()
237 if eic.writer != nil {
238
239 if p, err := eic.writer.Write(data); err != nil {
240 klog.ErrorS(err, "Unable to write all bytes from execInContainer", "expectedBytes", len(data), "actualBytes", p)
241 }
242 }
243 return err
244 }
245
246 func (eic *execInContainer) Wait() error {
247 return nil
248 }
249
250 func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
251 return nil, fmt.Errorf("unimplemented")
252 }
253
254 func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
255 return nil, fmt.Errorf("unimplemented")
256 }
257
View as plain text