
Source file src/k8s.io/kubernetes/pkg/kubelet/prober/prober_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package prober
    19  import (
    20  	"sync"
    21  	"time"
    23  	v1 "k8s.io/api/core/v1"
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/apimachinery/pkg/util/sets"
    26  	"k8s.io/client-go/tools/record"
    27  	"k8s.io/component-base/metrics"
    28  	"k8s.io/klog/v2"
    29  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    30  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    31  	"k8s.io/kubernetes/pkg/kubelet/status"
    32  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    33  	kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
    34  	"k8s.io/utils/clock"
    35  )
    37  // ProberResults stores the cumulative number of a probe by result as prometheus metrics.
    38  var ProberResults = metrics.NewCounterVec(
    39  	&metrics.CounterOpts{
    40  		Subsystem:      "prober",
    41  		Name:           "probe_total",
    42  		Help:           "Cumulative number of a liveness, readiness or startup probe for a container by result.",
    43  		StabilityLevel: metrics.ALPHA,
    44  	},
    45  	[]string{"probe_type",
    46  		"result",
    47  		"container",
    48  		"pod",
    49  		"namespace",
    50  		"pod_uid"},
    51  )
    53  // ProberDuration stores the duration of a successful probe lifecycle by result as prometheus metrics.
    54  var ProberDuration = metrics.NewHistogramVec(
    55  	&metrics.HistogramOpts{
    56  		Subsystem:      "prober",
    57  		Name:           "probe_duration_seconds",
    58  		Help:           "Duration in seconds for a probe response.",
    59  		StabilityLevel: metrics.ALPHA,
    60  	},
    61  	[]string{"probe_type",
    62  		"container",
    63  		"pod",
    64  		"namespace"},
    65  )
    67  // Manager manages pod probing. It creates a probe "worker" for every container that specifies a
    68  // probe (AddPod). The worker periodically probes its assigned container and caches the results. The
    69  // manager use the cached probe results to set the appropriate Ready state in the PodStatus when
    70  // requested (UpdatePodStatus). Updating probe parameters is not currently supported.
    71  type Manager interface {
    72  	// AddPod creates new probe workers for every container probe. This should be called for every
    73  	// pod created.
    74  	AddPod(pod *v1.Pod)
    76  	// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
    77  	StopLivenessAndStartup(pod *v1.Pod)
    79  	// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
    80  	// deleting cached results.
    81  	RemovePod(pod *v1.Pod)
    83  	// CleanupPods handles cleaning up pods which should no longer be running.
    84  	// It takes a map of "desired pods" which should not be cleaned up.
    85  	CleanupPods(desiredPods map[types.UID]sets.Empty)
    87  	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
    88  	// container based on container running status, cached probe results and worker states.
    89  	UpdatePodStatus(*v1.Pod, *v1.PodStatus)
    90  }
    92  type manager struct {
    93  	// Map of active workers for probes
    94  	workers map[probeKey]*worker
    95  	// Lock for accessing & mutating workers
    96  	workerLock sync.RWMutex
    98  	// The statusManager cache provides pod IP and container IDs for probing.
    99  	statusManager status.Manager
   101  	// readinessManager manages the results of readiness probes
   102  	readinessManager results.Manager
   104  	// livenessManager manages the results of liveness probes
   105  	livenessManager results.Manager
   107  	// startupManager manages the results of startup probes
   108  	startupManager results.Manager
   110  	// prober executes the probe actions.
   111  	prober *prober
   113  	start time.Time
   114  }
   116  // NewManager creates a Manager for pod probing.
   117  func NewManager(
   118  	statusManager status.Manager,
   119  	livenessManager results.Manager,
   120  	readinessManager results.Manager,
   121  	startupManager results.Manager,
   122  	runner kubecontainer.CommandRunner,
   123  	recorder record.EventRecorder) Manager {
   125  	prober := newProber(runner, recorder)
   126  	return &manager{
   127  		statusManager:    statusManager,
   128  		prober:           prober,
   129  		readinessManager: readinessManager,
   130  		livenessManager:  livenessManager,
   131  		startupManager:   startupManager,
   132  		workers:          make(map[probeKey]*worker),
   133  		start:            clock.RealClock{}.Now(),
   134  	}
   135  }
   137  // Key uniquely identifying container probes
   138  type probeKey struct {
   139  	podUID        types.UID
   140  	containerName string
   141  	probeType     probeType
   142  }
   144  // Type of probe (liveness, readiness or startup)
   145  type probeType int
   147  const (
   148  	liveness probeType = iota
   149  	readiness
   150  	startup
   152  	probeResultSuccessful string = "successful"
   153  	probeResultFailed     string = "failed"
   154  	probeResultUnknown    string = "unknown"
   155  )
   157  // For debugging.
   158  func (t probeType) String() string {
   159  	switch t {
   160  	case readiness:
   161  		return "Readiness"
   162  	case liveness:
   163  		return "Liveness"
   164  	case startup:
   165  		return "Startup"
   166  	default:
   167  		return "UNKNOWN"
   168  	}
   169  }
   171  func getRestartableInitContainers(pod *v1.Pod) []v1.Container {
   172  	var restartableInitContainers []v1.Container
   173  	for _, c := range pod.Spec.InitContainers {
   174  		if kubetypes.IsRestartableInitContainer(&c) {
   175  			restartableInitContainers = append(restartableInitContainers, c)
   176  		}
   177  	}
   178  	return restartableInitContainers
   179  }
   181  func (m *manager) AddPod(pod *v1.Pod) {
   182  	m.workerLock.Lock()
   183  	defer m.workerLock.Unlock()
   185  	key := probeKey{podUID: pod.UID}
   186  	for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
   187  		key.containerName = c.Name
   189  		if c.StartupProbe != nil {
   190  			key.probeType = startup
   191  			if _, ok := m.workers[key]; ok {
   192  				klog.V(8).ErrorS(nil, "Startup probe already exists for container",
   193  					"pod", klog.KObj(pod), "containerName", c.Name)
   194  				return
   195  			}
   196  			w := newWorker(m, startup, pod, c)
   197  			m.workers[key] = w
   198  			go w.run()
   199  		}
   201  		if c.ReadinessProbe != nil {
   202  			key.probeType = readiness
   203  			if _, ok := m.workers[key]; ok {
   204  				klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
   205  					"pod", klog.KObj(pod), "containerName", c.Name)
   206  				return
   207  			}
   208  			w := newWorker(m, readiness, pod, c)
   209  			m.workers[key] = w
   210  			go w.run()
   211  		}
   213  		if c.LivenessProbe != nil {
   214  			key.probeType = liveness
   215  			if _, ok := m.workers[key]; ok {
   216  				klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
   217  					"pod", klog.KObj(pod), "containerName", c.Name)
   218  				return
   219  			}
   220  			w := newWorker(m, liveness, pod, c)
   221  			m.workers[key] = w
   222  			go w.run()
   223  		}
   224  	}
   225  }
   227  func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
   228  	m.workerLock.RLock()
   229  	defer m.workerLock.RUnlock()
   231  	key := probeKey{podUID: pod.UID}
   232  	for _, c := range pod.Spec.Containers {
   233  		key.containerName = c.Name
   234  		for _, probeType := range [...]probeType{liveness, startup} {
   235  			key.probeType = probeType
   236  			if worker, ok := m.workers[key]; ok {
   237  				worker.stop()
   238  			}
   239  		}
   240  	}
   241  }
   243  func (m *manager) RemovePod(pod *v1.Pod) {
   244  	m.workerLock.RLock()
   245  	defer m.workerLock.RUnlock()
   247  	key := probeKey{podUID: pod.UID}
   248  	for _, c := range append(pod.Spec.Containers, getRestartableInitContainers(pod)...) {
   249  		key.containerName = c.Name
   250  		for _, probeType := range [...]probeType{readiness, liveness, startup} {
   251  			key.probeType = probeType
   252  			if worker, ok := m.workers[key]; ok {
   253  				worker.stop()
   254  			}
   255  		}
   256  	}
   257  }
   259  func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
   260  	m.workerLock.RLock()
   261  	defer m.workerLock.RUnlock()
   263  	for key, worker := range m.workers {
   264  		if _, ok := desiredPods[key.podUID]; !ok {
   265  			worker.stop()
   266  		}
   267  	}
   268  }
   270  func (m *manager) isContainerStarted(pod *v1.Pod, containerStatus *v1.ContainerStatus) bool {
   271  	if containerStatus.State.Running == nil {
   272  		return false
   273  	}
   275  	if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(containerStatus.ContainerID)); ok {
   276  		return result == results.Success
   277  	}
   279  	// if there is a startup probe which hasn't run yet, the container is not
   280  	// started.
   281  	if _, exists := m.getWorker(pod.UID, containerStatus.Name, startup); exists {
   282  		return false
   283  	}
   285  	// there is no startup probe, so the container is started.
   286  	return true
   287  }
   289  func (m *manager) UpdatePodStatus(pod *v1.Pod, podStatus *v1.PodStatus) {
   290  	for i, c := range podStatus.ContainerStatuses {
   291  		started := m.isContainerStarted(pod, &podStatus.ContainerStatuses[i])
   292  		podStatus.ContainerStatuses[i].Started = &started
   294  		if !started {
   295  			continue
   296  		}
   298  		var ready bool
   299  		if c.State.Running == nil {
   300  			ready = false
   301  		} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok && result == results.Success {
   302  			ready = true
   303  		} else {
   304  			// The check whether there is a probe which hasn't run yet.
   305  			w, exists := m.getWorker(pod.UID, c.Name, readiness)
   306  			ready = !exists // no readinessProbe -> always ready
   307  			if exists {
   308  				// Trigger an immediate run of the readinessProbe to update ready state
   309  				select {
   310  				case w.manualTriggerCh <- struct{}{}:
   311  				default: // Non-blocking.
   312  					klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
   313  				}
   314  			}
   315  		}
   316  		podStatus.ContainerStatuses[i].Ready = ready
   317  	}
   319  	for i, c := range podStatus.InitContainerStatuses {
   320  		started := m.isContainerStarted(pod, &podStatus.InitContainerStatuses[i])
   321  		podStatus.InitContainerStatuses[i].Started = &started
   323  		initContainer, ok := kubeutil.GetContainerByIndex(pod.Spec.InitContainers, podStatus.InitContainerStatuses, i)
   324  		if !ok {
   325  			klog.V(4).InfoS("Mismatch between pod spec and status, likely programmer error", "pod", klog.KObj(pod), "containerName", c.Name)
   326  			continue
   327  		}
   328  		if !kubetypes.IsRestartableInitContainer(&initContainer) {
   329  			if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
   330  				podStatus.InitContainerStatuses[i].Ready = true
   331  			}
   332  			continue
   333  		}
   335  		if !started {
   336  			continue
   337  		}
   339  		var ready bool
   340  		if c.State.Running == nil {
   341  			ready = false
   342  		} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok && result == results.Success {
   343  			ready = true
   344  		} else {
   345  			// The check whether there is a probe which hasn't run yet.
   346  			w, exists := m.getWorker(pod.UID, c.Name, readiness)
   347  			ready = !exists // no readinessProbe -> always ready
   348  			if exists {
   349  				// Trigger an immediate run of the readinessProbe to update ready state
   350  				select {
   351  				case w.manualTriggerCh <- struct{}{}:
   352  				default: // Non-blocking.
   353  					klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
   354  				}
   355  			}
   356  		}
   357  		podStatus.InitContainerStatuses[i].Ready = ready
   358  	}
   359  }
   361  func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
   362  	m.workerLock.RLock()
   363  	defer m.workerLock.RUnlock()
   364  	worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
   365  	return worker, ok
   366  }
   368  // Called by the worker after exiting.
   369  func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
   370  	m.workerLock.Lock()
   371  	defer m.workerLock.Unlock()
   372  	delete(m.workers, probeKey{podUID, containerName, probeType})
   373  }
   375  // workerCount returns the total number of probe workers. For testing.
   376  func (m *manager) workerCount() int {
   377  	m.workerLock.RLock()
   378  	defer m.workerLock.RUnlock()
   379  	return len(m.workers)
   380  }

View as plain text