
Source file src/k8s.io/kubernetes/pkg/kubelet/prober/prober_manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package prober
    19  import (
    20  	"fmt"
    21  	"strconv"
    22  	"testing"
    23  	"time"
    25  	v1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    31  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    32  	"k8s.io/kubernetes/pkg/features"
    33  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    34  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    35  	"k8s.io/kubernetes/pkg/probe"
    36  )
    38  func init() {
    39  }
    41  var defaultProbe = &v1.Probe{
    42  	ProbeHandler: v1.ProbeHandler{
    43  		Exec: &v1.ExecAction{},
    44  	},
    45  	TimeoutSeconds:   1,
    46  	PeriodSeconds:    1,
    47  	SuccessThreshold: 1,
    48  	FailureThreshold: 3,
    49  }
    51  func TestAddRemovePods(t *testing.T) {
    52  	noProbePod := v1.Pod{
    53  		ObjectMeta: metav1.ObjectMeta{
    54  			UID: "no_probe_pod",
    55  		},
    56  		Spec: v1.PodSpec{
    57  			Containers: []v1.Container{{
    58  				Name: "no_probe1",
    59  			}, {
    60  				Name: "no_probe2",
    61  			}, {
    62  				Name: "no_probe3",
    63  			}},
    64  		},
    65  	}
    67  	probePod := v1.Pod{
    68  		ObjectMeta: metav1.ObjectMeta{
    69  			UID: "probe_pod",
    70  		},
    71  		Spec: v1.PodSpec{
    72  			Containers: []v1.Container{{
    73  				Name: "probe1",
    74  			}, {
    75  				Name:           "readiness",
    76  				ReadinessProbe: defaultProbe,
    77  			}, {
    78  				Name: "probe2",
    79  			}, {
    80  				Name:          "liveness",
    81  				LivenessProbe: defaultProbe,
    82  			}, {
    83  				Name: "probe3",
    84  			}, {
    85  				Name:         "startup",
    86  				StartupProbe: defaultProbe,
    87  			}},
    88  		},
    89  	}
    91  	m := newTestManager()
    92  	defer cleanup(t, m)
    93  	if err := expectProbes(m, nil); err != nil {
    94  		t.Error(err)
    95  	}
    97  	// Adding a pod with no probes should be a no-op.
    98  	m.AddPod(&noProbePod)
    99  	if err := expectProbes(m, nil); err != nil {
   100  		t.Error(err)
   101  	}
   103  	// Adding a pod with probes.
   104  	m.AddPod(&probePod)
   105  	probePaths := []probeKey{
   106  		{"probe_pod", "readiness", readiness},
   107  		{"probe_pod", "liveness", liveness},
   108  		{"probe_pod", "startup", startup},
   109  	}
   110  	if err := expectProbes(m, probePaths); err != nil {
   111  		t.Error(err)
   112  	}
   114  	// Removing un-probed pod.
   115  	m.RemovePod(&noProbePod)
   116  	if err := expectProbes(m, probePaths); err != nil {
   117  		t.Error(err)
   118  	}
   120  	// Removing probed pod.
   121  	m.RemovePod(&probePod)
   122  	if err := waitForWorkerExit(t, m, probePaths); err != nil {
   123  		t.Fatal(err)
   124  	}
   125  	if err := expectProbes(m, nil); err != nil {
   126  		t.Error(err)
   127  	}
   129  	// Removing already removed pods should be a no-op.
   130  	m.RemovePod(&probePod)
   131  	if err := expectProbes(m, nil); err != nil {
   132  		t.Error(err)
   133  	}
   134  }
   136  func TestAddRemovePodsWithRestartableInitContainer(t *testing.T) {
   137  	m := newTestManager()
   138  	defer cleanup(t, m)
   139  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, true)()
   140  	if err := expectProbes(m, nil); err != nil {
   141  		t.Error(err)
   142  	}
   144  	testCases := []struct {
   145  		desc                    string
   146  		probePaths              []probeKey
   147  		enableSidecarContainers bool
   148  	}{
   149  		{
   150  			desc:                    "pod with sidecar (no sidecar containers feature enabled)",
   151  			probePaths:              nil,
   152  			enableSidecarContainers: false,
   153  		},
   154  		{
   155  			desc: "pod with sidecar (sidecar containers feature enabled)",
   156  			probePaths: []probeKey{
   157  				{"restartable_init_container_pod", "restartable-init", liveness},
   158  				{"restartable_init_container_pod", "restartable-init", readiness},
   159  				{"restartable_init_container_pod", "restartable-init", startup},
   160  			},
   161  			enableSidecarContainers: true,
   162  		},
   163  	}
   165  	containerRestartPolicy := func(enableSidecarContainers bool) *v1.ContainerRestartPolicy {
   166  		if !enableSidecarContainers {
   167  			return nil
   168  		}
   169  		restartPolicy := v1.ContainerRestartPolicyAlways
   170  		return &restartPolicy
   171  	}
   173  	for _, tc := range testCases {
   174  		t.Run(tc.desc, func(t *testing.T) {
   175  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, tc.enableSidecarContainers)()
   177  			probePod := v1.Pod{
   178  				ObjectMeta: metav1.ObjectMeta{
   179  					UID: "restartable_init_container_pod",
   180  				},
   181  				Spec: v1.PodSpec{
   182  					InitContainers: []v1.Container{{
   183  						Name: "init",
   184  					}, {
   185  						Name:           "restartable-init",
   186  						LivenessProbe:  defaultProbe,
   187  						ReadinessProbe: defaultProbe,
   188  						StartupProbe:   defaultProbe,
   189  						RestartPolicy:  containerRestartPolicy(tc.enableSidecarContainers),
   190  					}},
   191  					Containers: []v1.Container{{
   192  						Name: "main",
   193  					}},
   194  				},
   195  			}
   197  			// Adding a pod with probes.
   198  			m.AddPod(&probePod)
   199  			if err := expectProbes(m, tc.probePaths); err != nil {
   200  				t.Error(err)
   201  			}
   203  			// Removing probed pod.
   204  			m.RemovePod(&probePod)
   205  			if err := waitForWorkerExit(t, m, tc.probePaths); err != nil {
   206  				t.Fatal(err)
   207  			}
   208  			if err := expectProbes(m, nil); err != nil {
   209  				t.Error(err)
   210  			}
   212  			// Removing already removed pods should be a no-op.
   213  			m.RemovePod(&probePod)
   214  			if err := expectProbes(m, nil); err != nil {
   215  				t.Error(err)
   216  			}
   217  		})
   218  	}
   219  }
   221  func TestCleanupPods(t *testing.T) {
   222  	m := newTestManager()
   223  	defer cleanup(t, m)
   224  	podToCleanup := v1.Pod{
   225  		ObjectMeta: metav1.ObjectMeta{
   226  			UID: "pod_cleanup",
   227  		},
   228  		Spec: v1.PodSpec{
   229  			Containers: []v1.Container{{
   230  				Name:           "prober1",
   231  				ReadinessProbe: defaultProbe,
   232  			}, {
   233  				Name:          "prober2",
   234  				LivenessProbe: defaultProbe,
   235  			}, {
   236  				Name:         "prober3",
   237  				StartupProbe: defaultProbe,
   238  			}},
   239  		},
   240  	}
   241  	podToKeep := v1.Pod{
   242  		ObjectMeta: metav1.ObjectMeta{
   243  			UID: "pod_keep",
   244  		},
   245  		Spec: v1.PodSpec{
   246  			Containers: []v1.Container{{
   247  				Name:           "prober1",
   248  				ReadinessProbe: defaultProbe,
   249  			}, {
   250  				Name:          "prober2",
   251  				LivenessProbe: defaultProbe,
   252  			}, {
   253  				Name:         "prober3",
   254  				StartupProbe: defaultProbe,
   255  			}},
   256  		},
   257  	}
   258  	m.AddPod(&podToCleanup)
   259  	m.AddPod(&podToKeep)
   261  	desiredPods := map[types.UID]sets.Empty{}
   262  	desiredPods[podToKeep.UID] = sets.Empty{}
   263  	m.CleanupPods(desiredPods)
   265  	removedProbes := []probeKey{
   266  		{"pod_cleanup", "prober1", readiness},
   267  		{"pod_cleanup", "prober2", liveness},
   268  		{"pod_cleanup", "prober3", startup},
   269  	}
   270  	expectedProbes := []probeKey{
   271  		{"pod_keep", "prober1", readiness},
   272  		{"pod_keep", "prober2", liveness},
   273  		{"pod_keep", "prober3", startup},
   274  	}
   275  	if err := waitForWorkerExit(t, m, removedProbes); err != nil {
   276  		t.Fatal(err)
   277  	}
   278  	if err := expectProbes(m, expectedProbes); err != nil {
   279  		t.Error(err)
   280  	}
   281  }
   283  func TestCleanupRepeated(t *testing.T) {
   284  	m := newTestManager()
   285  	defer cleanup(t, m)
   286  	podTemplate := v1.Pod{
   287  		Spec: v1.PodSpec{
   288  			Containers: []v1.Container{{
   289  				Name:           "prober1",
   290  				ReadinessProbe: defaultProbe,
   291  				LivenessProbe:  defaultProbe,
   292  				StartupProbe:   defaultProbe,
   293  			}},
   294  		},
   295  	}
   297  	const numTestPods = 100
   298  	for i := 0; i < numTestPods; i++ {
   299  		pod := podTemplate
   300  		pod.UID = types.UID(strconv.Itoa(i))
   301  		m.AddPod(&pod)
   302  	}
   304  	for i := 0; i < 10; i++ {
   305  		m.CleanupPods(map[types.UID]sets.Empty{})
   306  	}
   307  }
   309  func TestUpdatePodStatus(t *testing.T) {
   310  	unprobed := v1.ContainerStatus{
   311  		Name:        "unprobed_container",
   312  		ContainerID: "test://unprobed_container_id",
   313  		State: v1.ContainerState{
   314  			Running: &v1.ContainerStateRunning{},
   315  		},
   316  	}
   317  	probedReady := v1.ContainerStatus{
   318  		Name:        "probed_container_ready",
   319  		ContainerID: "test://probed_container_ready_id",
   320  		State: v1.ContainerState{
   321  			Running: &v1.ContainerStateRunning{},
   322  		},
   323  	}
   324  	probedPending := v1.ContainerStatus{
   325  		Name:        "probed_container_pending",
   326  		ContainerID: "test://probed_container_pending_id",
   327  		State: v1.ContainerState{
   328  			Running: &v1.ContainerStateRunning{},
   329  		},
   330  	}
   331  	probedUnready := v1.ContainerStatus{
   332  		Name:        "probed_container_unready",
   333  		ContainerID: "test://probed_container_unready_id",
   334  		State: v1.ContainerState{
   335  			Running: &v1.ContainerStateRunning{},
   336  		},
   337  	}
   338  	notStartedNoReadiness := v1.ContainerStatus{
   339  		Name:        "not_started_container_no_readiness",
   340  		ContainerID: "test://not_started_container_no_readiness_id",
   341  		State: v1.ContainerState{
   342  			Running: &v1.ContainerStateRunning{},
   343  		},
   344  	}
   345  	startedNoReadiness := v1.ContainerStatus{
   346  		Name:        "started_container_no_readiness",
   347  		ContainerID: "test://started_container_no_readiness_id",
   348  		State: v1.ContainerState{
   349  			Running: &v1.ContainerStateRunning{},
   350  		},
   351  	}
   352  	terminated := v1.ContainerStatus{
   353  		Name:        "terminated_container",
   354  		ContainerID: "test://terminated_container_id",
   355  		State: v1.ContainerState{
   356  			Terminated: &v1.ContainerStateTerminated{},
   357  		},
   358  	}
   359  	podStatus := v1.PodStatus{
   360  		Phase: v1.PodRunning,
   361  		ContainerStatuses: []v1.ContainerStatus{
   362  			unprobed, probedReady, probedPending, probedUnready, notStartedNoReadiness, startedNoReadiness, terminated,
   363  		},
   364  	}
   366  	m := newTestManager()
   367  	// no cleanup: using fake workers.
   369  	// Setup probe "workers" and cached results.
   370  	m.workers = map[probeKey]*worker{
   371  		{testPodUID, unprobed.Name, liveness}:             {},
   372  		{testPodUID, probedReady.Name, readiness}:         {},
   373  		{testPodUID, probedPending.Name, readiness}:       {},
   374  		{testPodUID, probedUnready.Name, readiness}:       {},
   375  		{testPodUID, notStartedNoReadiness.Name, startup}: {},
   376  		{testPodUID, startedNoReadiness.Name, startup}:    {},
   377  		{testPodUID, terminated.Name, readiness}:          {},
   378  	}
   379  	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &v1.Pod{})
   380  	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &v1.Pod{})
   381  	m.startupManager.Set(kubecontainer.ParseContainerID(startedNoReadiness.ContainerID), results.Success, &v1.Pod{})
   382  	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &v1.Pod{})
   384  	m.UpdatePodStatus(&v1.Pod{
   385  		ObjectMeta: metav1.ObjectMeta{
   386  			UID: testPodUID,
   387  		},
   388  		Spec: v1.PodSpec{
   389  			Containers: []v1.Container{
   390  				{Name: unprobed.Name},
   391  				{Name: probedReady.Name},
   392  				{Name: probedPending.Name},
   393  				{Name: probedUnready.Name},
   394  				{Name: notStartedNoReadiness.Name},
   395  				{Name: startedNoReadiness.Name},
   396  				{Name: terminated.Name},
   397  			},
   398  		},
   399  	}, &podStatus)
   401  	expectedReadiness := map[probeKey]bool{
   402  		{testPodUID, unprobed.Name, readiness}:              true,
   403  		{testPodUID, probedReady.Name, readiness}:           true,
   404  		{testPodUID, probedPending.Name, readiness}:         false,
   405  		{testPodUID, probedUnready.Name, readiness}:         false,
   406  		{testPodUID, notStartedNoReadiness.Name, readiness}: false,
   407  		{testPodUID, startedNoReadiness.Name, readiness}:    true,
   408  		{testPodUID, terminated.Name, readiness}:            false,
   409  	}
   410  	for _, c := range podStatus.ContainerStatuses {
   411  		expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
   412  		if !ok {
   413  			t.Fatalf("Missing expectation for test case: %v", c.Name)
   414  		}
   415  		if expected != c.Ready {
   416  			t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
   417  				c.Name, expected, c.Ready)
   418  		}
   419  	}
   420  }
   422  func TestUpdatePodStatusWithInitContainers(t *testing.T) {
   423  	notStarted := v1.ContainerStatus{
   424  		Name:        "not_started_container",
   425  		ContainerID: "test://not_started_container_id",
   426  		State: v1.ContainerState{
   427  			Running: &v1.ContainerStateRunning{},
   428  		},
   429  	}
   430  	started := v1.ContainerStatus{
   431  		Name:        "started_container",
   432  		ContainerID: "test://started_container_id",
   433  		State: v1.ContainerState{
   434  			Running: &v1.ContainerStateRunning{},
   435  		},
   436  	}
   437  	terminated := v1.ContainerStatus{
   438  		Name:        "terminated_container",
   439  		ContainerID: "test://terminated_container_id",
   440  		State: v1.ContainerState{
   441  			Terminated: &v1.ContainerStateTerminated{},
   442  		},
   443  	}
   445  	m := newTestManager()
   446  	// no cleanup: using fake workers.
   448  	// Setup probe "workers" and cached results.
   449  	m.workers = map[probeKey]*worker{
   450  		{testPodUID, notStarted.Name, startup}: {},
   451  		{testPodUID, started.Name, startup}:    {},
   452  	}
   453  	m.startupManager.Set(kubecontainer.ParseContainerID(started.ContainerID), results.Success, &v1.Pod{})
   455  	testCases := []struct {
   456  		desc                    string
   457  		expectedStartup         map[probeKey]bool
   458  		expectedReadiness       map[probeKey]bool
   459  		enableSidecarContainers bool
   460  	}{
   461  		{
   462  			desc: "init containers",
   463  			expectedStartup: map[probeKey]bool{
   464  				{testPodUID, notStarted.Name, startup}: false,
   465  				{testPodUID, started.Name, startup}:    true,
   466  				{testPodUID, terminated.Name, startup}: false,
   467  			},
   468  			expectedReadiness: map[probeKey]bool{
   469  				{testPodUID, notStarted.Name, readiness}: false,
   470  				{testPodUID, started.Name, readiness}:    false,
   471  				{testPodUID, terminated.Name, readiness}: true,
   472  			},
   473  			enableSidecarContainers: false,
   474  		},
   475  		{
   476  			desc: "init container with SidecarContainers feature",
   477  			expectedStartup: map[probeKey]bool{
   478  				{testPodUID, notStarted.Name, startup}: false,
   479  				{testPodUID, started.Name, startup}:    true,
   480  				{testPodUID, terminated.Name, startup}: false,
   481  			},
   482  			expectedReadiness: map[probeKey]bool{
   483  				{testPodUID, notStarted.Name, readiness}: false,
   484  				{testPodUID, started.Name, readiness}:    true,
   485  				{testPodUID, terminated.Name, readiness}: false,
   486  			},
   487  			enableSidecarContainers: true,
   488  		},
   489  	}
   491  	containerRestartPolicy := func(enableSidecarContainers bool) *v1.ContainerRestartPolicy {
   492  		if !enableSidecarContainers {
   493  			return nil
   494  		}
   495  		restartPolicy := v1.ContainerRestartPolicyAlways
   496  		return &restartPolicy
   497  	}
   499  	for _, tc := range testCases {
   500  		t.Run(tc.desc, func(t *testing.T) {
   501  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, tc.enableSidecarContainers)()
   502  			podStatus := v1.PodStatus{
   503  				Phase: v1.PodRunning,
   504  				InitContainerStatuses: []v1.ContainerStatus{
   505  					notStarted, started, terminated,
   506  				},
   507  			}
   509  			m.UpdatePodStatus(&v1.Pod{
   510  				ObjectMeta: metav1.ObjectMeta{
   511  					UID: testPodUID,
   512  				},
   513  				Spec: v1.PodSpec{
   514  					InitContainers: []v1.Container{
   515  						{
   516  							Name:          notStarted.Name,
   517  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   518  						},
   519  						{
   520  							Name:          started.Name,
   521  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   522  						},
   523  						{
   524  							Name:          terminated.Name,
   525  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   526  						},
   527  					},
   528  				},
   529  			}, &podStatus)
   531  			for _, c := range podStatus.InitContainerStatuses {
   532  				{
   533  					expected, ok := tc.expectedStartup[probeKey{testPodUID, c.Name, startup}]
   534  					if !ok {
   535  						t.Fatalf("Missing expectation for test case: %v", c.Name)
   536  					}
   537  					if expected != *c.Started {
   538  						t.Errorf("Unexpected startup for container %v: Expected %v but got %v",
   539  							c.Name, expected, *c.Started)
   540  					}
   541  				}
   542  				{
   543  					expected, ok := tc.expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
   544  					if !ok {
   545  						t.Fatalf("Missing expectation for test case: %v", c.Name)
   546  					}
   547  					if expected != c.Ready {
   548  						t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
   549  							c.Name, expected, c.Ready)
   550  					}
   551  				}
   552  			}
   553  		})
   554  	}
   555  }
   557  func (m *manager) extractedReadinessHandling() {
   558  	update := <-m.readinessManager.Updates()
   559  	// This code corresponds to an extract from kubelet.syncLoopIteration()
   560  	ready := update.Result == results.Success
   561  	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
   562  }
   564  func TestUpdateReadiness(t *testing.T) {
   565  	testPod := getTestPod()
   566  	setTestProbe(testPod, readiness, v1.Probe{})
   567  	m := newTestManager()
   568  	defer cleanup(t, m)
   570  	// Start syncing readiness without leaking goroutine.
   571  	stopCh := make(chan struct{})
   572  	go wait.Until(m.extractedReadinessHandling, 0, stopCh)
   573  	defer func() {
   574  		close(stopCh)
   575  		// Send an update to exit extractedReadinessHandling()
   576  		m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
   577  	}()
   579  	exec := syncExecProber{}
   580  	exec.set(probe.Success, nil)
   581  	m.prober.exec = &exec
   583  	m.statusManager.SetPodStatus(testPod, getTestRunningStatus())
   585  	m.AddPod(testPod)
   586  	probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
   587  	if err := expectProbes(m, probePaths); err != nil {
   588  		t.Error(err)
   589  	}
   591  	// Wait for ready status.
   592  	if err := waitForReadyStatus(t, m, true); err != nil {
   593  		t.Error(err)
   594  	}
   596  	// Prober fails.
   597  	exec.set(probe.Failure, nil)
   599  	// Wait for failed status.
   600  	if err := waitForReadyStatus(t, m, false); err != nil {
   601  		t.Error(err)
   602  	}
   603  }
   605  func expectProbes(m *manager, expectedProbes []probeKey) error {
   606  	m.workerLock.RLock()
   607  	defer m.workerLock.RUnlock()
   609  	var unexpected []probeKey
   610  	missing := make([]probeKey, len(expectedProbes))
   611  	copy(missing, expectedProbes)
   613  outer:
   614  	for probePath := range m.workers {
   615  		for i, expectedPath := range missing {
   616  			if probePath == expectedPath {
   617  				missing = append(missing[:i], missing[i+1:]...)
   618  				continue outer
   619  			}
   620  		}
   621  		unexpected = append(unexpected, probePath)
   622  	}
   624  	if len(missing) == 0 && len(unexpected) == 0 {
   625  		return nil // Yay!
   626  	}
   628  	return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
   629  }
   631  const interval = 1 * time.Second
   633  // Wait for the given workers to exit & clean up.
   634  func waitForWorkerExit(t *testing.T, m *manager, workerPaths []probeKey) error {
   635  	for _, w := range workerPaths {
   636  		condition := func() (bool, error) {
   637  			_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
   638  			return !exists, nil
   639  		}
   640  		if exited, _ := condition(); exited {
   641  			continue // Already exited, no need to poll.
   642  		}
   643  		t.Logf("Polling %v", w)
   644  		if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   645  			return err
   646  		}
   647  	}
   649  	return nil
   650  }
   652  // Wait for the given workers to exit & clean up.
   653  func waitForReadyStatus(t *testing.T, m *manager, ready bool) error {
   654  	condition := func() (bool, error) {
   655  		status, ok := m.statusManager.GetPodStatus(testPodUID)
   656  		if !ok {
   657  			return false, fmt.Errorf("status not found: %q", testPodUID)
   658  		}
   659  		if len(status.ContainerStatuses) != 1 {
   660  			return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
   661  		}
   662  		if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
   663  			return false, fmt.Errorf("expected container %q, found %q",
   664  				testContainerID, status.ContainerStatuses[0].ContainerID)
   665  		}
   666  		return status.ContainerStatuses[0].Ready == ready, nil
   667  	}
   668  	t.Logf("Polling for ready state %v", ready)
   669  	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   670  		return err
   671  	}
   673  	return nil
   674  }
   676  // cleanup running probes to avoid leaking goroutines.
   677  func cleanup(t *testing.T, m *manager) {
   678  	m.CleanupPods(nil)
   680  	condition := func() (bool, error) {
   681  		workerCount := m.workerCount()
   682  		if workerCount > 0 {
   683  			t.Logf("Waiting for %d workers to exit...", workerCount)
   684  		}
   685  		return workerCount == 0, nil
   686  	}
   687  	if exited, _ := condition(); exited {
   688  		return // Already exited, no need to poll.
   689  	}
   690  	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   691  		t.Fatalf("Error during cleanup: %v", err)
   692  	}
   693  }

View as plain text