...

Source file src/k8s.io/kubernetes/pkg/kubelet/prober/prober.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package prober
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/client-go/tools/record"
    27  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    28  	"k8s.io/kubernetes/pkg/kubelet/events"
    29  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    30  	"k8s.io/kubernetes/pkg/kubelet/util/format"
    31  	"k8s.io/kubernetes/pkg/probe"
    32  	execprobe "k8s.io/kubernetes/pkg/probe/exec"
    33  	grpcprobe "k8s.io/kubernetes/pkg/probe/grpc"
    34  	httpprobe "k8s.io/kubernetes/pkg/probe/http"
    35  	tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
    36  	"k8s.io/utils/exec"
    37  
    38  	"k8s.io/klog/v2"
    39  )
    40  
// maxProbeRetries is the attempt budget that probe passes to
// runProbeWithRetries. Despite the name it bounds the total number of
// attempts rather than the number of additional retries, and a further
// attempt is only made when the previous one returned an error.
const maxProbeRetries = 3
    42  
// prober checks the liveness/readiness/startup of a container. It bundles one
// backend per supported probe handler together with the collaborators needed
// to run commands in a container and to emit events.
type prober struct {
	// Per-handler probe backends; runProbe picks one depending on which
	// handler (Exec, HTTPGet, TCPSocket, GRPC) is set on the probe spec.
	exec   execprobe.Prober
	http   httpprobe.Prober
	tcp    tcpprobe.Prober
	grpc   grpcprobe.Prober
	// runner executes the exec-probe command inside the container
	// (see newExecInContainer).
	runner kubecontainer.CommandRunner

	// recorder receives the events produced by recordContainerEvent.
	recorder record.EventRecorder
}
    53  
    54  // NewProber creates a Prober, it takes a command runner and
    55  // several container info managers.
    56  func newProber(
    57  	runner kubecontainer.CommandRunner,
    58  	recorder record.EventRecorder) *prober {
    59  
    60  	const followNonLocalRedirects = false
    61  	return &prober{
    62  		exec:     execprobe.New(),
    63  		http:     httpprobe.New(followNonLocalRedirects),
    64  		tcp:      tcpprobe.New(),
    65  		grpc:     grpcprobe.New(),
    66  		runner:   runner,
    67  		recorder: recorder,
    68  	}
    69  }
    70  
    71  // recordContainerEvent should be used by the prober for all container related events.
    72  func (pb *prober) recordContainerEvent(pod *v1.Pod, container *v1.Container, eventType, reason, message string, args ...interface{}) {
    73  	ref, err := kubecontainer.GenerateContainerRef(pod, container)
    74  	if err != nil {
    75  		klog.ErrorS(err, "Can't make a ref to pod and container", "pod", klog.KObj(pod), "containerName", container.Name)
    76  		return
    77  	}
    78  	pb.recorder.Eventf(ref, eventType, reason, message, args...)
    79  }
    80  
    81  // probe probes the container.
    82  func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
    83  	var probeSpec *v1.Probe
    84  	switch probeType {
    85  	case readiness:
    86  		probeSpec = container.ReadinessProbe
    87  	case liveness:
    88  		probeSpec = container.LivenessProbe
    89  	case startup:
    90  		probeSpec = container.StartupProbe
    91  	default:
    92  		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    93  	}
    94  
    95  	if probeSpec == nil {
    96  		klog.InfoS("Probe is nil", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
    97  		return results.Success, nil
    98  	}
    99  
   100  	result, output, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
   101  	if err != nil || (result != probe.Success && result != probe.Warning) {
   102  		// Probe failed in one way or another.
   103  		if err != nil {
   104  			klog.V(1).ErrorS(err, "Probe errored", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
   105  			pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
   106  		} else { // result != probe.Success
   107  			klog.V(1).InfoS("Probe failed", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "probeResult", result, "output", output)
   108  			pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
   109  		}
   110  		return results.Failure, err
   111  	}
   112  	if result == probe.Warning {
   113  		pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
   114  		klog.V(3).InfoS("Probe succeeded with a warning", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "output", output)
   115  	} else {
   116  		klog.V(3).InfoS("Probe succeeded", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
   117  	}
   118  	return results.Success, nil
   119  }
   120  
   121  // runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
   122  // if it never succeeds.
   123  func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
   124  	var err error
   125  	var result probe.Result
   126  	var output string
   127  	for i := 0; i < retries; i++ {
   128  		result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
   129  		if err == nil {
   130  			return result, output, nil
   131  		}
   132  	}
   133  	return result, output, err
   134  }
   135  
   136  func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
   137  	timeout := time.Duration(p.TimeoutSeconds) * time.Second
   138  	switch {
   139  	case p.Exec != nil:
   140  		klog.V(4).InfoS("Exec-Probe runProbe", "pod", klog.KObj(pod), "containerName", container.Name, "execCommand", p.Exec.Command)
   141  		command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
   142  		return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
   143  
   144  	case p.HTTPGet != nil:
   145  		req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
   146  		if err != nil {
   147  			return probe.Unknown, "", err
   148  		}
   149  		if klogV4 := klog.V(4); klogV4.Enabled() {
   150  			port := req.URL.Port()
   151  			host := req.URL.Hostname()
   152  			path := req.URL.Path
   153  			scheme := req.URL.Scheme
   154  			headers := p.HTTPGet.HTTPHeaders
   155  			klogV4.InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout, "headers", headers)
   156  		}
   157  		return pb.http.Probe(req, timeout)
   158  
   159  	case p.TCPSocket != nil:
   160  		port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
   161  		if err != nil {
   162  			return probe.Unknown, "", err
   163  		}
   164  		host := p.TCPSocket.Host
   165  		if host == "" {
   166  			host = status.PodIP
   167  		}
   168  		klog.V(4).InfoS("TCP-Probe", "host", host, "port", port, "timeout", timeout)
   169  		return pb.tcp.Probe(host, port, timeout)
   170  
   171  	case p.GRPC != nil:
   172  		host := status.PodIP
   173  		service := ""
   174  		if p.GRPC.Service != nil {
   175  			service = *p.GRPC.Service
   176  		}
   177  		klog.V(4).InfoS("GRPC-Probe", "host", host, "service", service, "port", p.GRPC.Port, "timeout", timeout)
   178  		return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
   179  
   180  	default:
   181  		klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
   182  		return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
   183  	}
   184  }
   185  
