...

Source file src/k8s.io/kubernetes/pkg/kubelet/prober/worker.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package prober
    18  
    19  import (
    20  	"context"
    21  	"math/rand"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/util/runtime"
    26  	"k8s.io/component-base/metrics"
    27  	"k8s.io/klog/v2"
    28  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    29  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    30  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    31  )
    32  
    33  // worker handles the periodic probing of its assigned container. Each worker has a go-routine
    34  // associated with it which runs the probe loop until the container permanently terminates, or the
    35  // stop channel is closed. The worker uses the probe Manager's statusManager to get up-to-date
    36  // container IDs.
    37  type worker struct {
    38  	// Channel for stopping the probe.
    39  	stopCh chan struct{}
    40  
    41  	// Channel for triggering the probe manually.
    42  	manualTriggerCh chan struct{}
    43  
    44  	// The pod containing this probe (read-only)
    45  	pod *v1.Pod
    46  
    47  	// The container to probe (read-only)
    48  	container v1.Container
    49  
    50  	// Describes the probe configuration (read-only)
    51  	spec *v1.Probe
    52  
    53  	// The type of the worker.
    54  	probeType probeType
    55  
    56  	// The probe value during the initial delay.
    57  	initialValue results.Result
    58  
     59  	// Where to store this worker's results.
    60  	resultsManager results.Manager
    61  	probeManager   *manager
    62  
    63  	// The last known container ID for this worker.
    64  	containerID kubecontainer.ContainerID
    65  	// The last probe result for this worker.
    66  	lastResult results.Result
    67  	// How many times in a row the probe has returned the same result.
    68  	resultRun int
    69  
    70  	// If set, skip probing.
    71  	onHold bool
    72  
    73  	// proberResultsMetricLabels holds the labels attached to this worker
    74  	// for the ProberResults metric by result.
    75  	proberResultsSuccessfulMetricLabels metrics.Labels
    76  	proberResultsFailedMetricLabels     metrics.Labels
    77  	proberResultsUnknownMetricLabels    metrics.Labels
    78  	// proberDurationMetricLabels holds the labels attached to this worker
    79  	// for the ProberDuration metric by result.
    80  	proberDurationSuccessfulMetricLabels metrics.Labels
    81  	proberDurationUnknownMetricLabels    metrics.Labels
    82  }
    83  
     84  // newWorker creates a new probe worker; the caller starts it by invoking run.
    85  func newWorker(
    86  	m *manager,
    87  	probeType probeType,
    88  	pod *v1.Pod,
    89  	container v1.Container) *worker {
    90  
    91  	w := &worker{
    92  		stopCh:          make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
    93  		manualTriggerCh: make(chan struct{}, 1), // Buffer so prober_manager can do non-blocking calls to doProbe.
    94  		pod:             pod,
    95  		container:       container,
    96  		probeType:       probeType,
    97  		probeManager:    m,
    98  	}
    99  
   100  	switch probeType {
   101  	case readiness:
   102  		w.spec = container.ReadinessProbe
   103  		w.resultsManager = m.readinessManager
   104  		w.initialValue = results.Failure
   105  	case liveness:
   106  		w.spec = container.LivenessProbe
   107  		w.resultsManager = m.livenessManager
   108  		w.initialValue = results.Success
   109  	case startup:
   110  		w.spec = container.StartupProbe
   111  		w.resultsManager = m.startupManager
   112  		w.initialValue = results.Unknown
   113  	}
   114  
   115  	basicMetricLabels := metrics.Labels{
   116  		"probe_type": w.probeType.String(),
   117  		"container":  w.container.Name,
   118  		"pod":        w.pod.Name,
   119  		"namespace":  w.pod.Namespace,
   120  		"pod_uid":    string(w.pod.UID),
   121  	}
   122  
   123  	proberDurationLabels := metrics.Labels{
   124  		"probe_type": w.probeType.String(),
   125  		"container":  w.container.Name,
   126  		"pod":        w.pod.Name,
   127  		"namespace":  w.pod.Namespace,
   128  	}
   129  
   130  	w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
   131  	w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful
   132  
   133  	w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
   134  	w.proberResultsFailedMetricLabels["result"] = probeResultFailed
   135  
   136  	w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
   137  	w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown
   138  
   139  	w.proberDurationSuccessfulMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)
   140  	w.proberDurationUnknownMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)
   141  
   142  	return w
   143  }
   144  
   145  // run periodically probes the container.
   146  func (w *worker) run() {
   147  	ctx := context.Background()
   148  	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
   149  
    150  	// If the kubelet was restarted, the probes could be started in rapid succession.
    151  	// Let the worker wait for a random portion of probeTickerPeriod before probing.
    152  	// Do this only if the kubelet has started recently.
   153  	if probeTickerPeriod > time.Since(w.probeManager.start) {
   154  		time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
   155  	}
   156  
   157  	probeTicker := time.NewTicker(probeTickerPeriod)
   158  
   159  	defer func() {
   160  		// Clean up.
   161  		probeTicker.Stop()
   162  		if !w.containerID.IsEmpty() {
   163  			w.resultsManager.Remove(w.containerID)
   164  		}
   165  
   166  		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
   167  		ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
   168  		ProberResults.Delete(w.proberResultsFailedMetricLabels)
   169  		ProberResults.Delete(w.proberResultsUnknownMetricLabels)
   170  		ProberDuration.Delete(w.proberDurationSuccessfulMetricLabels)
   171  		ProberDuration.Delete(w.proberDurationUnknownMetricLabels)
   172  	}()
   173  
   174  probeLoop:
   175  	for w.doProbe(ctx) {
   176  		// Wait for next probe tick.
   177  		select {
   178  		case <-w.stopCh:
   179  			break probeLoop
   180  		case <-probeTicker.C:
   181  		case <-w.manualTriggerCh:
   182  			// continue
   183  		}
   184  	}
   185  }
   186  
   187  // stop stops the probe worker. The worker handles cleanup and removes itself from its manager.
   188  // It is safe to call stop multiple times.
   189  func (w *worker) stop() {
   190  	select {
   191  	case w.stopCh <- struct{}{}:
   192  	default: // Non-blocking.
   193  	}
   194  }
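
The run and stop methods above coordinate through buffered channels of size one: stop performs a non-blocking send so it is safe to call any number of times, and manualTriggerCh lets the prober manager request an immediate probe without blocking. The standalone sketch below reproduces just that channel pattern; signal, probeOnce, and the timing values are illustrative and not part of the kubelet code.

	package main

	import (
		"fmt"
		"time"
	)

	// signal mirrors the non-blocking send used by stop and the manual trigger:
	// with a buffer of one, a pending signal is enough, and further sends are
	// dropped instead of blocking the caller.
	func signal(ch chan struct{}) {
		select {
		case ch <- struct{}{}:
		default:
		}
	}

	func main() {
		stopCh := make(chan struct{}, 1)
		manualTriggerCh := make(chan struct{}, 1)

		ticker := time.NewTicker(300 * time.Millisecond)
		defer ticker.Stop()

		probes := 0
		probeOnce := func() bool { // stand-in for doProbe
			probes++
			fmt.Println("probe", probes)
			return true
		}

		go func() {
			signal(manualTriggerCh) // request an immediate re-probe
			time.Sleep(time.Second)
			signal(stopCh) // stopping twice is harmless
			signal(stopCh)
		}()

		// Same loop shape as run: probe, then wait for a tick, a manual
		// trigger, or a stop signal.
	probeLoop:
		for probeOnce() {
			select {
			case <-stopCh:
				break probeLoop
			case <-ticker.C:
			case <-manualTriggerCh:
			}
		}
		fmt.Println("worker stopped")
	}

Because each channel buffers one signal, a trigger or stop requested while the worker is busy probing is remembered and handled on the next pass through the loop.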
   195  
   196  // doProbe probes the container once and records the result.
   197  // Returns whether the worker should continue.
   198  func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
   199  	defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
   200  	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
   201  
   202  	startTime := time.Now()
   203  	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
   204  	if !ok {
   205  		// Either the pod has not been created yet, or it was already deleted.
   206  		klog.V(3).InfoS("No status for pod", "pod", klog.KObj(w.pod))
   207  		return true
   208  	}
   209  
   210  	// Worker should terminate if pod is terminated.
   211  	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
   212  		klog.V(3).InfoS("Pod is terminated, exiting probe worker",
   213  			"pod", klog.KObj(w.pod), "phase", status.Phase)
   214  		return false
   215  	}
   216  
   217  	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
   218  	if !ok || len(c.ContainerID) == 0 {
   219  		c, ok = podutil.GetContainerStatus(status.InitContainerStatuses, w.container.Name)
   220  		if !ok || len(c.ContainerID) == 0 {
   221  			// Either the container has not been created yet, or it was deleted.
   222  			klog.V(3).InfoS("Probe target container not found",
   223  				"pod", klog.KObj(w.pod), "containerName", w.container.Name)
   224  			return true // Wait for more information.
   225  		}
   226  	}
   227  
   228  	if w.containerID.String() != c.ContainerID {
   229  		if !w.containerID.IsEmpty() {
   230  			w.resultsManager.Remove(w.containerID)
   231  		}
   232  		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
   233  		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
   234  		// We've got a new container; resume probing.
   235  		w.onHold = false
   236  	}
   237  
   238  	if w.onHold {
   239  		// Worker is on hold until there is a new container.
   240  		return true
   241  	}
   242  
   243  	if c.State.Running == nil {
   244  		klog.V(3).InfoS("Non-running container probed",
   245  			"pod", klog.KObj(w.pod), "containerName", w.container.Name)
   246  		if !w.containerID.IsEmpty() {
   247  			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
   248  		}
   249  		// Abort if the container will not be restarted.
   250  		return c.State.Terminated == nil ||
   251  			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
   252  	}
   253  
   254  	// Graceful shutdown of the pod.
   255  	if w.pod.ObjectMeta.DeletionTimestamp != nil && (w.probeType == liveness || w.probeType == startup) {
   256  		klog.V(3).InfoS("Pod deletion requested, setting probe result to success",
   257  			"probeType", w.probeType, "pod", klog.KObj(w.pod), "containerName", w.container.Name)
   258  		if w.probeType == startup {
   259  			klog.InfoS("Pod deletion requested before container has fully started",
   260  				"pod", klog.KObj(w.pod), "containerName", w.container.Name)
   261  		}
   262  		// Set a last result to ensure quiet shutdown.
   263  		w.resultsManager.Set(w.containerID, results.Success, w.pod)
   264  		// Stop probing at this point.
   265  		return false
   266  	}
   267  
   268  	// Probe disabled for InitialDelaySeconds.
   269  	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
   270  		return true
   271  	}
   272  
   273  	if c.Started != nil && *c.Started {
    274  		// Stop probing for startup once the container has started.
    275  		// The worker keeps running so that it still works if the container is restarted.
   276  		if w.probeType == startup {
   277  			return true
   278  		}
   279  	} else {
   280  		// Disable other probes until container has started.
   281  		if w.probeType != startup {
   282  			return true
   283  		}
   284  	}
   285  
    286  	// Note: exec probes do NOT have access to pod environment variables or the downward API.
   287  	result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
   288  	if err != nil {
   289  		// Prober error, throw away the result.
   290  		return true
   291  	}
   292  
   293  	switch result {
   294  	case results.Success:
   295  		ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
   296  		ProberDuration.With(w.proberDurationSuccessfulMetricLabels).Observe(time.Since(startTime).Seconds())
   297  	case results.Failure:
   298  		ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
   299  	default:
   300  		ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
   301  		ProberDuration.With(w.proberDurationUnknownMetricLabels).Observe(time.Since(startTime).Seconds())
   302  	}
   303  
   304  	if w.lastResult == result {
   305  		w.resultRun++
   306  	} else {
   307  		w.lastResult = result
   308  		w.resultRun = 1
   309  	}
   310  
   311  	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
   312  		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
   313  		// Success or failure is below threshold - leave the probe state unchanged.
   314  		return true
   315  	}
   316  
   317  	w.resultsManager.Set(w.containerID, result, w.pod)
   318  
   319  	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
    320  		// The container failed a liveness/startup check; it will need to be restarted.
   321  		// Stop probing until we see a new container ID. This is to reduce the
   322  		// chance of hitting #21751, where running `docker exec` when a
   323  		// container is being stopped may lead to corrupted container state.
   324  		w.onHold = true
   325  		w.resultRun = 0
   326  	}
   327  
   328  	return true
   329  }
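
doProbe above only publishes a changed result once it has been observed enough times in a row: a failure must repeat FailureThreshold times and a success SuccessThreshold times before the results manager is updated. The standalone sketch below isolates that counting logic; tracker, observe, and the threshold values are illustrative and not kubelet APIs.

	package main

	import "fmt"

	type result string

	const (
		success result = "success"
		failure result = "failure"
	)

	// tracker mimics the lastResult/resultRun bookkeeping in doProbe.
	type tracker struct {
		lastResult result
		resultRun  int
	}

	// observe records one probe result and reports whether it has repeated often
	// enough to be published, using the same comparison doProbe performs against
	// the probe spec's SuccessThreshold and FailureThreshold.
	func (t *tracker) observe(r result, successThreshold, failureThreshold int) bool {
		if t.lastResult == r {
			t.resultRun++
		} else {
			t.lastResult = r
			t.resultRun = 1
		}
		if (r == failure && t.resultRun < failureThreshold) ||
			(r == success && t.resultRun < successThreshold) {
			return false // below threshold: leave the reported state unchanged
		}
		return true
	}

	func main() {
		t := &tracker{}
		// With successThreshold=1 and failureThreshold=3, a success is published
		// immediately, but a failure is only published on the third consecutive
		// failure.
		for i, r := range []result{failure, failure, success, failure, failure, failure} {
			fmt.Printf("probe %d: %-7s publish=%v\n", i+1, r, t.observe(r, 1, 3))
		}
	}

With the Kubernetes API defaults of SuccessThreshold 1 and FailureThreshold 3, a single success is reported immediately, while three consecutive failures are required before a failure is reported.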
   330  
   331  func deepCopyPrometheusLabels(m metrics.Labels) metrics.Labels {
   332  	ret := make(metrics.Labels, len(m))
   333  	for k, v := range m {
   334  		ret[k] = v
   335  	}
   336  	return ret
   337  }
   338  
