...

Source file src/k8s.io/kubernetes/pkg/kubelet/pleg/generic_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pleg

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pleg

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/google/go-cmp/cmp"
	"github.com/stretchr/testify/assert"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/component-base/metrics/testutil"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/utils/clock"
	testingclock "k8s.io/utils/clock/testing"
)

const (
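	// testContainerRuntimeType is the container runtime type stamped on the
	// IDs of the fake containers created in these tests.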
	testContainerRuntimeType = "fooRuntime"
	// largeChannelCap is a large enough capacity to hold all events in a single test.
	largeChannelCap = 100
)

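// TestGenericPLEG bundles a GenericPLEG under test with the fake runtime
// and fake clock that back it.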
type TestGenericPLEG struct {
	pleg    *GenericPLEG
	runtime *containertest.FakeRuntime
	clock   *testingclock.FakeClock
}

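// newTestGenericPLEG constructs a fixture whose event channel is large
// enough to hold all events generated in a single test. Tests typically
// drive it as follows:
//
//	testPleg := newTestGenericPLEG()
//	ch := testPleg.pleg.Watch()
//	testPleg.runtime.AllPodList = ... // seed the fake runtime
//	testPleg.pleg.Relist()
//	events := getEventsFromChannel(ch)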
func newTestGenericPLEG() *TestGenericPLEG {
	return newTestGenericPLEGWithChannelSize(largeChannelCap)
}

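// newTestGenericPLEGWithChannelSize constructs a fixture with the given
// event channel capacity; a small capacity lets tests exercise the behavior
// when the channel overflows.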
func newTestGenericPLEGWithChannelSize(eventChannelCap int) *TestGenericPLEG {
	fakeRuntime := &containertest.FakeRuntime{}
	clock := testingclock.NewFakeClock(time.Time{})
	// The caller chooses the channel capacity; it should be large enough to
	// hold all events in a single test unless overflow is being tested.
	pleg := &GenericPLEG{
		relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 3 * time.Minute},
		runtime:        fakeRuntime,
		eventChannel:   make(chan *PodLifecycleEvent, eventChannelCap),
		podRecords:     make(podRecords),
		clock:          clock,
	}
	return &TestGenericPLEG{pleg: pleg, runtime: fakeRuntime, clock: clock}
}

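// getEventsFromChannel drains and returns all events currently buffered in
// the channel without blocking.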
func getEventsFromChannel(ch <-chan *PodLifecycleEvent) []*PodLifecycleEvent {
	events := []*PodLifecycleEvent{}
	for len(ch) > 0 {
		e := <-ch
		events = append(events, e)
	}
	return events
}

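// createTestContainer returns a fake container with the given ID and state,
// typed as the test container runtime.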
func createTestContainer(ID string, state kubecontainer.State) *kubecontainer.Container {
	return &kubecontainer.Container{
		ID:    kubecontainer.ContainerID{Type: testContainerRuntimeType, ID: ID},
		State: state,
	}
}

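// sortableEvents implements sort.Interface, ordering events by pod ID and
// then by the container ID carried in Data, so that expected and actual
// event slices can be compared independent of emission order.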
type sortableEvents []*PodLifecycleEvent

func (a sortableEvents) Len() int      { return len(a) }
func (a sortableEvents) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a sortableEvents) Less(i, j int) bool {
	if a[i].ID != a[j].ID {
		return a[i].ID < a[j].ID
	}
	return a[i].Data.(string) < a[j].Data.(string)
}

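// verifyEvents sorts both event slices and fails the test with a diff if
// they are not deeply equal.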
func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
	sort.Sort(sortableEvents(expected))
	sort.Sort(sortableEvents(actual))
	if !reflect.DeepEqual(expected, actual) {
		t.Errorf("Actual events differ from the expected; diff:\n %v", cmp.Diff(expected, actual))
	}
}

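// TestRelisting verifies the events emitted by the first relist, by a
// second relist with no changes, and by a third relist after containers
// have changed state or disappeared.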
func TestRelisting(t *testing.T) {
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	// The first relist should report the state of each container seen for the first time.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
			},
		}},
	}
	pleg.Relist()
	// Every running or exited container is reported when seen for the first time.
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerStarted, Data: "c2"},
		{ID: "4567", Type: ContainerDied, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c1"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)

	// The second relist should not send out any event because no container has
	// changed.
	pleg.Relist()
	actual = getEventsFromChannel(ch)
	assert.True(t, len(actual) == 0, "no container has changed, event length should be 0")

	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c2", kubecontainer.ContainerStateExited),
				createTestContainer("c3", kubecontainer.ContainerStateRunning),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Containers: []*kubecontainer.Container{
				createTestContainer("c4", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	pleg.Relist()
	// Only report containers that transitioned to running or exited status.
	expected = []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerStarted, Data: "c3"},
		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
		{ID: "4567", Type: ContainerStarted, Data: "c4"},
	}

	actual = getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)
}

// TestEventChannelFull verifies that when the event channel is full, excess
// events are discarded.
func TestEventChannelFull(t *testing.T) {
	testPleg := newTestGenericPLEGWithChannelSize(4)
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	// The first relist should report the state of each container seen for the first time.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
			},
		}},
	}
	pleg.Relist()
	// Every running or exited container is reported when seen for the first time.
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerStarted, Data: "c2"},
		{ID: "4567", Type: ContainerDied, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c1"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)

	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c2", kubecontainer.ContainerStateExited),
				createTestContainer("c3", kubecontainer.ContainerStateRunning),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Containers: []*kubecontainer.Container{
				createTestContainer("c4", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	pleg.Relist()
	allEvents := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerStarted, Data: "c3"},
		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
		{ID: "4567", Type: ContainerStarted, Data: "c4"},
	}
	// The channel holds only 4 of these 5 events; the excess event is discarded.
	actual = getEventsFromChannel(ch)
	assert.True(t, len(actual) == 4, "channel length should be 4")
	assert.Subsetf(t, allEvents, actual, "actual events should be a subset of allEvents")
}

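// TestDetectingContainerDeaths verifies that containers (or whole pods)
// that disappear between relists are reported as died and/or removed.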
func TestDetectingContainerDeaths(t *testing.T) {
	// Vary the number of relists after the container started and before the
	// container died to account for the changes in pleg's internal states.
	testReportMissingContainers(t, 1)
	testReportMissingPods(t, 1)

	testReportMissingContainers(t, 3)
	testReportMissingPods(t, 3)
}

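// testReportMissingContainers checks that, after numRelists, a running
// container that disappears produces died and removed events, while an
// already-exited container that is garbage collected produces only a
// removed event.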
func testReportMissingContainers(t *testing.T, numRelists int) {
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateRunning),
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
				createTestContainer("c3", kubecontainer.ContainerStateExited),
			},
		}},
	}
	// Relist and drain the events from the channel.
	for i := 0; i < numRelists; i++ {
		pleg.Relist()
		getEventsFromChannel(ch)
	}

	// The running container c2 was stopped and removed between relists; both
	// its death and its removal should be reported. The exited container c3
	// was garbage collected (i.e., removed) between relists; it had already
	// died, so only its removal should be reported.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	pleg.Relist()
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerRemoved, Data: "c2"},
		{ID: "1234", Type: ContainerRemoved, Data: "c3"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)
}

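// testReportMissingPods checks that when a pod disappears entirely between
// relists, its running containers are reported as died and removed.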
func testReportMissingPods(t *testing.T, numRelists int) {
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	// Relist and drain the events from the channel.
	for i := 0; i < numRelists; i++ {
		pleg.Relist()
		getEventsFromChannel(ch)
	}

	// The pod disappeared entirely between relists; its running container c2
	// should be reported as died and removed.
	runtime.AllPodList = []*containertest.FakePod{}
	pleg.Relist()
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerRemoved, Data: "c2"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)
}

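// newTestGenericPLEGWithRuntimeMock constructs a GenericPLEG backed by a
// mock runtime, with a real clock and a pod cache so that relists populate
// cache entries.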
func newTestGenericPLEGWithRuntimeMock(runtimeMock kubecontainer.Runtime) *GenericPLEG {
	pleg := &GenericPLEG{
		relistDuration: &RelistDuration{RelistPeriod: time.Hour, RelistThreshold: 2 * time.Hour},
		runtime:        runtimeMock,
		eventChannel:   make(chan *PodLifecycleEvent, 1000),
		podRecords:     make(podRecords),
		cache:          kubecontainer.NewCache(),
		clock:          clock.RealClock{},
	}
	return pleg
}

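// createTestPodsStatusesAndEvents fabricates num running pods, each with a
// single container, along with the matching pod statuses and the
// ContainerStarted events that a first relist should produce.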
func createTestPodsStatusesAndEvents(num int) ([]*kubecontainer.Pod, []*kubecontainer.PodStatus, []*PodLifecycleEvent) {
	var pods []*kubecontainer.Pod
	var statuses []*kubecontainer.PodStatus
	var events []*PodLifecycleEvent
	for i := 0; i < num; i++ {
		id := types.UID(fmt.Sprintf("test-pod-%d", i))
		cState := kubecontainer.ContainerStateRunning
		container := createTestContainer(fmt.Sprintf("c%d", i), cState)
		pod := &kubecontainer.Pod{
			ID:         id,
			Containers: []*kubecontainer.Container{container},
		}
		status := &kubecontainer.PodStatus{
			ID:                id,
			ContainerStatuses: []*kubecontainer.Status{{ID: container.ID, State: cState}},
		}
		event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
		pods = append(pods, pod)
		statuses = append(statuses, status)
		events = append(events, event)
	}
	return pods, statuses, events
}

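// TestRelistWithCache verifies that relists write pod statuses into the
// cache, and that an inspection error suppresses events for the affected
// pod until a later relist succeeds.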
func TestRelistWithCache(t *testing.T) {
	ctx := context.Background()
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()
	runtimeMock := containertest.NewMockRuntime(mockCtrl)

	pleg := newTestGenericPLEGWithRuntimeMock(runtimeMock)
	ch := pleg.Watch()

	pods, statuses, events := createTestPodsStatusesAndEvents(2)
	runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).AnyTimes()
	runtimeMock.EXPECT().GetPodStatus(ctx, pods[0].ID, "", "").Return(statuses[0], nil).Times(1)
	// Inject an error when querying runtime for the pod status for pods[1].
	statusErr := fmt.Errorf("unable to get status")
	runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(&kubecontainer.PodStatus{}, statusErr).Times(1)

	pleg.Relist()
	actualEvents := getEventsFromChannel(ch)
	cases := []struct {
		pod    *kubecontainer.Pod
		status *kubecontainer.PodStatus
		error  error
	}{
		{pod: pods[0], status: statuses[0], error: nil},
		{pod: pods[1], status: &kubecontainer.PodStatus{}, error: statusErr},
	}
	for i, c := range cases {
		testStr := fmt.Sprintf("test[%d]", i)
		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
		assert.Equal(t, c.status, actualStatus, testStr)
		assert.Equal(t, c.error, actualErr, testStr)
	}
	// pleg should not generate any event for pods[1] because of the error.
	assert.Exactly(t, []*PodLifecycleEvent{events[0]}, actualEvents)

	// Return normal status for pods[1].
	runtimeMock.EXPECT().GetPodStatus(ctx, pods[1].ID, "", "").Return(statuses[1], nil).Times(1)
	pleg.Relist()
	actualEvents = getEventsFromChannel(ch)
	cases = []struct {
		pod    *kubecontainer.Pod
		status *kubecontainer.PodStatus
		error  error
	}{
		{pod: pods[0], status: statuses[0], error: nil},
		{pod: pods[1], status: statuses[1], error: nil},
	}
	for i, c := range cases {
		testStr := fmt.Sprintf("test[%d]", i)
		actualStatus, actualErr := pleg.cache.Get(c.pod.ID)
		assert.Equal(t, c.status, actualStatus, testStr)
		assert.Equal(t, c.error, actualErr, testStr)
	}
	// Now that we are able to query status for pods[1], pleg should generate an event.
	assert.Exactly(t, []*PodLifecycleEvent{events[1]}, actualEvents)
}

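// TestRemoveCacheEntry verifies that the cache entry of a pod deleted from
// the runtime is reset to a default status by the next relist.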
func TestRemoveCacheEntry(t *testing.T) {
	ctx := context.Background()
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()
	runtimeMock := containertest.NewMockRuntime(mockCtrl)
	pleg := newTestGenericPLEGWithRuntimeMock(runtimeMock)

	pods, statuses, _ := createTestPodsStatusesAndEvents(1)
	runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
	runtimeMock.EXPECT().GetPodStatus(ctx, pods[0].ID, "", "").Return(statuses[0], nil).Times(1)
	// Relist once to populate the cache.
	pleg.Relist()
	// Delete the pod from the runtime. Verify that the cache entry has been
	// removed after relisting.
	runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{}, nil).Times(1)
	pleg.Relist()
	actualStatus, actualErr := pleg.cache.Get(pods[0].ID)
	assert.Equal(t, &kubecontainer.PodStatus{ID: pods[0].ID}, actualStatus)
	assert.Equal(t, nil, actualErr)
}

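// TestHealthy verifies that Healthy fails before the first relist and
// whenever more than RelistThreshold has elapsed since the last relist.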
func TestHealthy(t *testing.T) {
	testPleg := newTestGenericPLEG()

	// pleg should initially be unhealthy.
	pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
	ok, _ := pleg.Healthy()
	assert.False(t, ok, "pleg should be unhealthy")

	// Advance the clock without any relisting.
	clock.Step(time.Minute * 10)
	ok, _ = pleg.Healthy()
	assert.False(t, ok, "pleg should be unhealthy")

	// Relist and then advance the time by 1 minute. pleg should be healthy
	// because this is within the allowed limit.
	pleg.Relist()
	clock.Step(time.Minute * 1)
	ok, _ = pleg.Healthy()
	assert.True(t, ok, "pleg should be healthy")

	// Advance by relistThreshold without any relisting. pleg should be unhealthy
	// because it has been longer than relistThreshold since a relist occurred.
	clock.Step(pleg.relistDuration.RelistThreshold)
	ok, _ = pleg.Healthy()
	assert.False(t, ok, "pleg should be unhealthy")
}

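// TestRelistWithReinspection verifies that a pod whose status inspection
// failed is reinspected on the next relist and the cache updated, without
// emitting spurious events.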
func TestRelistWithReinspection(t *testing.T) {
	ctx := context.Background()
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()
	runtimeMock := containertest.NewMockRuntime(mockCtrl)

	pleg := newTestGenericPLEGWithRuntimeMock(runtimeMock)
	ch := pleg.Watch()

	infraContainer := createTestContainer("infra", kubecontainer.ContainerStateRunning)

	podID := types.UID("test-pod")
	pods := []*kubecontainer.Pod{{
		ID:         podID,
		Containers: []*kubecontainer.Container{infraContainer},
	}}
	runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)

	goodStatus := &kubecontainer.PodStatus{
		ID:                podID,
		ContainerStatuses: []*kubecontainer.Status{{ID: infraContainer.ID, State: infraContainer.State}},
	}
	runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(goodStatus, nil).Times(1)

	goodEvent := &PodLifecycleEvent{ID: podID, Type: ContainerStarted, Data: infraContainer.ID.ID}

	// listing 1 - everything ok, infra container set up for pod
	pleg.Relist()
	actualEvents := getEventsFromChannel(ch)
	actualStatus, actualErr := pleg.cache.Get(podID)
	assert.Equal(t, goodStatus, actualStatus)
	assert.Equal(t, nil, actualErr)
	assert.Exactly(t, []*PodLifecycleEvent{goodEvent}, actualEvents)

	// listing 2 - pretend the runtime was in the middle of creating the non-infra container
	// for the pod and return an error during inspection
	transientContainer := createTestContainer("transient", kubecontainer.ContainerStateUnknown)
	podsWithTransientContainer := []*kubecontainer.Pod{{
		ID:         podID,
		Containers: []*kubecontainer.Container{infraContainer, transientContainer},
	}}
	runtimeMock.EXPECT().GetPods(ctx, true).Return(podsWithTransientContainer, nil).Times(1)

	badStatus := &kubecontainer.PodStatus{
		ID:                podID,
		ContainerStatuses: []*kubecontainer.Status{},
	}
	runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(badStatus, errors.New("inspection error")).Times(1)

	pleg.Relist()
	actualEvents = getEventsFromChannel(ch)
	actualStatus, actualErr = pleg.cache.Get(podID)
	assert.Equal(t, badStatus, actualStatus)
	assert.Equal(t, errors.New("inspection error"), actualErr)
	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)

	// listing 3 - pretend the transient container has now disappeared, leaving just the infra
	// container. Make sure the pod is reinspected for its status and the cache is updated.
	runtimeMock.EXPECT().GetPods(ctx, true).Return(pods, nil).Times(1)
	runtimeMock.EXPECT().GetPodStatus(ctx, podID, "", "").Return(goodStatus, nil).Times(1)

	pleg.Relist()
	actualEvents = getEventsFromChannel(ch)
	actualStatus, actualErr = pleg.cache.Get(podID)
	assert.Equal(t, goodStatus, actualStatus)
	assert.Equal(t, nil, actualErr)
	// No events are expected because relist #1 set the old pod record, which has the infra
	// container running. Relist #2 hit the inspection error and therefore didn't modify either
	// old or new. Relist #3 forced the reinspection of the pod to retrieve its status, but
	// because the list of containers was the same as in relist #1, nothing "changed", so there
	// are no new events.
	assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
}

// TestRelistingWithSandboxes verifies that sandbox state changes are detected
// and reported just like container state changes.
func TestRelistingWithSandboxes(t *testing.T) {
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	// The first relist should report the state of each sandbox seen for the first time.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
			},
		}},
	}
	pleg.Relist()
	// Every running or exited sandbox is reported when seen for the first time.
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerStarted, Data: "c2"},
		{ID: "4567", Type: ContainerDied, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c1"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)

	// The second relist should not send out any event because no sandbox has
	// changed.
	pleg.Relist()
	actual = getEventsFromChannel(ch)
	assert.True(t, len(actual) == 0, "no sandbox has changed, event length should be 0")

	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c2", kubecontainer.ContainerStateExited),
				createTestContainer("c3", kubecontainer.ContainerStateRunning),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c4", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	pleg.Relist()
	// Only report sandboxes that transitioned to running or exited status.
	expected = []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerStarted, Data: "c3"},
		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
		{ID: "4567", Type: ContainerStarted, Data: "c4"},
	}

	actual = getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)
}

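// TestRelistIPChange verifies that the pod IPs recorded while a pod was
// running are preserved in the cached status after its containers exit and
// the runtime no longer reports IPs.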
func TestRelistIPChange(t *testing.T) {
	ctx := context.Background()
	testCases := []struct {
		name   string
		podID  string
		podIPs []string
	}{
		{
			name:   "test-0",
			podID:  "test-pod-0",
			podIPs: []string{"192.168.1.5"},
		},
		{
			name:   "test-1",
			podID:  "test-pod-1",
			podIPs: []string{"192.168.1.5/24", "2000::"},
		},
	}

	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	for _, tc := range testCases {
		runtimeMock := containertest.NewMockRuntime(mockCtrl)

		pleg := newTestGenericPLEGWithRuntimeMock(runtimeMock)
		ch := pleg.Watch()

		id := types.UID(tc.podID)
		cState := kubecontainer.ContainerStateRunning
		container := createTestContainer("c0", cState)
		pod := &kubecontainer.Pod{
			ID:         id,
			Containers: []*kubecontainer.Container{container},
		}
		status := &kubecontainer.PodStatus{
			ID:                id,
			IPs:               tc.podIPs,
			ContainerStatuses: []*kubecontainer.Status{{ID: container.ID, State: cState}},
		}
		event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}

		runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
		runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)

		pleg.Relist()
		actualEvents := getEventsFromChannel(ch)
		actualStatus, actualErr := pleg.cache.Get(pod.ID)
		assert.Equal(t, status, actualStatus, tc.name)
		assert.Nil(t, actualErr, tc.name)
		assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)

		// Clear the IP address and mark the container terminated.
		container = createTestContainer("c0", kubecontainer.ContainerStateExited)
		pod = &kubecontainer.Pod{
			ID:         id,
			Containers: []*kubecontainer.Container{container},
		}
		status = &kubecontainer.PodStatus{
			ID:                id,
			ContainerStatuses: []*kubecontainer.Status{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
		}
		event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
		runtimeMock.EXPECT().GetPods(ctx, true).Return([]*kubecontainer.Pod{pod}, nil).Times(1)
		runtimeMock.EXPECT().GetPodStatus(ctx, pod.ID, "", "").Return(status, nil).Times(1)

		pleg.Relist()
		actualEvents = getEventsFromChannel(ch)
		actualStatus, actualErr = pleg.cache.Get(pod.ID)
		// Must copy status to compare since its pointer gets passed through all
		// the way to the event.
		statusCopy := *status
		statusCopy.IPs = tc.podIPs
		assert.Equal(t, &statusCopy, actualStatus, tc.name)
		assert.Nil(t, actualErr, tc.name)
		assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
	}
}

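// TestRunningPodAndContainerCount verifies the kubelet_running_containers
// and kubelet_running_pods gauges recorded during relist.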
func TestRunningPodAndContainerCount(t *testing.T) {
	metrics.Register()
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime

	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateRunning),
				createTestContainer("c2", kubecontainer.ContainerStateUnknown),
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
			},
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("s1", kubecontainer.ContainerStateRunning),
				createTestContainer("s2", kubecontainer.ContainerStateRunning),
				createTestContainer("s3", kubecontainer.ContainerStateUnknown),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Containers: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
			},
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("s1", kubecontainer.ContainerStateRunning),
				createTestContainer("s2", kubecontainer.ContainerStateExited),
			},
		}},
	}

	pleg.Relist()

	tests := []struct {
		name        string
		metricsName string
		wants       string
	}{
		{
			name:        "test container count",
			metricsName: "kubelet_running_containers",
			wants: `
# HELP kubelet_running_containers [ALPHA] Number of containers currently running
# TYPE kubelet_running_containers gauge
kubelet_running_containers{container_state="exited"} 1
kubelet_running_containers{container_state="running"} 1
kubelet_running_containers{container_state="unknown"} 2
`,
		},
		{
			name:        "test pod count",
			metricsName: "kubelet_running_pods",
			wants: `
# HELP kubelet_running_pods [ALPHA] Number of pods that have a running pod sandbox
# TYPE kubelet_running_pods gauge
kubelet_running_pods 2
`,
		},
	}

	for _, test := range tests {
		tc := test
		t.Run(tc.name, func(t *testing.T) {
			if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(tc.wants), tc.metricsName); err != nil {
				t.Fatal(err)
			}
		})
	}
}
   745  
