...

Source file src/k8s.io/kubernetes/pkg/kubelet/prober/prober_manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/prober

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package prober
    18  
    19  import (
    20  	"fmt"
    21  	"strconv"
    22  	"testing"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    31  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    32  	"k8s.io/kubernetes/pkg/features"
    33  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    34  	"k8s.io/kubernetes/pkg/kubelet/prober/results"
    35  	"k8s.io/kubernetes/pkg/probe"
    36  )
    37  
    38  func init() {
    39  }
    40  
    41  var defaultProbe = &v1.Probe{
    42  	ProbeHandler: v1.ProbeHandler{
    43  		Exec: &v1.ExecAction{},
    44  	},
    45  	TimeoutSeconds:   1,
    46  	PeriodSeconds:    1,
    47  	SuccessThreshold: 1,
    48  	FailureThreshold: 3,
    49  }
    50  
    51  func TestAddRemovePods(t *testing.T) {
    52  	noProbePod := v1.Pod{
    53  		ObjectMeta: metav1.ObjectMeta{
    54  			UID: "no_probe_pod",
    55  		},
    56  		Spec: v1.PodSpec{
    57  			Containers: []v1.Container{{
    58  				Name: "no_probe1",
    59  			}, {
    60  				Name: "no_probe2",
    61  			}, {
    62  				Name: "no_probe3",
    63  			}},
    64  		},
    65  	}
    66  
    67  	probePod := v1.Pod{
    68  		ObjectMeta: metav1.ObjectMeta{
    69  			UID: "probe_pod",
    70  		},
    71  		Spec: v1.PodSpec{
    72  			Containers: []v1.Container{{
    73  				Name: "probe1",
    74  			}, {
    75  				Name:           "readiness",
    76  				ReadinessProbe: defaultProbe,
    77  			}, {
    78  				Name: "probe2",
    79  			}, {
    80  				Name:          "liveness",
    81  				LivenessProbe: defaultProbe,
    82  			}, {
    83  				Name: "probe3",
    84  			}, {
    85  				Name:         "startup",
    86  				StartupProbe: defaultProbe,
    87  			}},
    88  		},
    89  	}
    90  
    91  	m := newTestManager()
    92  	defer cleanup(t, m)
    93  	if err := expectProbes(m, nil); err != nil {
    94  		t.Error(err)
    95  	}
    96  
    97  	// Adding a pod with no probes should be a no-op.
    98  	m.AddPod(&noProbePod)
    99  	if err := expectProbes(m, nil); err != nil {
   100  		t.Error(err)
   101  	}
   102  
   103  	// Adding a pod with probes.
   104  	m.AddPod(&probePod)
   105  	probePaths := []probeKey{
   106  		{"probe_pod", "readiness", readiness},
   107  		{"probe_pod", "liveness", liveness},
   108  		{"probe_pod", "startup", startup},
   109  	}
   110  	if err := expectProbes(m, probePaths); err != nil {
   111  		t.Error(err)
   112  	}
   113  
   114  	// Removing un-probed pod.
   115  	m.RemovePod(&noProbePod)
   116  	if err := expectProbes(m, probePaths); err != nil {
   117  		t.Error(err)
   118  	}
   119  
   120  	// Removing probed pod.
   121  	m.RemovePod(&probePod)
   122  	if err := waitForWorkerExit(t, m, probePaths); err != nil {
   123  		t.Fatal(err)
   124  	}
   125  	if err := expectProbes(m, nil); err != nil {
   126  		t.Error(err)
   127  	}
   128  
   129  	// Removing already removed pods should be a no-op.
   130  	m.RemovePod(&probePod)
   131  	if err := expectProbes(m, nil); err != nil {
   132  		t.Error(err)
   133  	}
   134  }
   135  
   136  func TestAddRemovePodsWithRestartableInitContainer(t *testing.T) {
   137  	m := newTestManager()
   138  	defer cleanup(t, m)
   139  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, true)()
   140  	if err := expectProbes(m, nil); err != nil {
   141  		t.Error(err)
   142  	}
   143  
   144  	testCases := []struct {
   145  		desc                    string
   146  		probePaths              []probeKey
   147  		enableSidecarContainers bool
   148  	}{
   149  		{
   150  			desc:                    "pod with sidecar (no sidecar containers feature enabled)",
   151  			probePaths:              nil,
   152  			enableSidecarContainers: false,
   153  		},
   154  		{
   155  			desc: "pod with sidecar (sidecar containers feature enabled)",
   156  			probePaths: []probeKey{
   157  				{"restartable_init_container_pod", "restartable-init", liveness},
   158  				{"restartable_init_container_pod", "restartable-init", readiness},
   159  				{"restartable_init_container_pod", "restartable-init", startup},
   160  			},
   161  			enableSidecarContainers: true,
   162  		},
   163  	}
   164  
   165  	containerRestartPolicy := func(enableSidecarContainers bool) *v1.ContainerRestartPolicy {
   166  		if !enableSidecarContainers {
   167  			return nil
   168  		}
   169  		restartPolicy := v1.ContainerRestartPolicyAlways
   170  		return &restartPolicy
   171  	}
   172  
   173  	for _, tc := range testCases {
   174  		t.Run(tc.desc, func(t *testing.T) {
   175  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, tc.enableSidecarContainers)()
   176  
   177  			probePod := v1.Pod{
   178  				ObjectMeta: metav1.ObjectMeta{
   179  					UID: "restartable_init_container_pod",
   180  				},
   181  				Spec: v1.PodSpec{
   182  					InitContainers: []v1.Container{{
   183  						Name: "init",
   184  					}, {
   185  						Name:           "restartable-init",
   186  						LivenessProbe:  defaultProbe,
   187  						ReadinessProbe: defaultProbe,
   188  						StartupProbe:   defaultProbe,
   189  						RestartPolicy:  containerRestartPolicy(tc.enableSidecarContainers),
   190  					}},
   191  					Containers: []v1.Container{{
   192  						Name: "main",
   193  					}},
   194  				},
   195  			}
   196  
   197  			// Adding a pod with probes.
   198  			m.AddPod(&probePod)
   199  			if err := expectProbes(m, tc.probePaths); err != nil {
   200  				t.Error(err)
   201  			}
   202  
   203  			// Removing probed pod.
   204  			m.RemovePod(&probePod)
   205  			if err := waitForWorkerExit(t, m, tc.probePaths); err != nil {
   206  				t.Fatal(err)
   207  			}
   208  			if err := expectProbes(m, nil); err != nil {
   209  				t.Error(err)
   210  			}
   211  
   212  			// Removing already removed pods should be a no-op.
   213  			m.RemovePod(&probePod)
   214  			if err := expectProbes(m, nil); err != nil {
   215  				t.Error(err)
   216  			}
   217  		})
   218  	}
   219  }
   220  
   221  func TestCleanupPods(t *testing.T) {
   222  	m := newTestManager()
   223  	defer cleanup(t, m)
   224  	podToCleanup := v1.Pod{
   225  		ObjectMeta: metav1.ObjectMeta{
   226  			UID: "pod_cleanup",
   227  		},
   228  		Spec: v1.PodSpec{
   229  			Containers: []v1.Container{{
   230  				Name:           "prober1",
   231  				ReadinessProbe: defaultProbe,
   232  			}, {
   233  				Name:          "prober2",
   234  				LivenessProbe: defaultProbe,
   235  			}, {
   236  				Name:         "prober3",
   237  				StartupProbe: defaultProbe,
   238  			}},
   239  		},
   240  	}
   241  	podToKeep := v1.Pod{
   242  		ObjectMeta: metav1.ObjectMeta{
   243  			UID: "pod_keep",
   244  		},
   245  		Spec: v1.PodSpec{
   246  			Containers: []v1.Container{{
   247  				Name:           "prober1",
   248  				ReadinessProbe: defaultProbe,
   249  			}, {
   250  				Name:          "prober2",
   251  				LivenessProbe: defaultProbe,
   252  			}, {
   253  				Name:         "prober3",
   254  				StartupProbe: defaultProbe,
   255  			}},
   256  		},
   257  	}
   258  	m.AddPod(&podToCleanup)
   259  	m.AddPod(&podToKeep)
   260  
   261  	desiredPods := map[types.UID]sets.Empty{}
   262  	desiredPods[podToKeep.UID] = sets.Empty{}
   263  	m.CleanupPods(desiredPods)
   264  
   265  	removedProbes := []probeKey{
   266  		{"pod_cleanup", "prober1", readiness},
   267  		{"pod_cleanup", "prober2", liveness},
   268  		{"pod_cleanup", "prober3", startup},
   269  	}
   270  	expectedProbes := []probeKey{
   271  		{"pod_keep", "prober1", readiness},
   272  		{"pod_keep", "prober2", liveness},
   273  		{"pod_keep", "prober3", startup},
   274  	}
   275  	if err := waitForWorkerExit(t, m, removedProbes); err != nil {
   276  		t.Fatal(err)
   277  	}
   278  	if err := expectProbes(m, expectedProbes); err != nil {
   279  		t.Error(err)
   280  	}
   281  }
   282  
   283  func TestCleanupRepeated(t *testing.T) {
   284  	m := newTestManager()
   285  	defer cleanup(t, m)
   286  	podTemplate := v1.Pod{
   287  		Spec: v1.PodSpec{
   288  			Containers: []v1.Container{{
   289  				Name:           "prober1",
   290  				ReadinessProbe: defaultProbe,
   291  				LivenessProbe:  defaultProbe,
   292  				StartupProbe:   defaultProbe,
   293  			}},
   294  		},
   295  	}
   296  
   297  	const numTestPods = 100
   298  	for i := 0; i < numTestPods; i++ {
   299  		pod := podTemplate
   300  		pod.UID = types.UID(strconv.Itoa(i))
   301  		m.AddPod(&pod)
   302  	}
   303  
   304  	for i := 0; i < 10; i++ {
   305  		m.CleanupPods(map[types.UID]sets.Empty{})
   306  	}
   307  }
   308  
   309  func TestUpdatePodStatus(t *testing.T) {
   310  	unprobed := v1.ContainerStatus{
   311  		Name:        "unprobed_container",
   312  		ContainerID: "test://unprobed_container_id",
   313  		State: v1.ContainerState{
   314  			Running: &v1.ContainerStateRunning{},
   315  		},
   316  	}
   317  	probedReady := v1.ContainerStatus{
   318  		Name:        "probed_container_ready",
   319  		ContainerID: "test://probed_container_ready_id",
   320  		State: v1.ContainerState{
   321  			Running: &v1.ContainerStateRunning{},
   322  		},
   323  	}
   324  	probedPending := v1.ContainerStatus{
   325  		Name:        "probed_container_pending",
   326  		ContainerID: "test://probed_container_pending_id",
   327  		State: v1.ContainerState{
   328  			Running: &v1.ContainerStateRunning{},
   329  		},
   330  	}
   331  	probedUnready := v1.ContainerStatus{
   332  		Name:        "probed_container_unready",
   333  		ContainerID: "test://probed_container_unready_id",
   334  		State: v1.ContainerState{
   335  			Running: &v1.ContainerStateRunning{},
   336  		},
   337  	}
   338  	notStartedNoReadiness := v1.ContainerStatus{
   339  		Name:        "not_started_container_no_readiness",
   340  		ContainerID: "test://not_started_container_no_readiness_id",
   341  		State: v1.ContainerState{
   342  			Running: &v1.ContainerStateRunning{},
   343  		},
   344  	}
   345  	startedNoReadiness := v1.ContainerStatus{
   346  		Name:        "started_container_no_readiness",
   347  		ContainerID: "test://started_container_no_readiness_id",
   348  		State: v1.ContainerState{
   349  			Running: &v1.ContainerStateRunning{},
   350  		},
   351  	}
   352  	terminated := v1.ContainerStatus{
   353  		Name:        "terminated_container",
   354  		ContainerID: "test://terminated_container_id",
   355  		State: v1.ContainerState{
   356  			Terminated: &v1.ContainerStateTerminated{},
   357  		},
   358  	}
   359  	podStatus := v1.PodStatus{
   360  		Phase: v1.PodRunning,
   361  		ContainerStatuses: []v1.ContainerStatus{
   362  			unprobed, probedReady, probedPending, probedUnready, notStartedNoReadiness, startedNoReadiness, terminated,
   363  		},
   364  	}
   365  
   366  	m := newTestManager()
   367  	// no cleanup: using fake workers.
   368  
   369  	// Setup probe "workers" and cached results.
   370  	m.workers = map[probeKey]*worker{
   371  		{testPodUID, unprobed.Name, liveness}:             {},
   372  		{testPodUID, probedReady.Name, readiness}:         {},
   373  		{testPodUID, probedPending.Name, readiness}:       {},
   374  		{testPodUID, probedUnready.Name, readiness}:       {},
   375  		{testPodUID, notStartedNoReadiness.Name, startup}: {},
   376  		{testPodUID, startedNoReadiness.Name, startup}:    {},
   377  		{testPodUID, terminated.Name, readiness}:          {},
   378  	}
   379  	m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, &v1.Pod{})
   380  	m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, &v1.Pod{})
   381  	m.startupManager.Set(kubecontainer.ParseContainerID(startedNoReadiness.ContainerID), results.Success, &v1.Pod{})
   382  	m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, &v1.Pod{})
   383  
   384  	m.UpdatePodStatus(&v1.Pod{
   385  		ObjectMeta: metav1.ObjectMeta{
   386  			UID: testPodUID,
   387  		},
   388  		Spec: v1.PodSpec{
   389  			Containers: []v1.Container{
   390  				{Name: unprobed.Name},
   391  				{Name: probedReady.Name},
   392  				{Name: probedPending.Name},
   393  				{Name: probedUnready.Name},
   394  				{Name: notStartedNoReadiness.Name},
   395  				{Name: startedNoReadiness.Name},
   396  				{Name: terminated.Name},
   397  			},
   398  		},
   399  	}, &podStatus)
   400  
   401  	expectedReadiness := map[probeKey]bool{
   402  		{testPodUID, unprobed.Name, readiness}:              true,
   403  		{testPodUID, probedReady.Name, readiness}:           true,
   404  		{testPodUID, probedPending.Name, readiness}:         false,
   405  		{testPodUID, probedUnready.Name, readiness}:         false,
   406  		{testPodUID, notStartedNoReadiness.Name, readiness}: false,
   407  		{testPodUID, startedNoReadiness.Name, readiness}:    true,
   408  		{testPodUID, terminated.Name, readiness}:            false,
   409  	}
   410  	for _, c := range podStatus.ContainerStatuses {
   411  		expected, ok := expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
   412  		if !ok {
   413  			t.Fatalf("Missing expectation for test case: %v", c.Name)
   414  		}
   415  		if expected != c.Ready {
   416  			t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
   417  				c.Name, expected, c.Ready)
   418  		}
   419  	}
   420  }
   421  
   422  func TestUpdatePodStatusWithInitContainers(t *testing.T) {
   423  	notStarted := v1.ContainerStatus{
   424  		Name:        "not_started_container",
   425  		ContainerID: "test://not_started_container_id",
   426  		State: v1.ContainerState{
   427  			Running: &v1.ContainerStateRunning{},
   428  		},
   429  	}
   430  	started := v1.ContainerStatus{
   431  		Name:        "started_container",
   432  		ContainerID: "test://started_container_id",
   433  		State: v1.ContainerState{
   434  			Running: &v1.ContainerStateRunning{},
   435  		},
   436  	}
   437  	terminated := v1.ContainerStatus{
   438  		Name:        "terminated_container",
   439  		ContainerID: "test://terminated_container_id",
   440  		State: v1.ContainerState{
   441  			Terminated: &v1.ContainerStateTerminated{},
   442  		},
   443  	}
   444  
   445  	m := newTestManager()
   446  	// no cleanup: using fake workers.
   447  
   448  	// Setup probe "workers" and cached results.
   449  	m.workers = map[probeKey]*worker{
   450  		{testPodUID, notStarted.Name, startup}: {},
   451  		{testPodUID, started.Name, startup}:    {},
   452  	}
   453  	m.startupManager.Set(kubecontainer.ParseContainerID(started.ContainerID), results.Success, &v1.Pod{})
   454  
   455  	testCases := []struct {
   456  		desc                    string
   457  		expectedStartup         map[probeKey]bool
   458  		expectedReadiness       map[probeKey]bool
   459  		enableSidecarContainers bool
   460  	}{
   461  		{
   462  			desc: "init containers",
   463  			expectedStartup: map[probeKey]bool{
   464  				{testPodUID, notStarted.Name, startup}: false,
   465  				{testPodUID, started.Name, startup}:    true,
   466  				{testPodUID, terminated.Name, startup}: false,
   467  			},
   468  			expectedReadiness: map[probeKey]bool{
   469  				{testPodUID, notStarted.Name, readiness}: false,
   470  				{testPodUID, started.Name, readiness}:    false,
   471  				{testPodUID, terminated.Name, readiness}: true,
   472  			},
   473  			enableSidecarContainers: false,
   474  		},
   475  		{
   476  			desc: "init container with SidecarContainers feature",
   477  			expectedStartup: map[probeKey]bool{
   478  				{testPodUID, notStarted.Name, startup}: false,
   479  				{testPodUID, started.Name, startup}:    true,
   480  				{testPodUID, terminated.Name, startup}: false,
   481  			},
   482  			expectedReadiness: map[probeKey]bool{
   483  				{testPodUID, notStarted.Name, readiness}: false,
   484  				{testPodUID, started.Name, readiness}:    true,
   485  				{testPodUID, terminated.Name, readiness}: false,
   486  			},
   487  			enableSidecarContainers: true,
   488  		},
   489  	}
   490  
   491  	containerRestartPolicy := func(enableSidecarContainers bool) *v1.ContainerRestartPolicy {
   492  		if !enableSidecarContainers {
   493  			return nil
   494  		}
   495  		restartPolicy := v1.ContainerRestartPolicyAlways
   496  		return &restartPolicy
   497  	}
   498  
   499  	for _, tc := range testCases {
   500  		t.Run(tc.desc, func(t *testing.T) {
   501  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SidecarContainers, tc.enableSidecarContainers)()
   502  			podStatus := v1.PodStatus{
   503  				Phase: v1.PodRunning,
   504  				InitContainerStatuses: []v1.ContainerStatus{
   505  					notStarted, started, terminated,
   506  				},
   507  			}
   508  
   509  			m.UpdatePodStatus(&v1.Pod{
   510  				ObjectMeta: metav1.ObjectMeta{
   511  					UID: testPodUID,
   512  				},
   513  				Spec: v1.PodSpec{
   514  					InitContainers: []v1.Container{
   515  						{
   516  							Name:          notStarted.Name,
   517  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   518  						},
   519  						{
   520  							Name:          started.Name,
   521  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   522  						},
   523  						{
   524  							Name:          terminated.Name,
   525  							RestartPolicy: containerRestartPolicy(tc.enableSidecarContainers),
   526  						},
   527  					},
   528  				},
   529  			}, &podStatus)
   530  
   531  			for _, c := range podStatus.InitContainerStatuses {
   532  				{
   533  					expected, ok := tc.expectedStartup[probeKey{testPodUID, c.Name, startup}]
   534  					if !ok {
   535  						t.Fatalf("Missing expectation for test case: %v", c.Name)
   536  					}
   537  					if expected != *c.Started {
   538  						t.Errorf("Unexpected startup for container %v: Expected %v but got %v",
   539  							c.Name, expected, *c.Started)
   540  					}
   541  				}
   542  				{
   543  					expected, ok := tc.expectedReadiness[probeKey{testPodUID, c.Name, readiness}]
   544  					if !ok {
   545  						t.Fatalf("Missing expectation for test case: %v", c.Name)
   546  					}
   547  					if expected != c.Ready {
   548  						t.Errorf("Unexpected readiness for container %v: Expected %v but got %v",
   549  							c.Name, expected, c.Ready)
   550  					}
   551  				}
   552  			}
   553  		})
   554  	}
   555  }
   556  
   557  func (m *manager) extractedReadinessHandling() {
   558  	update := <-m.readinessManager.Updates()
   559  	// This code corresponds to an extract from kubelet.syncLoopIteration()
   560  	ready := update.Result == results.Success
   561  	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
   562  }
   563  
   564  func TestUpdateReadiness(t *testing.T) {
   565  	testPod := getTestPod()
   566  	setTestProbe(testPod, readiness, v1.Probe{})
   567  	m := newTestManager()
   568  	defer cleanup(t, m)
   569  
   570  	// Start syncing readiness without leaking goroutine.
   571  	stopCh := make(chan struct{})
   572  	go wait.Until(m.extractedReadinessHandling, 0, stopCh)
   573  	defer func() {
   574  		close(stopCh)
   575  		// Send an update to exit extractedReadinessHandling()
   576  		m.readinessManager.Set(kubecontainer.ContainerID{}, results.Success, &v1.Pod{})
   577  	}()
   578  
   579  	exec := syncExecProber{}
   580  	exec.set(probe.Success, nil)
   581  	m.prober.exec = &exec
   582  
   583  	m.statusManager.SetPodStatus(testPod, getTestRunningStatus())
   584  
   585  	m.AddPod(testPod)
   586  	probePaths := []probeKey{{testPodUID, testContainerName, readiness}}
   587  	if err := expectProbes(m, probePaths); err != nil {
   588  		t.Error(err)
   589  	}
   590  
   591  	// Wait for ready status.
   592  	if err := waitForReadyStatus(t, m, true); err != nil {
   593  		t.Error(err)
   594  	}
   595  
   596  	// Prober fails.
   597  	exec.set(probe.Failure, nil)
   598  
   599  	// Wait for failed status.
   600  	if err := waitForReadyStatus(t, m, false); err != nil {
   601  		t.Error(err)
   602  	}
   603  }
   604  
   605  func expectProbes(m *manager, expectedProbes []probeKey) error {
   606  	m.workerLock.RLock()
   607  	defer m.workerLock.RUnlock()
   608  
   609  	var unexpected []probeKey
   610  	missing := make([]probeKey, len(expectedProbes))
   611  	copy(missing, expectedProbes)
   612  
   613  outer:
   614  	for probePath := range m.workers {
   615  		for i, expectedPath := range missing {
   616  			if probePath == expectedPath {
   617  				missing = append(missing[:i], missing[i+1:]...)
   618  				continue outer
   619  			}
   620  		}
   621  		unexpected = append(unexpected, probePath)
   622  	}
   623  
   624  	if len(missing) == 0 && len(unexpected) == 0 {
   625  		return nil // Yay!
   626  	}
   627  
   628  	return fmt.Errorf("Unexpected probes: %v; Missing probes: %v;", unexpected, missing)
   629  }
   630  
   631  const interval = 1 * time.Second
   632  
   633  // Wait for the given workers to exit & clean up.
   634  func waitForWorkerExit(t *testing.T, m *manager, workerPaths []probeKey) error {
   635  	for _, w := range workerPaths {
   636  		condition := func() (bool, error) {
   637  			_, exists := m.getWorker(w.podUID, w.containerName, w.probeType)
   638  			return !exists, nil
   639  		}
   640  		if exited, _ := condition(); exited {
   641  			continue // Already exited, no need to poll.
   642  		}
   643  		t.Logf("Polling %v", w)
   644  		if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   645  			return err
   646  		}
   647  	}
   648  
   649  	return nil
   650  }
   651  
   652  // Wait for the given workers to exit & clean up.
   653  func waitForReadyStatus(t *testing.T, m *manager, ready bool) error {
   654  	condition := func() (bool, error) {
   655  		status, ok := m.statusManager.GetPodStatus(testPodUID)
   656  		if !ok {
   657  			return false, fmt.Errorf("status not found: %q", testPodUID)
   658  		}
   659  		if len(status.ContainerStatuses) != 1 {
   660  			return false, fmt.Errorf("expected single container, found %d", len(status.ContainerStatuses))
   661  		}
   662  		if status.ContainerStatuses[0].ContainerID != testContainerID.String() {
   663  			return false, fmt.Errorf("expected container %q, found %q",
   664  				testContainerID, status.ContainerStatuses[0].ContainerID)
   665  		}
   666  		return status.ContainerStatuses[0].Ready == ready, nil
   667  	}
   668  	t.Logf("Polling for ready state %v", ready)
   669  	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   670  		return err
   671  	}
   672  
   673  	return nil
   674  }
   675  
   676  // cleanup running probes to avoid leaking goroutines.
   677  func cleanup(t *testing.T, m *manager) {
   678  	m.CleanupPods(nil)
   679  
   680  	condition := func() (bool, error) {
   681  		workerCount := m.workerCount()
   682  		if workerCount > 0 {
   683  			t.Logf("Waiting for %d workers to exit...", workerCount)
   684  		}
   685  		return workerCount == 0, nil
   686  	}
   687  	if exited, _ := condition(); exited {
   688  		return // Already exited, no need to poll.
   689  	}
   690  	if err := wait.Poll(interval, wait.ForeverTestTimeout, condition); err != nil {
   691  		t.Fatalf("Error during cleanup: %v", err)
   692  	}
   693  }
   694  

View as plain text