// execInContainer wraps a "run this command in a container" callback so it
// can be returned as an exec.Cmd (see newExecInContainer). Only
// CombinedOutput and Start invoke the callback; the remaining exec.Cmd
// methods are no-ops or return an "unimplemented" error.
type execInContainer struct {
	// run executes a command in a container. Combined stdout and stderr output is always returned. An
	// error is returned if one occurred.
	run    func() ([]byte, error)
	// writer, when non-nil, receives the command output from Start. It is
	// set by both SetStdout and SetStderr, so the most recent call wins.
	writer io.Writer
}
   192  
   193  func (pb *prober) newExecInContainer(ctx context.Context, container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
   194  	return &execInContainer{run: func() ([]byte, error) {
   195  		return pb.runner.RunInContainer(ctx, containerID, cmd, timeout)
   196  	}}
   197  }
   198  
// Run is a no-op that always returns nil. It does not invoke the run
// callback; use CombinedOutput or Start to actually execute the command.
func (eic *execInContainer) Run() error {
	return nil
}
   202  
// CombinedOutput invokes the run callback and returns its output and error
// unchanged. The configured writer, if any, is not written to.
func (eic *execInContainer) CombinedOutput() ([]byte, error) {
	return eic.run()
}
   206  
// Output is not supported; it always returns nil output and an
// "unimplemented" error without running the command.
func (eic *execInContainer) Output() ([]byte, error) {
	return nil, fmt.Errorf("unimplemented")
}
   210  
// SetDir is a no-op; the dir argument is ignored.
func (eic *execInContainer) SetDir(dir string) {
	// unimplemented
}
   214  
// SetStdin is a no-op; the in argument is ignored.
func (eic *execInContainer) SetStdin(in io.Reader) {
	// unimplemented
}
   218  
// SetStdout sets the writer that Start copies the command output to. The same
// field is written by SetStderr, so whichever of the two is called last
// determines the destination.
func (eic *execInContainer) SetStdout(out io.Writer) {
	eic.writer = out
}
   222  
// SetStderr sets the writer that Start copies the command output to. The same
// field is written by SetStdout, so whichever of the two is called last
// determines the destination.
func (eic *execInContainer) SetStderr(out io.Writer) {
	eic.writer = out
}
   226  
// SetEnv is a no-op; the env argument is ignored.
func (eic *execInContainer) SetEnv(env []string) {
	// unimplemented
}
   230  
// Stop is a no-op; it does not interrupt a command started via this object.
func (eic *execInContainer) Stop() {
	// unimplemented
}
   234  
   235  func (eic *execInContainer) Start() error {
   236  	data, err := eic.run()
   237  	if eic.writer != nil {
   238  		// only record the write error, do not cover the command run error
   239  		if p, err := eic.writer.Write(data); err != nil {
   240  			klog.ErrorS(err, "Unable to write all bytes from execInContainer", "expectedBytes", len(data), "actualBytes", p)
   241  		}
   242  	}
   243  	return err
   244  }
   245  
// Wait is a no-op that always returns nil. Start calls the run callback
// directly and returns its error, so Wait has nothing to report.
func (eic *execInContainer) Wait() error {
	return nil
}
   249  
// StdoutPipe is not supported; it always returns a nil pipe and an
// "unimplemented" error.
func (eic *execInContainer) StdoutPipe() (io.ReadCloser, error) {
	return nil, fmt.Errorf("unimplemented")
}
   253  
// StderrPipe is not supported; it always returns a nil pipe and an
// "unimplemented" error.
func (eic *execInContainer) StderrPipe() (io.ReadCloser, error) {
	return nil, fmt.Errorf("unimplemented")
}
   257  

View as plain text