...

Source file src/k8s.io/kubernetes/pkg/kubelet/config/config_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/config

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"context"
    21  	"math/rand"
    22  	"reflect"
    23  	"sort"
    24  	"strconv"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	"k8s.io/client-go/kubernetes/scheme"
    35  	"k8s.io/client-go/tools/record"
    36  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    37  	"k8s.io/kubernetes/pkg/securitycontext"
    38  	"k8s.io/kubernetes/test/utils/ktesting"
    39  )
    40  
    41  const (
    42  	TestSource = "test"
    43  )
    44  
    45  func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
    46  	select {
    47  	case update := <-ch:
    48  		t.Errorf("Expected no update in channel, Got %v", update)
    49  	default:
    50  	}
    51  }
    52  
    53  type sortedPods []*v1.Pod
    54  
    55  func (s sortedPods) Len() int {
    56  	return len(s)
    57  }
    58  func (s sortedPods) Swap(i, j int) {
    59  	s[i], s[j] = s[j], s[i]
    60  }
    61  func (s sortedPods) Less(i, j int) bool {
    62  	return s[i].Namespace < s[j].Namespace
    63  }
    64  
    65  type mockPodStartupSLIObserver struct{}
    66  
    67  func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {}
    68  
    69  func CreateValidPod(name, namespace string) *v1.Pod {
    70  	return &v1.Pod{
    71  		ObjectMeta: metav1.ObjectMeta{
    72  			UID:       types.UID(name + namespace), // for the purpose of testing, this is unique enough
    73  			Name:      name,
    74  			Namespace: namespace,
    75  		},
    76  		Spec: v1.PodSpec{
    77  			RestartPolicy: v1.RestartPolicyAlways,
    78  			DNSPolicy:     v1.DNSClusterFirst,
    79  			Containers: []v1.Container{
    80  				{
    81  					Name:                     "ctr",
    82  					Image:                    "image",
    83  					ImagePullPolicy:          "IfNotPresent",
    84  					SecurityContext:          securitycontext.ValidSecurityContextWithContainerDefaults(),
    85  					TerminationMessagePolicy: v1.TerminationMessageReadFile,
    86  				},
    87  			},
    88  		},
    89  	}
    90  }
    91  
    92  func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) kubetypes.PodUpdate {
    93  	return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
    94  }
    95  
    96  func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
    97  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    98  	config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
    99  	channel := config.Channel(ctx, TestSource)
   100  	ch := config.Updates()
   101  	return channel, ch, config
   102  }
   103  
   104  func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...kubetypes.PodUpdate) {
   105  	for i := range expected {
   106  		update := <-ch
   107  		sort.Sort(sortedPods(update.Pods))
   108  		sort.Sort(sortedPods(expected[i].Pods))
   109  		// Make copies of the expected/actual update to compare all fields
   110  		// except for "Pods", which are compared separately below.
   111  		expectedCopy, updateCopy := expected[i], update
   112  		expectedCopy.Pods, updateCopy.Pods = nil, nil
   113  		if !apiequality.Semantic.DeepEqual(expectedCopy, updateCopy) {
   114  			t.Fatalf("Expected %#v, Got %#v", expectedCopy, updateCopy)
   115  		}
   116  
   117  		if len(expected[i].Pods) != len(update.Pods) {
   118  			t.Fatalf("Expected %#v, Got %#v", expected[i], update)
   119  		}
   120  		// Compare pods one by one. This is necessary because we don't want to
   121  		// compare local annotations.
   122  		for j := range expected[i].Pods {
   123  			if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) || !reflect.DeepEqual(expected[i].Pods[j].Status, update.Pods[j].Status) {
   124  				t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j])
   125  			}
   126  		}
   127  	}
   128  	expectNoPodUpdate(t, ch)
   129  }
   130  
   131  func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
   132  	select {
   133  	case update := <-ch:
   134  		t.Errorf("Expected no update in channel, Got %#v", update)
   135  	default:
   136  	}
   137  }
   138  
   139  func TestNewPodAdded(t *testing.T) {
   140  	tCtx := ktesting.Init(t)
   141  
   142  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   143  
   144  	// see an update
   145  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   146  	channel <- podUpdate
   147  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
   148  
   149  	config.Sync()
   150  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
   151  }
   152  
   153  func TestNewPodAddedInvalidNamespace(t *testing.T) {
   154  	tCtx := ktesting.Init(t)
   155  
   156  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   157  
   158  	// see an update
   159  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
   160  	channel <- podUpdate
   161  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "")))
   162  
   163  	config.Sync()
   164  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "")))
   165  }
   166  
   167  func TestNewPodAddedDefaultNamespace(t *testing.T) {
   168  	tCtx := ktesting.Init(t)
   169  
   170  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   171  
   172  	// see an update
   173  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
   174  	channel <- podUpdate
   175  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
   176  
   177  	config.Sync()
   178  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default")))
   179  }
   180  
   181  func TestNewPodAddedDifferentNamespaces(t *testing.T) {
   182  	tCtx := ktesting.Init(t)
   183  
   184  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   185  
   186  	// see an update
   187  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
   188  	channel <- podUpdate
   189  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
   190  
   191  	// see an update in another namespace
   192  	podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   193  	channel <- podUpdate
   194  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
   195  
   196  	config.Sync()
   197  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default"), CreateValidPod("foo", "new")))
   198  }
   199  
   200  func TestInvalidPodFiltered(t *testing.T) {
   201  	tCtx := ktesting.Init(t)
   202  
   203  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   204  
   205  	// see an update
   206  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   207  	channel <- podUpdate
   208  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
   209  
   210  	// add an invalid update, pod with the same name
   211  	podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   212  	channel <- podUpdate
   213  	expectNoPodUpdate(t, ch)
   214  }
   215  
   216  func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
   217  	tCtx := ktesting.Init(t)
   218  
   219  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshotAndUpdates)
   220  
   221  	// see an set
   222  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   223  	channel <- podUpdate
   224  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
   225  
   226  	config.Sync()
   227  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
   228  
   229  	// container updates are separated as UPDATE
   230  	pod := *podUpdate.Pods[0]
   231  	pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
   232  	channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
   233  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, &pod))
   234  }
   235  
   236  func TestNewPodAddedSnapshot(t *testing.T) {
   237  	tCtx := ktesting.Init(t)
   238  
   239  	channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshot)
   240  
   241  	// see an set
   242  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   243  	channel <- podUpdate
   244  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
   245  
   246  	config.Sync()
   247  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
   248  
   249  	// container updates are separated as UPDATE
   250  	pod := *podUpdate.Pods[0]
   251  	pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
   252  	channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
   253  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, &pod))
   254  }
   255  
   256  func TestNewPodAddedUpdatedRemoved(t *testing.T) {
   257  	tCtx := ktesting.Init(t)
   258  
   259  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   260  
   261  	// should register an add
   262  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   263  	channel <- podUpdate
   264  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
   265  
   266  	// should ignore ADDs that are identical
   267  	expectNoPodUpdate(t, ch)
   268  
   269  	// an kubetypes.ADD should be converted to kubetypes.UPDATE
   270  	pod := CreateValidPod("foo", "new")
   271  	pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
   272  	podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, pod)
   273  	channel <- podUpdate
   274  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   275  
   276  	podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
   277  	channel <- podUpdate
   278  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
   279  }
   280  
   281  func TestNewPodAddedDelete(t *testing.T) {
   282  	tCtx := ktesting.Init(t)
   283  
   284  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   285  
   286  	// should register an add
   287  	addedPod := CreateValidPod("foo", "new")
   288  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
   289  	channel <- podUpdate
   290  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))
   291  
   292  	// mark this pod as deleted
   293  	timestamp := metav1.NewTime(time.Now())
   294  	deletedPod := CreateValidPod("foo", "new")
   295  	deletedPod.ObjectMeta.DeletionTimestamp = &timestamp
   296  	podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
   297  	channel <- podUpdate
   298  	// the existing pod should be gracefully deleted
   299  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
   300  }
   301  
   302  func TestNewPodAddedUpdatedSet(t *testing.T) {
   303  	tCtx := ktesting.Init(t)
   304  
   305  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   306  
   307  	// should register an add
   308  	podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
   309  	channel <- podUpdate
   310  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")))
   311  
   312  	// should ignore ADDs that are identical
   313  	expectNoPodUpdate(t, ch)
   314  
   315  	// should be converted to an kubetypes.ADD, kubetypes.REMOVE, and kubetypes.UPDATE
   316  	pod := CreateValidPod("foo2", "new")
   317  	pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
   318  	podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new"))
   319  	channel <- podUpdate
   320  	expectPodUpdate(t, ch,
   321  		CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")),
   322  		CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo4", "new")),
   323  		CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   324  }
   325  
   326  func TestNewPodAddedSetReconciled(t *testing.T) {
   327  	tCtx := ktesting.Init(t)
   328  
   329  	// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
   330  	// before touching to avoid data race.
   331  	newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
   332  		pods := []*v1.Pod{
   333  			CreateValidPod("changeable-pod-0", "new"),
   334  			CreateValidPod("constant-pod-1", "new"),
   335  			CreateValidPod("constant-pod-2", "new"),
   336  		}
   337  		if touchStatus {
   338  			pods[0].Status = v1.PodStatus{Message: strconv.Itoa(rand.Int())}
   339  		}
   340  		if touchSpec {
   341  			pods[0].Spec.Containers[0].Name = strconv.Itoa(rand.Int())
   342  		}
   343  		return pods, pods[0]
   344  	}
   345  	for _, op := range []kubetypes.PodOperation{
   346  		kubetypes.ADD,
   347  		kubetypes.SET,
   348  	} {
   349  		var podWithStatusChange *v1.Pod
   350  		pods, _ := newTestPods(false, false)
   351  		channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   352  
   353  		// Use SET to initialize the config, especially initialize the source set
   354  		channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
   355  		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
   356  
   357  		// If status is not changed, no reconcile should be triggered
   358  		channel <- CreatePodUpdate(op, TestSource, pods...)
   359  		expectNoPodUpdate(t, ch)
   360  
   361  		// If the pod status is changed and not updated, a reconcile should be triggered
   362  		pods, podWithStatusChange = newTestPods(true, false)
   363  		channel <- CreatePodUpdate(op, TestSource, pods...)
   364  		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
   365  
   366  		// If the pod status is changed, but the pod is also updated, no reconcile should be triggered
   367  		pods, podWithStatusChange = newTestPods(true, true)
   368  		channel <- CreatePodUpdate(op, TestSource, pods...)
   369  		expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
   370  	}
   371  }
   372  
   373  func TestInitialEmptySet(t *testing.T) {
   374  	tCtx := ktesting.Init(t)
   375  
   376  	for _, test := range []struct {
   377  		mode PodConfigNotificationMode
   378  		op   kubetypes.PodOperation
   379  	}{
   380  		{PodConfigNotificationIncremental, kubetypes.ADD},
   381  		{PodConfigNotificationSnapshot, kubetypes.SET},
   382  		{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
   383  	} {
   384  		channel, ch, _ := createPodConfigTester(tCtx, test.mode)
   385  
   386  		// should register an empty PodUpdate operation
   387  		podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
   388  		channel <- podUpdate
   389  		expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource))
   390  
   391  		// should ignore following empty sets
   392  		podUpdate = CreatePodUpdate(kubetypes.SET, TestSource)
   393  		channel <- podUpdate
   394  		podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
   395  		channel <- podUpdate
   396  		expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new")))
   397  	}
   398  }
   399  
   400  func TestPodUpdateAnnotations(t *testing.T) {
   401  	tCtx := ktesting.Init(t)
   402  
   403  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   404  
   405  	pod := CreateValidPod("foo2", "new")
   406  	pod.Annotations = make(map[string]string)
   407  	pod.Annotations["kubernetes.io/blah"] = "blah"
   408  
   409  	clone := pod.DeepCopy()
   410  
   411  	podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new"))
   412  	channel <- podUpdate
   413  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")))
   414  
   415  	pod.Annotations["kubernetes.io/blah"] = "superblah"
   416  	podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
   417  	channel <- podUpdate
   418  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   419  
   420  	pod.Annotations["kubernetes.io/otherblah"] = "doh"
   421  	podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
   422  	channel <- podUpdate
   423  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   424  
   425  	delete(pod.Annotations, "kubernetes.io/blah")
   426  	podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
   427  	channel <- podUpdate
   428  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   429  }
   430  
   431  func TestPodUpdateLabels(t *testing.T) {
   432  	tCtx := ktesting.Init(t)
   433  
   434  	channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
   435  
   436  	pod := CreateValidPod("foo2", "new")
   437  	pod.Labels = make(map[string]string)
   438  	pod.Labels["key"] = "value"
   439  
   440  	clone := pod.DeepCopy()
   441  
   442  	podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, clone)
   443  	channel <- podUpdate
   444  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pod))
   445  
   446  	pod.Labels["key"] = "newValue"
   447  	podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod)
   448  	channel <- podUpdate
   449  	expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
   450  
   451  }
   452  
   453  func TestPodConfigRace(t *testing.T) {
   454  	tCtx := ktesting.Init(t)
   455  
   456  	eventBroadcaster := record.NewBroadcaster(record.WithContext(tCtx))
   457  	config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
   458  	seenSources := sets.NewString(TestSource)
   459  	var wg sync.WaitGroup
   460  	const iterations = 100
   461  	wg.Add(2)
   462  
   463  	go func() {
   464  		ctx, cancel := context.WithCancel(tCtx)
   465  		defer cancel()
   466  		defer wg.Done()
   467  		for i := 0; i < iterations; i++ {
   468  			config.Channel(ctx, strconv.Itoa(i))
   469  		}
   470  	}()
   471  	go func() {
   472  		defer wg.Done()
   473  		for i := 0; i < iterations; i++ {
   474  			config.SeenAllSources(seenSources)
   475  		}
   476  	}()
   477  
   478  	wg.Wait()
   479  }
   480  

View as plain text