1
16
17 package config
18
19 import (
20 "context"
21 "math/rand"
22 "reflect"
23 "sort"
24 "strconv"
25 "sync"
26 "testing"
27 "time"
28
29 v1 "k8s.io/api/core/v1"
30 apiequality "k8s.io/apimachinery/pkg/api/equality"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/client-go/kubernetes/scheme"
35 "k8s.io/client-go/tools/record"
36 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
37 "k8s.io/kubernetes/pkg/securitycontext"
38 "k8s.io/kubernetes/test/utils/ktesting"
39 )
40
41 const (
42 TestSource = "test"
43 )
44
45 func expectEmptyChannel(t *testing.T, ch <-chan interface{}) {
46 select {
47 case update := <-ch:
48 t.Errorf("Expected no update in channel, Got %v", update)
49 default:
50 }
51 }
52
53 type sortedPods []*v1.Pod
54
55 func (s sortedPods) Len() int {
56 return len(s)
57 }
58 func (s sortedPods) Swap(i, j int) {
59 s[i], s[j] = s[j], s[i]
60 }
61 func (s sortedPods) Less(i, j int) bool {
62 return s[i].Namespace < s[j].Namespace
63 }
64
65 type mockPodStartupSLIObserver struct{}
66
67 func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {}
68
69 func CreateValidPod(name, namespace string) *v1.Pod {
70 return &v1.Pod{
71 ObjectMeta: metav1.ObjectMeta{
72 UID: types.UID(name + namespace),
73 Name: name,
74 Namespace: namespace,
75 },
76 Spec: v1.PodSpec{
77 RestartPolicy: v1.RestartPolicyAlways,
78 DNSPolicy: v1.DNSClusterFirst,
79 Containers: []v1.Container{
80 {
81 Name: "ctr",
82 Image: "image",
83 ImagePullPolicy: "IfNotPresent",
84 SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
85 TerminationMessagePolicy: v1.TerminationMessageReadFile,
86 },
87 },
88 },
89 }
90 }
91
92 func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) kubetypes.PodUpdate {
93 return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
94 }
95
96 func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
97 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
98 config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
99 channel := config.Channel(ctx, TestSource)
100 ch := config.Updates()
101 return channel, ch, config
102 }
103
104 func expectPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate, expected ...kubetypes.PodUpdate) {
105 for i := range expected {
106 update := <-ch
107 sort.Sort(sortedPods(update.Pods))
108 sort.Sort(sortedPods(expected[i].Pods))
109
110
111 expectedCopy, updateCopy := expected[i], update
112 expectedCopy.Pods, updateCopy.Pods = nil, nil
113 if !apiequality.Semantic.DeepEqual(expectedCopy, updateCopy) {
114 t.Fatalf("Expected %#v, Got %#v", expectedCopy, updateCopy)
115 }
116
117 if len(expected[i].Pods) != len(update.Pods) {
118 t.Fatalf("Expected %#v, Got %#v", expected[i], update)
119 }
120
121
122 for j := range expected[i].Pods {
123 if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) || !reflect.DeepEqual(expected[i].Pods[j].Status, update.Pods[j].Status) {
124 t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j])
125 }
126 }
127 }
128 expectNoPodUpdate(t, ch)
129 }
130
131 func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
132 select {
133 case update := <-ch:
134 t.Errorf("Expected no update in channel, Got %#v", update)
135 default:
136 }
137 }
138
139 func TestNewPodAdded(t *testing.T) {
140 tCtx := ktesting.Init(t)
141
142 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
143
144
145 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
146 channel <- podUpdate
147 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
148
149 config.Sync()
150 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
151 }
152
153 func TestNewPodAddedInvalidNamespace(t *testing.T) {
154 tCtx := ktesting.Init(t)
155
156 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
157
158
159 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
160 channel <- podUpdate
161 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "")))
162
163 config.Sync()
164 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "")))
165 }
166
167 func TestNewPodAddedDefaultNamespace(t *testing.T) {
168 tCtx := ktesting.Init(t)
169
170 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
171
172
173 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
174 channel <- podUpdate
175 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
176
177 config.Sync()
178 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default")))
179 }
180
181 func TestNewPodAddedDifferentNamespaces(t *testing.T) {
182 tCtx := ktesting.Init(t)
183
184 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
185
186
187 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
188 channel <- podUpdate
189 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")))
190
191
192 podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
193 channel <- podUpdate
194 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
195
196 config.Sync()
197 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "default"), CreateValidPod("foo", "new")))
198 }
199
200 func TestInvalidPodFiltered(t *testing.T) {
201 tCtx := ktesting.Init(t)
202
203 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
204
205
206 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
207 channel <- podUpdate
208 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
209
210
211 podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
212 channel <- podUpdate
213 expectNoPodUpdate(t, ch)
214 }
215
216 func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
217 tCtx := ktesting.Init(t)
218
219 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshotAndUpdates)
220
221
222 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
223 channel <- podUpdate
224 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
225
226 config.Sync()
227 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
228
229
230 pod := *podUpdate.Pods[0]
231 pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
232 channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
233 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, &pod))
234 }
235
236 func TestNewPodAddedSnapshot(t *testing.T) {
237 tCtx := ktesting.Init(t)
238
239 channel, ch, config := createPodConfigTester(tCtx, PodConfigNotificationSnapshot)
240
241
242 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
243 channel <- podUpdate
244 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo", "new")))
245
246 config.Sync()
247 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, kubetypes.AllSource, CreateValidPod("foo", "new")))
248
249
250 pod := *podUpdate.Pods[0]
251 pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
252 channel <- CreatePodUpdate(kubetypes.ADD, TestSource, &pod)
253 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.SET, TestSource, &pod))
254 }
255
256 func TestNewPodAddedUpdatedRemoved(t *testing.T) {
257 tCtx := ktesting.Init(t)
258
259 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
260
261
262 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
263 channel <- podUpdate
264 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")))
265
266
267 expectNoPodUpdate(t, ch)
268
269
270 pod := CreateValidPod("foo", "new")
271 pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
272 podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, pod)
273 channel <- podUpdate
274 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
275
276 podUpdate = CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new"))
277 channel <- podUpdate
278 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
279 }
280
281 func TestNewPodAddedDelete(t *testing.T) {
282 tCtx := ktesting.Init(t)
283
284 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
285
286
287 addedPod := CreateValidPod("foo", "new")
288 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
289 channel <- podUpdate
290 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))
291
292
293 timestamp := metav1.NewTime(time.Now())
294 deletedPod := CreateValidPod("foo", "new")
295 deletedPod.ObjectMeta.DeletionTimestamp = ×tamp
296 podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
297 channel <- podUpdate
298
299 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
300 }
301
302 func TestNewPodAddedUpdatedSet(t *testing.T) {
303 tCtx := ktesting.Init(t)
304
305 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
306
307
308 podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
309 channel <- podUpdate
310 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")))
311
312
313 expectNoPodUpdate(t, ch)
314
315
316 pod := CreateValidPod("foo2", "new")
317 pod.Spec.Containers = []v1.Container{{Name: "bar", Image: "test", ImagePullPolicy: v1.PullIfNotPresent, TerminationMessagePolicy: v1.TerminationMessageReadFile}}
318 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new"))
319 channel <- podUpdate
320 expectPodUpdate(t, ch,
321 CreatePodUpdate(kubetypes.REMOVE, TestSource, CreateValidPod("foo", "new")),
322 CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo4", "new")),
323 CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
324 }
325
326 func TestNewPodAddedSetReconciled(t *testing.T) {
327 tCtx := ktesting.Init(t)
328
329
330
331 newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
332 pods := []*v1.Pod{
333 CreateValidPod("changeable-pod-0", "new"),
334 CreateValidPod("constant-pod-1", "new"),
335 CreateValidPod("constant-pod-2", "new"),
336 }
337 if touchStatus {
338 pods[0].Status = v1.PodStatus{Message: strconv.Itoa(rand.Int())}
339 }
340 if touchSpec {
341 pods[0].Spec.Containers[0].Name = strconv.Itoa(rand.Int())
342 }
343 return pods, pods[0]
344 }
345 for _, op := range []kubetypes.PodOperation{
346 kubetypes.ADD,
347 kubetypes.SET,
348 } {
349 var podWithStatusChange *v1.Pod
350 pods, _ := newTestPods(false, false)
351 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
352
353
354 channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
355 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pods...))
356
357
358 channel <- CreatePodUpdate(op, TestSource, pods...)
359 expectNoPodUpdate(t, ch)
360
361
362 pods, podWithStatusChange = newTestPods(true, false)
363 channel <- CreatePodUpdate(op, TestSource, pods...)
364 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RECONCILE, TestSource, podWithStatusChange))
365
366
367 pods, podWithStatusChange = newTestPods(true, true)
368 channel <- CreatePodUpdate(op, TestSource, pods...)
369 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, podWithStatusChange))
370 }
371 }
372
373 func TestInitialEmptySet(t *testing.T) {
374 tCtx := ktesting.Init(t)
375
376 for _, test := range []struct {
377 mode PodConfigNotificationMode
378 op kubetypes.PodOperation
379 }{
380 {PodConfigNotificationIncremental, kubetypes.ADD},
381 {PodConfigNotificationSnapshot, kubetypes.SET},
382 {PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
383 } {
384 channel, ch, _ := createPodConfigTester(tCtx, test.mode)
385
386
387 podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
388 channel <- podUpdate
389 expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource))
390
391
392 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource)
393 channel <- podUpdate
394 podUpdate = CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
395 channel <- podUpdate
396 expectPodUpdate(t, ch, CreatePodUpdate(test.op, TestSource, CreateValidPod("foo", "new")))
397 }
398 }
399
400 func TestPodUpdateAnnotations(t *testing.T) {
401 tCtx := ktesting.Init(t)
402
403 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
404
405 pod := CreateValidPod("foo2", "new")
406 pod.Annotations = make(map[string]string)
407 pod.Annotations["kubernetes.io/blah"] = "blah"
408
409 clone := pod.DeepCopy()
410
411 podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), clone, CreateValidPod("foo3", "new"))
412 channel <- podUpdate
413 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")))
414
415 pod.Annotations["kubernetes.io/blah"] = "superblah"
416 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
417 channel <- podUpdate
418 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
419
420 pod.Annotations["kubernetes.io/otherblah"] = "doh"
421 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
422 channel <- podUpdate
423 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
424
425 delete(pod.Annotations, "kubernetes.io/blah")
426 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))
427 channel <- podUpdate
428 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
429 }
430
431 func TestPodUpdateLabels(t *testing.T) {
432 tCtx := ktesting.Init(t)
433
434 channel, ch, _ := createPodConfigTester(tCtx, PodConfigNotificationIncremental)
435
436 pod := CreateValidPod("foo2", "new")
437 pod.Labels = make(map[string]string)
438 pod.Labels["key"] = "value"
439
440 clone := pod.DeepCopy()
441
442 podUpdate := CreatePodUpdate(kubetypes.SET, TestSource, clone)
443 channel <- podUpdate
444 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, pod))
445
446 pod.Labels["key"] = "newValue"
447 podUpdate = CreatePodUpdate(kubetypes.SET, TestSource, pod)
448 channel <- podUpdate
449 expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
450
451 }
452
453 func TestPodConfigRace(t *testing.T) {
454 tCtx := ktesting.Init(t)
455
456 eventBroadcaster := record.NewBroadcaster(record.WithContext(tCtx))
457 config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
458 seenSources := sets.NewString(TestSource)
459 var wg sync.WaitGroup
460 const iterations = 100
461 wg.Add(2)
462
463 go func() {
464 ctx, cancel := context.WithCancel(tCtx)
465 defer cancel()
466 defer wg.Done()
467 for i := 0; i < iterations; i++ {
468 config.Channel(ctx, strconv.Itoa(i))
469 }
470 }()
471 go func() {
472 defer wg.Done()
473 for i := 0; i < iterations; i++ {
474 config.SeenAllSources(seenSources)
475 }
476 }()
477
478 wg.Wait()
479 }
480
View as plain text