1
16
17 package kubelet
18
19 import (
20 "context"
21 "reflect"
22 "strconv"
23 "sync"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/client-go/tools/record"
35 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
36 containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
37 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
38 "k8s.io/kubernetes/pkg/kubelet/util/queue"
39 "k8s.io/utils/clock"
40 clocktesting "k8s.io/utils/clock/testing"
41 )
42
43
44
// fakePodWorkers is a test double for the pod workers. UpdatePod runs the
// configured sync function synchronously, and the status queries are answered
// from the maps below.
type fakePodWorkers struct {
	// lock serializes UpdatePod calls.
	lock      sync.Mutex
	// syncPodFn is invoked by UpdatePod for every non-kill update.
	syncPodFn syncPodFnType
	// cache supplies the pod status handed to syncPodFn.
	cache     kubecontainer.Cache
	// t receives unexpected errors raised while handling an update.
	t         TestingInterface

	// triggeredDeletion collects the UIDs that received a SyncPodKill update.
	triggeredDeletion []types.UID
	// triggeredTerminal collects the UIDs whose sync reported a terminal pod.
	triggeredTerminal []types.UID

	// statusLock guards the lookup maps below, which back the query methods.
	statusLock            sync.Mutex
	running               map[types.UID]bool
	terminating           map[types.UID]bool
	terminated            map[types.UID]bool
	terminationRequested  map[types.UID]bool
	finished              map[types.UID]bool
	removeRuntime         map[types.UID]bool
	removeContent         map[types.UID]bool
	// terminatingStaticPods is keyed by pod full name rather than UID.
	terminatingStaticPods map[string]bool
}
64
65 func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
66 f.lock.Lock()
67 defer f.lock.Unlock()
68 var uid types.UID
69 switch {
70 case options.Pod != nil:
71 uid = options.Pod.UID
72 case options.RunningPod != nil:
73 uid = options.RunningPod.ID
74 default:
75 return
76 }
77 status, err := f.cache.Get(uid)
78 if err != nil {
79 f.t.Errorf("Unexpected error: %v", err)
80 }
81 switch options.UpdateType {
82 case kubetypes.SyncPodKill:
83 f.triggeredDeletion = append(f.triggeredDeletion, uid)
84 default:
85 isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status)
86 if err != nil {
87 f.t.Errorf("Unexpected error: %v", err)
88 }
89 if isTerminal {
90 f.triggeredTerminal = append(f.triggeredTerminal, uid)
91 }
92 }
93 }
94
95 func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
96 return map[types.UID]PodWorkerSync{}
97 }
98
99 func (f *fakePodWorkers) IsPodKnownTerminated(uid types.UID) bool {
100 f.statusLock.Lock()
101 defer f.statusLock.Unlock()
102 return f.terminated[uid]
103 }
104 func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool {
105 f.statusLock.Lock()
106 defer f.statusLock.Unlock()
107 return f.running[uid]
108 }
109 func (f *fakePodWorkers) ShouldPodBeFinished(uid types.UID) bool {
110 f.statusLock.Lock()
111 defer f.statusLock.Unlock()
112 return f.finished[uid]
113 }
114 func (f *fakePodWorkers) IsPodTerminationRequested(uid types.UID) bool {
115 f.statusLock.Lock()
116 defer f.statusLock.Unlock()
117 return f.terminationRequested[uid]
118 }
119 func (f *fakePodWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool {
120 f.statusLock.Lock()
121 defer f.statusLock.Unlock()
122 return f.terminating[uid]
123 }
124 func (f *fakePodWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool {
125 f.statusLock.Lock()
126 defer f.statusLock.Unlock()
127 return f.removeRuntime[uid]
128 }
129 func (f *fakePodWorkers) setPodRuntimeBeRemoved(uid types.UID) {
130 f.statusLock.Lock()
131 defer f.statusLock.Unlock()
132 f.removeRuntime = map[types.UID]bool{uid: true}
133 }
134 func (f *fakePodWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
135 f.statusLock.Lock()
136 defer f.statusLock.Unlock()
137 return f.removeContent[uid]
138 }
139 func (f *fakePodWorkers) IsPodForMirrorPodTerminatingByFullName(podFullname string) bool {
140 f.statusLock.Lock()
141 defer f.statusLock.Unlock()
142 return f.terminatingStaticPods[podFullname]
143 }
144
// TestingInterface is the error-reporting surface fakePodWorkers needs from a
// test; it declares only Errorf.
type TestingInterface interface {
	// Errorf records a formatted failure message.
	Errorf(format string, args ...interface{})
}
148
149 func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
150 pod := newNamedPod(uid, "ns", name, false)
151 pod.Status.Phase = phase
152 return pod
153 }
154
155 func newStaticPod(uid, name string) *v1.Pod {
156 thirty := int64(30)
157 return &v1.Pod{
158 ObjectMeta: metav1.ObjectMeta{
159 UID: types.UID(uid),
160 Name: name,
161 Annotations: map[string]string{
162 kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
163 },
164 },
165 Spec: v1.PodSpec{
166 TerminationGracePeriodSeconds: &thirty,
167 },
168 }
169 }
170
171 func newNamedPod(uid, namespace, name string, isStatic bool) *v1.Pod {
172 thirty := int64(30)
173 pod := &v1.Pod{
174 ObjectMeta: metav1.ObjectMeta{
175 UID: types.UID(uid),
176 Namespace: namespace,
177 Name: name,
178 },
179 Spec: v1.PodSpec{
180 TerminationGracePeriodSeconds: &thirty,
181 },
182 }
183 if isStatic {
184 pod.Annotations = map[string]string{
185 kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
186 }
187 }
188 return pod
189 }
190
191
// syncPodRecord captures one invocation of a sync callback so tests can
// assert on the history of calls made for a pod.
type syncPodRecord struct {
	// name is the name of the pod (or runtime pod) that was synced.
	name        string
	// updateType is the sync type recorded for the call.
	updateType  kubetypes.SyncPodType
	// runningPod is set only for calls made with a runtime pod.
	runningPod  *kubecontainer.Pod
	// terminated is true for records written by the terminated-pod callback.
	terminated  bool
	// gracePeriod is the grace period passed to the terminating-pod callback.
	gracePeriod *int64
}
199
// FakeQueueItem is one entry recorded by fakeQueue.Enqueue.
type FakeQueueItem struct {
	// UID identifies the enqueued pod.
	UID   types.UID
	// Delay is the delay that was requested when the item was enqueued.
	Delay time.Duration
}
204
// fakeQueue is an in-memory work queue for tests. It keeps every item ever
// enqueued and tracks which of them have already been handed out by GetWork.
type fakeQueue struct {
	// lock guards queue and currentStart.
	lock         sync.Mutex
	// queue holds all enqueued items in insertion order.
	queue        []FakeQueueItem
	// currentStart is the index of the first item not yet returned by GetWork.
	currentStart int
}
210
211 func (q *fakeQueue) Empty() bool {
212 q.lock.Lock()
213 defer q.lock.Unlock()
214 return (len(q.queue) - q.currentStart) == 0
215 }
216
217 func (q *fakeQueue) Items() []FakeQueueItem {
218 q.lock.Lock()
219 defer q.lock.Unlock()
220 return append(make([]FakeQueueItem, 0, len(q.queue)), q.queue...)
221 }
222
223 func (q *fakeQueue) Set() sets.String {
224 q.lock.Lock()
225 defer q.lock.Unlock()
226 work := sets.NewString()
227 for _, item := range q.queue[q.currentStart:] {
228 work.Insert(string(item.UID))
229 }
230 return work
231 }
232
233 func (q *fakeQueue) Enqueue(uid types.UID, delay time.Duration) {
234 q.lock.Lock()
235 defer q.lock.Unlock()
236 q.queue = append(q.queue, FakeQueueItem{UID: uid, Delay: delay})
237 }
238
239 func (q *fakeQueue) GetWork() []types.UID {
240 q.lock.Lock()
241 defer q.lock.Unlock()
242 work := make([]types.UID, 0, len(q.queue)-q.currentStart)
243 for _, item := range q.queue[q.currentStart:] {
244 work = append(work, item.UID)
245 }
246 q.currentStart = len(q.queue)
247 return work
248 }
249
// timeIncrementingWorkers wraps a podWorkers instance for tests. Its methods
// advance the worker's fake clock by one second per operation and allow
// individual pod workers to be paused and released.
type timeIncrementingWorkers struct {
	// lock guards holds and serializes clock updates made through this wrapper.
	lock    sync.Mutex
	// w is the wrapped pod workers implementation.
	w       *podWorkers
	// runtime is the fake runtime created alongside w.
	runtime *containertest.FakeRuntime
	// holds maps a paused pod UID to a channel that is closed on release.
	holds   map[types.UID]chan struct{}
}
256
257
258
259
260 func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns ...func()) {
261 func() {
262 w.lock.Lock()
263 defer w.lock.Unlock()
264 w.w.UpdatePod(options)
265 w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
266 for _, fn := range afterFns {
267 fn()
268 }
269 }()
270 w.drainUnpausedWorkers()
271 }
272
273
274
275 func (w *timeIncrementingWorkers) SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) {
276 func() {
277 w.lock.Lock()
278 defer w.lock.Unlock()
279 knownPods = w.w.SyncKnownPods(desiredPods)
280 w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
281 }()
282 w.drainUnpausedWorkers()
283 return
284 }
285
286 func (w *timeIncrementingWorkers) PauseWorkers(uids ...types.UID) {
287 w.lock.Lock()
288 defer w.lock.Unlock()
289 if w.holds == nil {
290 w.holds = make(map[types.UID]chan struct{})
291 }
292 for _, uid := range uids {
293 if _, ok := w.holds[uid]; !ok {
294 w.holds[uid] = make(chan struct{})
295 }
296 }
297 }
298
299 func (w *timeIncrementingWorkers) ReleaseWorkers(uids ...types.UID) {
300 w.lock.Lock()
301 defer w.lock.Unlock()
302 w.ReleaseWorkersUnderLock(uids...)
303 }
304
305 func (w *timeIncrementingWorkers) ReleaseWorkersUnderLock(uids ...types.UID) {
306 for _, uid := range uids {
307 if ch, ok := w.holds[uid]; ok {
308 close(ch)
309 delete(w.holds, uid)
310 }
311 }
312 }
313
314 func (w *timeIncrementingWorkers) waitForPod(uid types.UID) {
315 w.lock.Lock()
316 ch, ok := w.holds[uid]
317 w.lock.Unlock()
318 if !ok {
319 return
320 }
321 <-ch
322 }
323
324 func (w *timeIncrementingWorkers) drainUnpausedWorkers() {
325 pausedWorkers := make(map[types.UID]struct{})
326 for {
327 for uid := range pausedWorkers {
328 delete(pausedWorkers, uid)
329 }
330 stillWorking := false
331
332
333 w.lock.Lock()
334 for uid := range w.holds {
335 pausedWorkers[uid] = struct{}{}
336 }
337 w.lock.Unlock()
338
339
340 w.w.podLock.Lock()
341 for uid, worker := range w.w.podSyncStatuses {
342 if _, ok := pausedWorkers[uid]; ok {
343 continue
344 }
345 if worker.working {
346 stillWorking = true
347 break
348 }
349 }
350 w.w.podLock.Unlock()
351
352 if !stillWorking {
353 break
354 }
355 time.Sleep(time.Millisecond)
356 }
357 }
358
359 func (w *timeIncrementingWorkers) tick() {
360 w.lock.Lock()
361 defer w.lock.Unlock()
362 w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
363 }
364
365
366
367
368 func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) {
369 nested, runtime, processed := createPodWorkers()
370 w := &timeIncrementingWorkers{
371 w: nested,
372 runtime: runtime,
373 }
374 nested.workerChannelFn = func(uid types.UID, in chan struct{}) <-chan struct{} {
375 ch := make(chan struct{})
376 go func() {
377 defer close(ch)
378
379
380 for range in {
381 w.waitForPod(uid)
382 w.tick()
383 ch <- struct{}{}
384 }
385 }()
386 return ch
387 }
388 return w, processed
389 }
390
391 func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) {
392 lock := sync.Mutex{}
393 processed := make(map[types.UID][]syncPodRecord)
394 fakeRecorder := &record.FakeRecorder{}
395 fakeRuntime := &containertest.FakeRuntime{}
396 fakeCache := containertest.NewFakeCache(fakeRuntime)
397 fakeQueue := &fakeQueue{}
398 clock := clocktesting.NewFakePassiveClock(time.Unix(1, 0))
399 w := newPodWorkers(
400 &podSyncerFuncs{
401 syncPod: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
402 func() {
403 lock.Lock()
404 defer lock.Unlock()
405 pod := pod
406 processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
407 name: pod.Name,
408 updateType: updateType,
409 })
410 }()
411 return false, nil
412 },
413 syncTerminatingPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
414 func() {
415 lock.Lock()
416 defer lock.Unlock()
417 processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
418 name: pod.Name,
419 updateType: kubetypes.SyncPodKill,
420 gracePeriod: gracePeriod,
421 })
422 }()
423 return nil
424 },
425 syncTerminatingRuntimePod: func(ctx context.Context, runningPod *kubecontainer.Pod) error {
426 func() {
427 lock.Lock()
428 defer lock.Unlock()
429 processed[runningPod.ID] = append(processed[runningPod.ID], syncPodRecord{
430 name: runningPod.Name,
431 updateType: kubetypes.SyncPodKill,
432 runningPod: runningPod,
433 })
434 }()
435 return nil
436 },
437 syncTerminatedPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
438 func() {
439 lock.Lock()
440 defer lock.Unlock()
441 processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
442 name: pod.Name,
443 terminated: true,
444 })
445 }()
446 return nil
447 },
448 },
449 fakeRecorder,
450 fakeQueue,
451 time.Second,
452 time.Millisecond,
453 fakeCache,
454 )
455 workers := w.(*podWorkers)
456 workers.clock = clock
457 return workers, fakeRuntime, processed
458 }
459
460 func drainWorkers(podWorkers *podWorkers, numPods int) {
461 for {
462 stillWorking := false
463 podWorkers.podLock.Lock()
464 for i := 0; i < numPods; i++ {
465 if s, ok := podWorkers.podSyncStatuses[types.UID(strconv.Itoa(i))]; ok && s.working {
466 stillWorking = true
467 break
468 }
469 }
470 podWorkers.podLock.Unlock()
471 if !stillWorking {
472 break
473 }
474 time.Sleep(50 * time.Millisecond)
475 }
476 }
477
478 func drainWorkersExcept(podWorkers *podWorkers, uids ...types.UID) {
479 set := sets.NewString()
480 for _, uid := range uids {
481 set.Insert(string(uid))
482 }
483 for {
484 stillWorking := false
485 podWorkers.podLock.Lock()
486 for k, v := range podWorkers.podSyncStatuses {
487 if set.Has(string(k)) {
488 continue
489 }
490 if v.working {
491 stillWorking = true
492 break
493 }
494 }
495 podWorkers.podLock.Unlock()
496 if !stillWorking {
497 break
498 }
499 time.Sleep(50 * time.Millisecond)
500 }
501 }
502
503 func drainAllWorkers(podWorkers *podWorkers) {
504 for {
505 stillWorking := false
506 podWorkers.podLock.Lock()
507 for _, worker := range podWorkers.podSyncStatuses {
508 if worker.working {
509 stillWorking = true
510 break
511 }
512 }
513 podWorkers.podLock.Unlock()
514 if !stillWorking {
515 break
516 }
517 time.Sleep(50 * time.Millisecond)
518 }
519 }
520
521 func TestUpdatePodParallel(t *testing.T) {
522 podWorkers, _, processed := createPodWorkers()
523
524 numPods := 20
525 for i := 0; i < numPods; i++ {
526 for j := i; j < numPods; j++ {
527 podWorkers.UpdatePod(UpdatePodOptions{
528 Pod: newNamedPod(strconv.Itoa(j), "ns", strconv.Itoa(i), false),
529 UpdateType: kubetypes.SyncPodCreate,
530 })
531 }
532 }
533 drainWorkers(podWorkers, numPods)
534
535 if len(processed) != numPods {
536 t.Fatalf("Not all pods processed: %v", len(processed))
537 }
538 for i := 0; i < numPods; i++ {
539 uid := types.UID(strconv.Itoa(i))
540 events := processed[uid]
541 if len(events) < 1 || len(events) > i+1 {
542 t.Errorf("Pod %v processed %v times", i, len(events))
543 continue
544 }
545
546
547 last := len(events) - 1
548 if events[last].name != strconv.Itoa(i) {
549 t.Errorf("Pod %v: incorrect order %v, %#v", i, last, events)
550 }
551 }
552 }
553
554 func TestUpdatePod(t *testing.T) {
555 one := int64(1)
556 hasContext := func(status *podSyncStatus) *podSyncStatus {
557 status.ctx, status.cancelFn = context.Background(), func() {}
558 return status
559 }
560 withLabel := func(pod *v1.Pod, label, value string) *v1.Pod {
561 if pod.Labels == nil {
562 pod.Labels = make(map[string]string)
563 }
564 pod.Labels[label] = value
565 return pod
566 }
567 withDeletionTimestamp := func(pod *v1.Pod, ts time.Time, gracePeriod *int64) *v1.Pod {
568 pod.DeletionTimestamp = &metav1.Time{Time: ts}
569 pod.DeletionGracePeriodSeconds = gracePeriod
570 return pod
571 }
572 intp := func(i int64) *int64 {
573 return &i
574 }
575 expectPodSyncStatus := func(t *testing.T, expected, status *podSyncStatus) {
576 t.Helper()
577
578 if status != nil {
579 if e, a := expected.ctx != nil, status.ctx != nil; e != a {
580 t.Errorf("expected context %t, has context %t", e, a)
581 } else {
582 expected.ctx, status.ctx = nil, nil
583 }
584 if e, a := expected.cancelFn != nil, status.cancelFn != nil; e != a {
585 t.Errorf("expected cancelFn %t, has cancelFn %t", e, a)
586 } else {
587 expected.cancelFn, status.cancelFn = nil, nil
588 }
589 }
590 if e, a := expected, status; !reflect.DeepEqual(e, a) {
591 t.Fatalf("unexpected status: %s", cmp.Diff(e, a, cmp.AllowUnexported(podSyncStatus{})))
592 }
593 }
594 for _, tc := range []struct {
595 name string
596 update UpdatePodOptions
597 runtimeStatus *kubecontainer.PodStatus
598 prepare func(t *testing.T, w *timeIncrementingWorkers) (afterUpdateFn func())
599
600 expect *podSyncStatus
601 expectBeforeWorker *podSyncStatus
602 expectKnownTerminated bool
603 }{
604 {
605 name: "a new pod is recorded and started",
606 update: UpdatePodOptions{
607 UpdateType: kubetypes.SyncPodCreate,
608 Pod: newNamedPod("1", "ns", "running-pod", false),
609 },
610 expect: hasContext(&podSyncStatus{
611 fullname: "running-pod_ns",
612 syncedAt: time.Unix(1, 0),
613 startedAt: time.Unix(3, 0),
614 activeUpdate: &UpdatePodOptions{
615 Pod: newNamedPod("1", "ns", "running-pod", false),
616 },
617 }),
618 },
619 {
620 name: "a new pod is recorded and started unless it is a duplicate of an existing terminating pod UID",
621 update: UpdatePodOptions{
622 UpdateType: kubetypes.SyncPodCreate,
623 Pod: withLabel(newNamedPod("1", "ns", "running-pod", false), "updated", "value"),
624 },
625 prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
626 w.UpdatePod(UpdatePodOptions{
627 UpdateType: kubetypes.SyncPodCreate,
628 Pod: newNamedPod("1", "ns", "running-pod", false),
629 })
630 w.PauseWorkers("1")
631 w.UpdatePod(UpdatePodOptions{
632 UpdateType: kubetypes.SyncPodKill,
633 Pod: newNamedPod("1", "ns", "running-pod", false),
634 })
635 return func() { w.ReleaseWorkersUnderLock("1") }
636 },
637 expect: hasContext(&podSyncStatus{
638 fullname: "running-pod_ns",
639 syncedAt: time.Unix(1, 0),
640 startedAt: time.Unix(3, 0),
641 terminatingAt: time.Unix(3, 0),
642 terminatedAt: time.Unix(6, 0),
643 gracePeriod: 30,
644 startedTerminating: true,
645 restartRequested: true,
646 finished: true,
647 activeUpdate: &UpdatePodOptions{
648 Pod: newNamedPod("1", "ns", "running-pod", false),
649 KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(30)},
650 },
651 }),
652 expectKnownTerminated: true,
653 },
654 {
655 name: "a new pod is recorded and started and running pod is ignored",
656 update: UpdatePodOptions{
657 UpdateType: kubetypes.SyncPodCreate,
658 Pod: newNamedPod("1", "ns", "running-pod", false),
659 RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
660 },
661 expect: hasContext(&podSyncStatus{
662 fullname: "running-pod_ns",
663 syncedAt: time.Unix(1, 0),
664 startedAt: time.Unix(3, 0),
665 activeUpdate: &UpdatePodOptions{
666 Pod: newNamedPod("1", "ns", "running-pod", false),
667 },
668 }),
669 },
670 {
671 name: "a running pod is terminated when an update contains a deletionTimestamp",
672 update: UpdatePodOptions{
673 UpdateType: kubetypes.SyncPodUpdate,
674 Pod: withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)),
675 },
676 prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
677 w.UpdatePod(UpdatePodOptions{
678 UpdateType: kubetypes.SyncPodCreate,
679 Pod: newNamedPod("1", "ns", "running-pod", false),
680 })
681 return nil
682 },
683 expect: hasContext(&podSyncStatus{
684 fullname: "running-pod_ns",
685 syncedAt: time.Unix(1, 0),
686 startedAt: time.Unix(3, 0),
687 terminatingAt: time.Unix(3, 0),
688 terminatedAt: time.Unix(5, 0),
689 gracePeriod: 15,
690 startedTerminating: true,
691 finished: true,
692 deleted: true,
693 activeUpdate: &UpdatePodOptions{
694 Pod: withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)),
695 KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(15)},
696 },
697 }),
698 expectKnownTerminated: true,
699 },
700 {
701 name: "a running pod is terminated when an eviction is requested",
702 update: UpdatePodOptions{
703 UpdateType: kubetypes.SyncPodKill,
704 Pod: newNamedPod("1", "ns", "running-pod", false),
705 KillPodOptions: &KillPodOptions{Evict: true},
706 },
707 prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
708 w.UpdatePod(UpdatePodOptions{
709 UpdateType: kubetypes.SyncPodCreate,
710 Pod: newNamedPod("1", "ns", "running-pod", false),
711 })
712 return nil
713 },
714 expect: hasContext(&podSyncStatus{
715 fullname: "running-pod_ns",
716 syncedAt: time.Unix(1, 0),
717 startedAt: time.Unix(3, 0),
718 terminatingAt: time.Unix(3, 0),
719 terminatedAt: time.Unix(5, 0),
720 gracePeriod: 30,
721 startedTerminating: true,
722 finished: true,
723 evicted: true,
724 activeUpdate: &UpdatePodOptions{
725 Pod: newNamedPod("1", "ns", "running-pod", false),
726 KillPodOptions: &KillPodOptions{
727 PodTerminationGracePeriodSecondsOverride: intp(30),
728 Evict: true,
729 },
730 },
731 }),
732 expectKnownTerminated: true,
733 },
734 {
735 name: "a pod that is terminal and has never started must be terminated if the runtime does not have a cached terminal state",
736 update: UpdatePodOptions{
737 UpdateType: kubetypes.SyncPodCreate,
738 Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
739 },
740 expect: hasContext(&podSyncStatus{
741 fullname: "done-pod_ns",
742 syncedAt: time.Unix(1, 0),
743 terminatingAt: time.Unix(1, 0),
744 startedAt: time.Unix(3, 0),
745 terminatedAt: time.Unix(3, 0),
746 activeUpdate: &UpdatePodOptions{
747 Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
748 KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(30)},
749 },
750 gracePeriod: 30,
751 startedTerminating: true,
752 finished: true,
753 }),
754 expectKnownTerminated: true,
755 },
756 {
757 name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state",
758 update: UpdatePodOptions{
759 UpdateType: kubetypes.SyncPodCreate,
760 Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
761 },
762 runtimeStatus: &kubecontainer.PodStatus{ },
763 expectBeforeWorker: &podSyncStatus{
764 fullname: "done-pod_ns",
765 syncedAt: time.Unix(1, 0),
766 terminatingAt: time.Unix(1, 0),
767 terminatedAt: time.Unix(1, 0),
768 pendingUpdate: &UpdatePodOptions{
769 UpdateType: kubetypes.SyncPodCreate,
770 Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
771 },
772 finished: false,
773 startedTerminating: true,
774 working: true,
775 },
776 expect: hasContext(&podSyncStatus{
777 fullname: "done-pod_ns",
778 syncedAt: time.Unix(1, 0),
779 terminatingAt: time.Unix(1, 0),
780 terminatedAt: time.Unix(1, 0),
781 startedAt: time.Unix(3, 0),
782 startedTerminating: true,
783 finished: true,
784 activeUpdate: &UpdatePodOptions{
785 UpdateType: kubetypes.SyncPodSync,
786 Pod: newPodWithPhase("1", "done-pod", v1.PodSucceeded),
787 },
788
789
790 restartRequested: false,
791 }),
792 expectKnownTerminated: true,
793 },
794 {
795 name: "an orphaned running pod we have not seen is marked terminating and advances to finished and then is removed",
796 update: UpdatePodOptions{
797 UpdateType: kubetypes.SyncPodKill,
798 RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
799 },
800 expectBeforeWorker: &podSyncStatus{
801 fullname: "orphaned-pod_ns",
802 syncedAt: time.Unix(1, 0),
803 terminatingAt: time.Unix(1, 0),
804 pendingUpdate: &UpdatePodOptions{
805 UpdateType: kubetypes.SyncPodKill,
806 RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
807 KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: &one},
808 },
809 gracePeriod: 1,
810 deleted: true,
811 observedRuntime: true,
812 working: true,
813 },
814
815
816
817 expectKnownTerminated: false,
818 },
819 {
820 name: "an orphaned running pod with a non-kill update type does nothing",
821 update: UpdatePodOptions{
822 UpdateType: kubetypes.SyncPodCreate,
823 RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
824 },
825 expect: nil,
826 },
827 } {
828 t.Run(tc.name, func(t *testing.T) {
829 var uid types.UID
830 switch {
831 case tc.update.Pod != nil:
832 uid = tc.update.Pod.UID
833 case tc.update.RunningPod != nil:
834 uid = tc.update.RunningPod.ID
835 default:
836 t.Fatalf("unable to find uid for update")
837 }
838
839 var fns []func()
840
841 podWorkers, _ := createTimeIncrementingPodWorkers()
842
843 if tc.expectBeforeWorker != nil {
844 fns = append(fns, func() {
845 expectPodSyncStatus(t, tc.expectBeforeWorker, podWorkers.w.podSyncStatuses[uid])
846 })
847 }
848
849 if tc.prepare != nil {
850 if fn := tc.prepare(t, podWorkers); fn != nil {
851 fns = append(fns, fn)
852 }
853 }
854
855
856
857 if tc.runtimeStatus != nil {
858 podWorkers.runtime.PodStatus = *tc.runtimeStatus
859 podWorkers.runtime.Err = nil
860 } else {
861 podWorkers.runtime.PodStatus = kubecontainer.PodStatus{}
862 podWorkers.runtime.Err = status.Error(codes.NotFound, "No such pod")
863 }
864 fns = append(fns, func() {
865 podWorkers.runtime.PodStatus = kubecontainer.PodStatus{}
866 podWorkers.runtime.Err = nil
867 })
868
869 podWorkers.UpdatePod(tc.update, fns...)
870
871 if podWorkers.w.IsPodKnownTerminated(uid) != tc.expectKnownTerminated {
872 t.Errorf("podWorker.IsPodKnownTerminated expected to be %t", tc.expectKnownTerminated)
873 }
874
875 expectPodSyncStatus(t, tc.expect, podWorkers.w.podSyncStatuses[uid])
876
877
878
879 })
880 }
881 }
882
883 func TestUpdatePodForRuntimePod(t *testing.T) {
884 podWorkers, _, processed := createPodWorkers()
885
886
887 podWorkers.UpdatePod(UpdatePodOptions{
888 UpdateType: kubetypes.SyncPodCreate,
889 RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
890 })
891 drainAllWorkers(podWorkers)
892 if len(processed) != 0 {
893 t.Fatalf("Not all pods processed: %v", len(processed))
894 }
895
896
897 podWorkers.UpdatePod(UpdatePodOptions{
898 UpdateType: kubetypes.SyncPodKill,
899 RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
900 })
901 drainAllWorkers(podWorkers)
902 if len(processed) != 1 {
903 t.Fatalf("Not all pods processed: %v", processed)
904 }
905 updates := processed["1"]
906 if len(updates) != 1 {
907 t.Fatalf("unexpected updates: %v", updates)
908 }
909 if updates[0].runningPod == nil || updates[0].updateType != kubetypes.SyncPodKill || updates[0].name != "1" {
910 t.Fatalf("unexpected update: %v", updates)
911 }
912 }
913
914 func TestUpdatePodForTerminatedRuntimePod(t *testing.T) {
915 podWorkers, _, processed := createPodWorkers()
916
917 now := time.Now()
918 podWorkers.podSyncStatuses[types.UID("1")] = &podSyncStatus{
919 startedTerminating: true,
920 terminatedAt: now.Add(-time.Second),
921 terminatingAt: now.Add(-2 * time.Second),
922 gracePeriod: 1,
923 }
924
925
926 podWorkers.UpdatePod(UpdatePodOptions{
927 UpdateType: kubetypes.SyncPodKill,
928 RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
929 })
930 drainAllWorkers(podWorkers)
931 if len(processed) != 0 {
932 t.Fatalf("Not all pods processed: %v", processed)
933 }
934 updates := processed["1"]
935 if len(updates) != 0 {
936 t.Fatalf("unexpected updates: %v", updates)
937 }
938 }
939
940 func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
941 podWorkers, _, processed := createPodWorkers()
942 numPods := 20
943 for i := 0; i < numPods; i++ {
944 pod := newNamedPod(strconv.Itoa(i), "ns", strconv.Itoa(i), false)
945 podWorkers.UpdatePod(UpdatePodOptions{
946 Pod: pod,
947 UpdateType: kubetypes.SyncPodCreate,
948 })
949 podWorkers.UpdatePod(UpdatePodOptions{
950 Pod: pod,
951 UpdateType: kubetypes.SyncPodKill,
952 })
953 podWorkers.UpdatePod(UpdatePodOptions{
954 Pod: pod,
955 UpdateType: kubetypes.SyncPodUpdate,
956 })
957 }
958 drainWorkers(podWorkers, numPods)
959 if len(processed) != numPods {
960 t.Errorf("Not all pods processed: %v", len(processed))
961 return
962 }
963 for i := 0; i < numPods; i++ {
964 uid := types.UID(strconv.Itoa(i))
965
966
967 syncPodRecords := processed[uid]
968 var match bool
969 grace := int64(30)
970 for _, possible := range [][]syncPodRecord{
971 {{name: string(uid), updateType: kubetypes.SyncPodKill, gracePeriod: &grace}, {name: string(uid), terminated: true}},
972 {{name: string(uid), updateType: kubetypes.SyncPodCreate}, {name: string(uid), updateType: kubetypes.SyncPodKill, gracePeriod: &grace}, {name: string(uid), terminated: true}},
973 } {
974 if reflect.DeepEqual(possible, syncPodRecords) {
975 match = true
976 break
977 }
978 }
979 if !match {
980 t.Fatalf("unexpected history for pod %v: %#v", i, syncPodRecords)
981 }
982 }
983 }
984
985 func newUIDSet(uids ...types.UID) sets.String {
986 set := sets.NewString()
987 for _, uid := range uids {
988 set.Insert(string(uid))
989 }
990 return set
991 }
992
// terminalPhaseSync wraps a sync function so tests can force selected pods to
// be reported as terminal.
type terminalPhaseSync struct {
	// lock guards terminal.
	lock     sync.Mutex
	// fn is the wrapped sync function that SyncPod delegates to.
	fn       syncPodFnType
	// terminal holds the UIDs that SyncPod must report as terminal.
	terminal sets.String
}
998
999 func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
1000 isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus)
1001 if err != nil {
1002 return false, err
1003 }
1004 if !isTerminal {
1005 s.lock.Lock()
1006 defer s.lock.Unlock()
1007 isTerminal = s.terminal.Has(string(pod.UID))
1008 }
1009 return isTerminal, nil
1010 }
1011
1012 func (s *terminalPhaseSync) SetTerminal(uid types.UID) {
1013 s.lock.Lock()
1014 defer s.lock.Unlock()
1015 s.terminal.Insert(string(uid))
1016 }
1017
1018 func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync {
1019 return &terminalPhaseSync{
1020 fn: fn,
1021 terminal: sets.NewString(),
1022 }
1023 }
1024
1025 func TestTerminalPhaseTransition(t *testing.T) {
1026 podWorkers, _, _ := createPodWorkers()
1027 var channels WorkChannel
1028 podWorkers.workerChannelFn = channels.Intercept
1029 terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.podSyncer.(*podSyncerFuncs).syncPod)
1030 podWorkers.podSyncer.(*podSyncerFuncs).syncPod = terminalPhaseSyncer.SyncPod
1031
1032
1033 podWorkers.UpdatePod(UpdatePodOptions{
1034 Pod: newNamedPod("1", "test1", "pod1", false),
1035 UpdateType: kubetypes.SyncPodUpdate,
1036 })
1037 drainAllWorkers(podWorkers)
1038
1039
1040 pod1 := podWorkers.podSyncStatuses[types.UID("1")]
1041 if pod1.IsTerminated() {
1042 t.Fatalf("unexpected pod state: %#v", pod1)
1043 }
1044
1045
1046 podWorkers.UpdatePod(UpdatePodOptions{
1047 Pod: newNamedPod("1", "test1", "pod1", false),
1048 UpdateType: kubetypes.SyncPodUpdate,
1049 })
1050 drainAllWorkers(podWorkers)
1051
1052
1053 pod1 = podWorkers.podSyncStatuses[types.UID("1")]
1054 if pod1.IsTerminated() {
1055 t.Fatalf("unexpected pod state: %#v", pod1)
1056 }
1057
1058
1059 terminalPhaseSyncer.SetTerminal(types.UID("1"))
1060 podWorkers.UpdatePod(UpdatePodOptions{
1061 Pod: newNamedPod("1", "test1", "pod1", false),
1062 UpdateType: kubetypes.SyncPodUpdate,
1063 })
1064 drainAllWorkers(podWorkers)
1065
1066
1067 pod1 = podWorkers.podSyncStatuses[types.UID("1")]
1068 if !pod1.IsTerminationRequested() || !pod1.IsTerminated() {
1069 t.Fatalf("unexpected pod state: %#v", pod1)
1070 }
1071 }
1072
1073 func TestStaticPodExclusion(t *testing.T) {
1074 if testing.Short() {
1075 t.Skip("skipping test in short mode.")
1076 }
1077
1078 podWorkers, _, processed := createPodWorkers()
1079 var channels WorkChannel
1080 podWorkers.workerChannelFn = channels.Intercept
1081
1082 testPod := newNamedPod("2-static", "test1", "pod1", true)
1083 if !kubetypes.IsStaticPod(testPod) {
1084 t.Fatalf("unable to test static pod")
1085 }
1086
1087
1088 podWorkers.UpdatePod(UpdatePodOptions{
1089 Pod: newNamedPod("1-normal", "test1", "pod1", false),
1090 UpdateType: kubetypes.SyncPodUpdate,
1091 })
1092 podWorkers.UpdatePod(UpdatePodOptions{
1093 Pod: newNamedPod("2-static", "test1", "pod1", true),
1094 UpdateType: kubetypes.SyncPodUpdate,
1095 })
1096 drainAllWorkers(podWorkers)
1097
1098
1099 pod1 := podWorkers.podSyncStatuses[types.UID("1-normal")]
1100 if pod1.IsTerminated() {
1101 t.Fatalf("unexpected pod state: %#v", pod1)
1102 }
1103 pod2 := podWorkers.podSyncStatuses[types.UID("2-static")]
1104 if pod2.IsTerminated() {
1105 t.Fatalf("unexpected pod state: %#v", pod2)
1106 }
1107
1108 if len(processed) != 2 {
1109 t.Fatalf("unexpected synced pods: %#v", processed)
1110 }
1111 if e, a :=
1112 []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
1113 processed[types.UID("2-static")]; !reflect.DeepEqual(e, a) {
1114 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(e, a))
1115 }
1116 if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1117 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1118 }
1119
1120
1121 podWorkers.UpdatePod(UpdatePodOptions{
1122 Pod: newNamedPod("3-static", "test1", "pod1", true),
1123 UpdateType: kubetypes.SyncPodUpdate,
1124 })
1125 podWorkers.UpdatePod(UpdatePodOptions{
1126 Pod: newNamedPod("4-static", "test1", "pod1", true),
1127 UpdateType: kubetypes.SyncPodUpdate,
1128 })
1129 drainAllWorkers(podWorkers)
1130
1131
1132 pod1 = podWorkers.podSyncStatuses[types.UID("1-normal")]
1133 if pod1.IsTerminated() {
1134 t.Fatalf("unexpected pod state: %#v", pod1)
1135 }
1136 pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
1137 if pod2.IsTerminated() {
1138 t.Fatalf("unexpected pod state: %#v", pod2)
1139 }
1140 pod3 := podWorkers.podSyncStatuses[types.UID("3-static")]
1141 if pod3.IsTerminated() {
1142 t.Fatalf("unexpected pod state: %#v", pod3)
1143 }
1144 pod4 := podWorkers.podSyncStatuses[types.UID("4-static")]
1145 if pod4.IsTerminated() {
1146 t.Fatalf("unexpected pod state: %#v", pod4)
1147 }
1148
1149 if len(processed) != 2 {
1150 t.Fatalf("unexpected synced pods: %#v", processed)
1151 }
1152 if expected, actual :=
1153 []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
1154 processed[types.UID("2-static")]; !reflect.DeepEqual(expected, actual) {
1155 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1156 }
1157 if expected, actual :=
1158 []syncPodRecord(nil),
1159 processed[types.UID("3-static")]; !reflect.DeepEqual(expected, actual) {
1160 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1161 }
1162 if expected, actual :=
1163 []syncPodRecord(nil),
1164 processed[types.UID("4-static")]; !reflect.DeepEqual(expected, actual) {
1165 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1166 }
1167 if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1168 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1169 }
1170 if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1171 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1172 }
1173
1174 if e, a := sets.NewString("1-normal", "2-static", "4-static", "3-static"), podWorkers.workQueue.(*fakeQueue).Set(); !e.Equal(a) {
1175 t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
1176 }
1177
1178
1179 podWorkers.workQueue.GetWork()
1180 podWorkers.UpdatePod(UpdatePodOptions{
1181 Pod: newNamedPod("3-static", "test1", "pod1", true),
1182 UpdateType: kubetypes.SyncPodUpdate,
1183 })
1184 drainAllWorkers(podWorkers)
1185
1186
1187 if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1188 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1189 }
1190 if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1191 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1192 }
1193
1194 if e, a := sets.NewString("3-static"), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
1195 t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
1196 }
1197
1198
1199 podWorkers.workQueue.GetWork()
1200 podWorkers.UpdatePod(UpdatePodOptions{
1201 Pod: newNamedPod("3-static", "test1", "pod1", true),
1202 UpdateType: kubetypes.SyncPodKill,
1203 })
1204 drainAllWorkers(podWorkers)
1205
1206
1207 pod3 = podWorkers.podSyncStatuses[types.UID("3-static")]
1208 if !pod3.IsTerminated() {
1209 t.Fatalf("unexpected pod state: %#v", pod3)
1210 }
1211
1212 if e, a := sets.NewString(), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
1213 t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
1214 }
1215
1216 if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1217 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1218 }
1219
1220 if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1221 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1222 }
1223
1224
1225 podWorkers.UpdatePod(UpdatePodOptions{
1226 Pod: newNamedPod("2-static", "test1", "pod1", true),
1227 UpdateType: kubetypes.SyncPodKill,
1228 })
1229 drainAllWorkers(podWorkers)
1230
1231
1232 pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
1233 if !pod2.IsTerminated() {
1234 t.Fatalf("unexpected pod state: %#v", pod3)
1235 }
1236 if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1237 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1238 }
1239 if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1240 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1241 }
1242
1243
1244 podWorkers.UpdatePod(UpdatePodOptions{
1245 Pod: newNamedPod("4-static", "test1", "pod1", true),
1246 UpdateType: kubetypes.SyncPodUpdate,
1247 })
1248 drainAllWorkers(podWorkers)
1249
1250
1251 pod4 = podWorkers.podSyncStatuses[types.UID("4-static")]
1252 if pod4.IsTerminated() {
1253 t.Fatalf("unexpected pod state: %#v", pod3)
1254 }
1255 if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1256 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1257 }
1258 if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1259 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1260 }
1261
1262
1263 state := podWorkers.SyncKnownPods([]*v1.Pod{
1264 newNamedPod("1-normal", "test1", "pod1", false),
1265 newNamedPod("2-static", "test1", "pod1", true),
1266 newNamedPod("3-static", "test1", "pod1", true),
1267 newNamedPod("4-static", "test1", "pod1", true),
1268 })
1269 drainAllWorkers(podWorkers)
1270
1271
1272 if e, a := map[types.UID]PodWorkerSync{
1273 "1-normal": {State: SyncPod, HasConfig: true},
1274 "2-static": {State: TerminatedPod, HasConfig: true, Static: true},
1275 "3-static": {State: TerminatedPod},
1276 "4-static": {State: SyncPod, HasConfig: true, Static: true},
1277 }, state; !reflect.DeepEqual(e, a) {
1278 t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
1279 }
1280
1281 if status, ok := podWorkers.podSyncStatuses["3-static"]; !ok || status.terminatedAt.IsZero() || !status.finished || status.working {
1282 t.Fatalf("unexpected post termination status: %#v", status)
1283 }
1284
1285
1286 state = podWorkers.SyncKnownPods([]*v1.Pod{
1287 newNamedPod("1-normal", "test1", "pod1", false),
1288 newNamedPod("2-static", "test1", "pod1", true),
1289 newNamedPod("4-static", "test1", "pod1", true),
1290 })
1291 drainAllWorkers(podWorkers)
1292
1293
1294 if e, a := map[types.UID]PodWorkerSync{
1295 "1-normal": {State: SyncPod, HasConfig: true},
1296 "2-static": {State: TerminatedPod, HasConfig: true, Static: true},
1297 "4-static": {State: SyncPod, HasConfig: true, Static: true},
1298 }, state; !reflect.DeepEqual(e, a) {
1299 t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
1300 }
1301 if status, ok := podWorkers.podSyncStatuses["3-static"]; ok {
1302 t.Fatalf("unexpected post termination status: %#v", status)
1303 }
1304
1305
1306
1307 podWorkers.UpdatePod(UpdatePodOptions{
1308 Pod: newNamedPod("5-static", "test1", "pod1", true),
1309 UpdateType: kubetypes.SyncPodUpdate,
1310 })
1311
1312 drainAllWorkers(podWorkers)
1313 channels.Channel("5-static").Hold()
1314 podWorkers.UpdatePod(UpdatePodOptions{
1315 Pod: newNamedPod("5-static", "test1", "pod1", true),
1316 UpdateType: kubetypes.SyncPodKill,
1317 })
1318 podWorkers.UpdatePod(UpdatePodOptions{
1319 Pod: newNamedPod("6-static", "test1", "pod1", true),
1320 UpdateType: kubetypes.SyncPodUpdate,
1321 })
1322 drainWorkersExcept(podWorkers, "5-static")
1323
1324
1325 pod5 := podWorkers.podSyncStatuses[types.UID("5-static")]
1326 if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
1327 t.Fatalf("unexpected status for pod 5: %#v", pod5)
1328 }
1329 if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1330 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1331 }
1332 if e, a := map[string][]types.UID{"pod1_test1": {"5-static", "6-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1333 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1334 }
1335
1336
1337 podWorkers.UpdatePod(UpdatePodOptions{
1338 Pod: newNamedPod("4-static", "test1", "pod1", true),
1339 UpdateType: kubetypes.SyncPodKill,
1340 })
1341 drainWorkersExcept(podWorkers, "5-static")
1342 podWorkers.UpdatePod(UpdatePodOptions{
1343 Pod: newNamedPod("6-static", "test1", "pod1", true),
1344 UpdateType: kubetypes.SyncPodUpdate,
1345 })
1346 drainWorkersExcept(podWorkers, "5-static")
1347
1348
1349 pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
1350 if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
1351 t.Fatalf("unexpected status for pod 5: %#v", pod5)
1352 }
1353 if e, a := map[string]types.UID{"pod1_test1": "6-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1354 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1355 }
1356
1357 if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1358 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1359 }
1360
1361 if expected, actual :=
1362 []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
1363 processed[types.UID("6-static")]; !reflect.DeepEqual(expected, actual) {
1364 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1365 }
1366
1367
1368 channels.Channel("5-static").Release()
1369 drainAllWorkers(podWorkers)
1370 pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
1371 if !pod5.IsTerminated() {
1372 t.Fatalf("unexpected status for pod 5: %#v", pod5)
1373 }
1374
1375
1376
1377 podWorkers.UpdatePod(UpdatePodOptions{
1378 Pod: newNamedPod("7-static", "test1", "pod1", true),
1379 UpdateType: kubetypes.SyncPodUpdate,
1380 })
1381 podWorkers.UpdatePod(UpdatePodOptions{
1382 Pod: newNamedPod("8-static", "test1", "pod1", true),
1383 UpdateType: kubetypes.SyncPodUpdate,
1384 })
1385 podWorkers.UpdatePod(UpdatePodOptions{
1386 Pod: newNamedPod("9-static", "test1", "pod1", true),
1387 UpdateType: kubetypes.SyncPodUpdate,
1388 })
1389 drainAllWorkers(podWorkers)
1390 podWorkers.UpdatePod(UpdatePodOptions{
1391 Pod: newNamedPod("6-static", "test1", "pod1", true),
1392 UpdateType: kubetypes.SyncPodKill,
1393 })
1394 drainAllWorkers(podWorkers)
1395 podWorkers.UpdatePod(UpdatePodOptions{
1396 Pod: newNamedPod("6-static", "test1", "pod1", true),
1397 UpdateType: kubetypes.SyncPodCreate,
1398 })
1399 drainAllWorkers(podWorkers)
1400 podWorkers.UpdatePod(UpdatePodOptions{
1401 Pod: newNamedPod("8-static", "test1", "pod1", true),
1402 UpdateType: kubetypes.SyncPodUpdate,
1403 })
1404 drainAllWorkers(podWorkers)
1405
1406
1407 if status := podWorkers.podSyncStatuses["6-static"]; !status.restartRequested {
1408 t.Fatalf("unexpected restarted static pod: %#v", status)
1409 }
1410
1411 if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1412 t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
1413 }
1414
1415 if e, a := map[string][]types.UID{"pod1_test1": {"7-static", "8-static", "9-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
1416 t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
1417 }
1418
1419 if expected, actual :=
1420 []syncPodRecord(nil),
1421 processed[types.UID("7-static")]; !reflect.DeepEqual(expected, actual) {
1422 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1423 }
1424 if expected, actual :=
1425 []syncPodRecord(nil),
1426 processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
1427 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1428 }
1429 if expected, actual :=
1430 []syncPodRecord(nil),
1431 processed[types.UID("9-static")]; !reflect.DeepEqual(expected, actual) {
1432 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1433 }
1434
1435
1436 podWorkers.UpdatePod(UpdatePodOptions{
1437 Pod: newNamedPod("7-static", "test1", "pod1", true),
1438 UpdateType: kubetypes.SyncPodKill,
1439 })
1440 drainAllWorkers(podWorkers)
1441 podWorkers.UpdatePod(UpdatePodOptions{
1442 Pod: newNamedPod("8-static", "test1", "pod1", true),
1443 UpdateType: kubetypes.SyncPodUpdate,
1444 })
1445 drainAllWorkers(podWorkers)
1446
1447
1448 if expected, actual :=
1449 []syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
1450 processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
1451 t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
1452 }
1453
1454
1455 state = podWorkers.SyncKnownPods([]*v1.Pod{
1456 newNamedPod("8-static", "test1", "pod1", true),
1457 })
1458 drainAllWorkers(podWorkers)
1459 if e, a := map[types.UID]PodWorkerSync{
1460 "1-normal": {State: TerminatingPod, Orphan: true, HasConfig: true},
1461 "8-static": {State: SyncPod, HasConfig: true, Static: true},
1462 }, state; !reflect.DeepEqual(e, a) {
1463 t.Fatalf("unexpected actual restartable: %s", cmp.Diff(e, a))
1464 }
1465 }
1466
// WorkChannelItem relays signals onto its out channel and lets a test pause
// that relay: while paused, signals are counted instead of forwarded and are
// replayed when the item is released.
type WorkChannelItem struct {
	// out receives one empty struct per forwarded signal; nil after Close.
	out chan struct{}
	// lock guards every field of the item.
	lock sync.Mutex
	// pause is true between Hold and Release.
	pause bool
	// queue counts the signals withheld while paused.
	queue int
}

// Handle forwards one signal to out, or records it for later delivery when
// the item is currently paused.
// NOTE(review): the send happens while the lock is held, so it blocks other
// methods until out has room; calling this after Close would send on a nil
// channel.
func (item *WorkChannelItem) Handle() {
	item.lock.Lock()
	defer item.lock.Unlock()
	if !item.pause {
		item.out <- struct{}{}
		return
	}
	item.queue++
}

// Hold pauses forwarding; subsequent Handle calls only increment the counter.
func (item *WorkChannelItem) Hold() {
	item.lock.Lock()
	defer item.lock.Unlock()
	item.pause = true
}

// Close closes the out channel and clears the field. It is safe to call more
// than once: later calls find out already nil and do nothing.
func (item *WorkChannelItem) Close() {
	item.lock.Lock()
	defer item.lock.Unlock()
	if item.out == nil {
		return
	}
	close(item.out)
	item.out = nil
}

// Release resumes forwarding and replays every signal that was withheld
// since Hold, then resets the withheld counter.
func (item *WorkChannelItem) Release() {
	item.lock.Lock()
	defer item.lock.Unlock()
	item.pause = false
	for pending := item.queue; pending > 0; pending-- {
		item.out <- struct{}{}
	}
	item.queue = 0
}
1509
1510
1511
1512
1513 type WorkChannel struct {
1514 lock sync.Mutex
1515 channels map[types.UID]*WorkChannelItem
1516 }
1517
1518 func (w *WorkChannel) Channel(uid types.UID) *WorkChannelItem {
1519 w.lock.Lock()
1520 defer w.lock.Unlock()
1521 if w.channels == nil {
1522 w.channels = make(map[types.UID]*WorkChannelItem)
1523 }
1524 channel, ok := w.channels[uid]
1525 if !ok {
1526 channel = &WorkChannelItem{
1527 out: make(chan struct{}, 1),
1528 }
1529 w.channels[uid] = channel
1530 }
1531 return channel
1532 }
1533
1534 func (w *WorkChannel) Intercept(uid types.UID, ch chan struct{}) (outCh <-chan struct{}) {
1535 channel := w.Channel(uid)
1536 w.lock.Lock()
1537
1538 defer w.lock.Unlock()
1539 go func() {
1540 defer func() {
1541 channel.Close()
1542 w.lock.Lock()
1543 defer w.lock.Unlock()
1544 delete(w.channels, uid)
1545 }()
1546 for range ch {
1547 channel.Handle()
1548 }
1549 }()
1550 return channel.out
1551 }
1552
1553 func TestSyncKnownPods(t *testing.T) {
1554 podWorkers, _, _ := createPodWorkers()
1555
1556 numPods := 20
1557 for i := 0; i < numPods; i++ {
1558 podWorkers.UpdatePod(UpdatePodOptions{
1559 Pod: newNamedPod(strconv.Itoa(i), "ns", "name", false),
1560 UpdateType: kubetypes.SyncPodUpdate,
1561 })
1562 }
1563 drainWorkers(podWorkers, numPods)
1564
1565 if len(podWorkers.podUpdates) != numPods {
1566 t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
1567 }
1568
1569 desiredPods := map[types.UID]sets.Empty{}
1570 desiredPods[types.UID("2")] = sets.Empty{}
1571 desiredPods[types.UID("14")] = sets.Empty{}
1572 desiredPodList := []*v1.Pod{newNamedPod("2", "ns", "name", false), newNamedPod("14", "ns", "name", false)}
1573
1574
1575 for i := 0; i < numPods; i++ {
1576 pod := newNamedPod(strconv.Itoa(i), "ns", "name", false)
1577 if _, ok := desiredPods[pod.UID]; ok {
1578 continue
1579 }
1580 if (i % 2) == 0 {
1581 now := metav1.Now()
1582 pod.DeletionTimestamp = &now
1583 }
1584 podWorkers.UpdatePod(UpdatePodOptions{
1585 Pod: pod,
1586 UpdateType: kubetypes.SyncPodKill,
1587 })
1588 }
1589 drainWorkers(podWorkers, numPods)
1590
1591 if !podWorkers.ShouldPodContainersBeTerminating(types.UID("0")) {
1592 t.Errorf("Expected pod to be terminating")
1593 }
1594 if !podWorkers.ShouldPodContainersBeTerminating(types.UID("1")) {
1595 t.Errorf("Expected pod to be terminating")
1596 }
1597 if podWorkers.ShouldPodContainersBeTerminating(types.UID("2")) {
1598 t.Errorf("Expected pod to not be terminating")
1599 }
1600 if !podWorkers.IsPodTerminationRequested(types.UID("0")) {
1601 t.Errorf("Expected pod to be terminating")
1602 }
1603 if podWorkers.IsPodTerminationRequested(types.UID("2")) {
1604 t.Errorf("Expected pod to not be terminating")
1605 }
1606
1607 if podWorkers.CouldHaveRunningContainers(types.UID("0")) {
1608 t.Errorf("Expected pod to be terminated (deleted and terminated)")
1609 }
1610 if podWorkers.CouldHaveRunningContainers(types.UID("1")) {
1611 t.Errorf("Expected pod to be terminated")
1612 }
1613 if !podWorkers.CouldHaveRunningContainers(types.UID("2")) {
1614 t.Errorf("Expected pod to not be terminated")
1615 }
1616
1617 if !podWorkers.ShouldPodContentBeRemoved(types.UID("0")) {
1618 t.Errorf("Expected pod to be suitable for removal (deleted and terminated)")
1619 }
1620 if podWorkers.ShouldPodContentBeRemoved(types.UID("1")) {
1621 t.Errorf("Expected pod to not be suitable for removal (terminated but not deleted)")
1622 }
1623 if podWorkers.ShouldPodContentBeRemoved(types.UID("2")) {
1624 t.Errorf("Expected pod to not be suitable for removal (not terminated)")
1625 }
1626
1627 if podWorkers.ShouldPodContainersBeTerminating(types.UID("abc")) {
1628 t.Errorf("Expected pod to not be known to be terminating (does not exist but not yet synced)")
1629 }
1630 if !podWorkers.CouldHaveRunningContainers(types.UID("abc")) {
1631 t.Errorf("Expected pod to potentially have running containers (does not exist but not yet synced)")
1632 }
1633 if podWorkers.ShouldPodContentBeRemoved(types.UID("abc")) {
1634 t.Errorf("Expected pod to not be suitable for removal (does not exist but not yet synced)")
1635 }
1636
1637 podWorkers.SyncKnownPods(desiredPodList)
1638 if len(podWorkers.podUpdates) != 2 {
1639 t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
1640 }
1641 if _, exists := podWorkers.podUpdates[types.UID("2")]; !exists {
1642 t.Errorf("No updates channel for pod 2")
1643 }
1644 if _, exists := podWorkers.podUpdates[types.UID("14")]; !exists {
1645 t.Errorf("No updates channel for pod 14")
1646 }
1647 if podWorkers.IsPodTerminationRequested(types.UID("2")) {
1648 t.Errorf("Expected pod termination request to be cleared after sync")
1649 }
1650
1651 if !podWorkers.ShouldPodContainersBeTerminating(types.UID("abc")) {
1652 t.Errorf("Expected pod to be expected to terminate containers (does not exist and synced at least once)")
1653 }
1654 if podWorkers.CouldHaveRunningContainers(types.UID("abc")) {
1655 t.Errorf("Expected pod to be known not to have running containers (does not exist and synced at least once)")
1656 }
1657 if !podWorkers.ShouldPodContentBeRemoved(types.UID("abc")) {
1658 t.Errorf("Expected pod to be suitable for removal (does not exist and synced at least once)")
1659 }
1660
1661
1662
1663 podWorkers.SyncKnownPods(nil)
1664 drainAllWorkers(podWorkers)
1665 if len(podWorkers.podUpdates) != 0 {
1666 t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
1667 }
1668 if len(podWorkers.podSyncStatuses) != 2 {
1669 t.Errorf("Incorrect number of tracked statuses: %#v", podWorkers.podSyncStatuses)
1670 }
1671
1672 for uid := range desiredPods {
1673 pod := newNamedPod(string(uid), "ns", "name", false)
1674 podWorkers.UpdatePod(UpdatePodOptions{
1675 Pod: pod,
1676 UpdateType: kubetypes.SyncPodKill,
1677 })
1678 }
1679 drainWorkers(podWorkers, numPods)
1680
1681
1682 podWorkers.SyncKnownPods(nil)
1683 if len(podWorkers.podUpdates) != 0 {
1684 t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
1685 }
1686 if len(podWorkers.podSyncStatuses) != 0 {
1687 t.Errorf("Incorrect number of tracked statuses: %#v", podWorkers.podSyncStatuses)
1688 }
1689 }
1690
1691 func Test_removeTerminatedWorker(t *testing.T) {
1692 podUID := types.UID("pod-uid")
1693
1694 testCases := []struct {
1695 desc string
1696 orphan bool
1697 podSyncStatus *podSyncStatus
1698 startedStaticPodsByFullname map[string]types.UID
1699 waitingToStartStaticPodsByFullname map[string][]types.UID
1700 removed bool
1701 expectGracePeriod int64
1702 expectPending *UpdatePodOptions
1703 }{
1704 {
1705 desc: "finished worker",
1706 podSyncStatus: &podSyncStatus{
1707 finished: true,
1708 },
1709 removed: true,
1710 },
1711 {
1712 desc: "waiting to start worker because of another started pod with the same fullname",
1713 podSyncStatus: &podSyncStatus{
1714 finished: false,
1715 fullname: "fake-fullname",
1716 },
1717 startedStaticPodsByFullname: map[string]types.UID{
1718 "fake-fullname": "another-pod-uid",
1719 },
1720 waitingToStartStaticPodsByFullname: map[string][]types.UID{
1721 "fake-fullname": {podUID},
1722 },
1723 removed: false,
1724 },
1725 {
1726 desc: "not yet started worker",
1727 podSyncStatus: &podSyncStatus{
1728 finished: false,
1729 fullname: "fake-fullname",
1730 },
1731 startedStaticPodsByFullname: make(map[string]types.UID),
1732 waitingToStartStaticPodsByFullname: map[string][]types.UID{
1733 "fake-fullname": {podUID},
1734 },
1735 removed: false,
1736 },
1737 {
1738 desc: "orphaned not started worker",
1739 podSyncStatus: &podSyncStatus{
1740 finished: false,
1741 fullname: "fake-fullname",
1742 },
1743 orphan: true,
1744 removed: true,
1745 },
1746 {
1747 desc: "orphaned started worker",
1748 podSyncStatus: &podSyncStatus{
1749 startedAt: time.Unix(1, 0),
1750 finished: false,
1751 fullname: "fake-fullname",
1752 },
1753 orphan: true,
1754 removed: false,
1755 },
1756 {
1757 desc: "orphaned terminating worker with no activeUpdate",
1758 podSyncStatus: &podSyncStatus{
1759 startedAt: time.Unix(1, 0),
1760 terminatingAt: time.Unix(2, 0),
1761 finished: false,
1762 fullname: "fake-fullname",
1763 },
1764 orphan: true,
1765 removed: false,
1766 },
1767 {
1768 desc: "orphaned terminating worker",
1769 podSyncStatus: &podSyncStatus{
1770 startedAt: time.Unix(1, 0),
1771 terminatingAt: time.Unix(2, 0),
1772 finished: false,
1773 fullname: "fake-fullname",
1774 activeUpdate: &UpdatePodOptions{
1775 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1776 },
1777 },
1778 orphan: true,
1779 removed: false,
1780 expectPending: &UpdatePodOptions{
1781 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1782 },
1783 },
1784 {
1785 desc: "orphaned terminating worker with pendingUpdate",
1786 podSyncStatus: &podSyncStatus{
1787 startedAt: time.Unix(1, 0),
1788 terminatingAt: time.Unix(2, 0),
1789 finished: false,
1790 fullname: "fake-fullname",
1791 working: true,
1792 pendingUpdate: &UpdatePodOptions{
1793 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
1794 },
1795 activeUpdate: &UpdatePodOptions{
1796 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1797 },
1798 },
1799 orphan: true,
1800 removed: false,
1801 expectPending: &UpdatePodOptions{
1802 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
1803 },
1804 },
1805 {
1806 desc: "orphaned terminated worker with no activeUpdate",
1807 podSyncStatus: &podSyncStatus{
1808 startedAt: time.Unix(1, 0),
1809 terminatingAt: time.Unix(2, 0),
1810 terminatedAt: time.Unix(3, 0),
1811 finished: false,
1812 fullname: "fake-fullname",
1813 },
1814 orphan: true,
1815 removed: false,
1816 },
1817 {
1818 desc: "orphaned terminated worker",
1819 podSyncStatus: &podSyncStatus{
1820 startedAt: time.Unix(1, 0),
1821 terminatingAt: time.Unix(2, 0),
1822 terminatedAt: time.Unix(3, 0),
1823 finished: false,
1824 fullname: "fake-fullname",
1825 activeUpdate: &UpdatePodOptions{
1826 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1827 },
1828 },
1829 orphan: true,
1830 removed: false,
1831 expectPending: &UpdatePodOptions{
1832 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1833 },
1834 },
1835 {
1836 desc: "orphaned terminated worker with pendingUpdate",
1837 podSyncStatus: &podSyncStatus{
1838 startedAt: time.Unix(1, 0),
1839 terminatingAt: time.Unix(2, 0),
1840 terminatedAt: time.Unix(3, 0),
1841 finished: false,
1842 working: true,
1843 fullname: "fake-fullname",
1844 pendingUpdate: &UpdatePodOptions{
1845 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
1846 },
1847 activeUpdate: &UpdatePodOptions{
1848 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
1849 },
1850 },
1851 orphan: true,
1852 removed: false,
1853 expectPending: &UpdatePodOptions{
1854 Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
1855 },
1856 },
1857 }
1858
1859 for _, tc := range testCases {
1860 t.Run(tc.desc, func(t *testing.T) {
1861 podWorkers, _, _ := createPodWorkers()
1862 podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus
1863 podWorkers.podUpdates[podUID] = make(chan struct{}, 1)
1864 if tc.podSyncStatus.working {
1865 podWorkers.podUpdates[podUID] <- struct{}{}
1866 }
1867 podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
1868 podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
1869
1870 podWorkers.removeTerminatedWorker(podUID, podWorkers.podSyncStatuses[podUID], tc.orphan)
1871 status, exists := podWorkers.podSyncStatuses[podUID]
1872 if tc.removed && exists {
1873 t.Fatalf("Expected pod worker to be removed")
1874 }
1875 if !tc.removed && !exists {
1876 t.Fatalf("Expected pod worker to not be removed")
1877 }
1878 if tc.removed {
1879 return
1880 }
1881 if tc.expectGracePeriod > 0 && status.gracePeriod != tc.expectGracePeriod {
1882 t.Errorf("Unexpected grace period %d", status.gracePeriod)
1883 }
1884 if !reflect.DeepEqual(tc.expectPending, status.pendingUpdate) {
1885 t.Errorf("Unexpected pending: %s", cmp.Diff(tc.expectPending, status.pendingUpdate))
1886 }
1887 if tc.expectPending != nil {
1888 if !status.working {
1889 t.Errorf("Should be working")
1890 }
1891 if len(podWorkers.podUpdates[podUID]) != 1 {
1892 t.Errorf("Should have one entry in podUpdates")
1893 }
1894 }
1895 })
1896 }
1897 }
1898
1899 type simpleFakeKubelet struct {
1900 pod *v1.Pod
1901 mirrorPod *v1.Pod
1902 podStatus *kubecontainer.PodStatus
1903 wg sync.WaitGroup
1904 }
1905
1906 func (kl *simpleFakeKubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
1907 kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
1908 return false, nil
1909 }
1910
1911 func (kl *simpleFakeKubelet) SyncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
1912 kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
1913 kl.wg.Done()
1914 return false, nil
1915 }
1916
1917 func (kl *simpleFakeKubelet) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
1918 return nil
1919 }
1920
1921 func (kl *simpleFakeKubelet) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
1922 return nil
1923 }
1924
1925 func (kl *simpleFakeKubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
1926 return nil
1927 }
1928
1929
1930
1931 func TestFakePodWorkers(t *testing.T) {
1932 fakeRecorder := &record.FakeRecorder{}
1933 fakeRuntime := &containertest.FakeRuntime{}
1934 fakeCache := containertest.NewFakeCache(fakeRuntime)
1935
1936 kubeletForRealWorkers := &simpleFakeKubelet{}
1937 kubeletForFakeWorkers := &simpleFakeKubelet{}
1938 realPodSyncer := newPodSyncerFuncs(kubeletForRealWorkers)
1939 realPodSyncer.syncPod = kubeletForRealWorkers.SyncPodWithWaitGroup
1940
1941 realPodWorkers := newPodWorkers(
1942 realPodSyncer,
1943 fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
1944 fakePodWorkers := &fakePodWorkers{
1945 syncPodFn: kubeletForFakeWorkers.SyncPod,
1946 cache: fakeCache,
1947 t: t,
1948 }
1949
1950 tests := []struct {
1951 pod *v1.Pod
1952 mirrorPod *v1.Pod
1953 }{
1954 {
1955 &v1.Pod{},
1956 &v1.Pod{},
1957 },
1958 {
1959 podWithUIDNameNs("12345678", "foo", "new"),
1960 podWithUIDNameNs("12345678", "fooMirror", "new"),
1961 },
1962 {
1963 podWithUIDNameNs("98765", "bar", "new"),
1964 podWithUIDNameNs("98765", "barMirror", "new"),
1965 },
1966 }
1967
1968 for i, tt := range tests {
1969 kubeletForRealWorkers.wg.Add(1)
1970 realPodWorkers.UpdatePod(UpdatePodOptions{
1971 Pod: tt.pod,
1972 MirrorPod: tt.mirrorPod,
1973 UpdateType: kubetypes.SyncPodUpdate,
1974 })
1975 fakePodWorkers.UpdatePod(UpdatePodOptions{
1976 Pod: tt.pod,
1977 MirrorPod: tt.mirrorPod,
1978 UpdateType: kubetypes.SyncPodUpdate,
1979 })
1980
1981 kubeletForRealWorkers.wg.Wait()
1982
1983 if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
1984 t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
1985 }
1986
1987 if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
1988 t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
1989 }
1990
1991 if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
1992 t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
1993 }
1994 }
1995 }
1996
1997
1998 func TestKillPodNowFunc(t *testing.T) {
1999 fakeRecorder := &record.FakeRecorder{}
2000 podWorkers, _, processed := createPodWorkers()
2001 killPodFunc := killPodNow(podWorkers, fakeRecorder)
2002 pod := newNamedPod("test", "ns", "test", false)
2003 gracePeriodOverride := int64(0)
2004 err := killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
2005 status.Phase = v1.PodFailed
2006 status.Reason = "reason"
2007 status.Message = "message"
2008 })
2009 if err != nil {
2010 t.Fatalf("Unexpected error: %v", err)
2011 }
2012 drainAllWorkers(podWorkers)
2013 if len(processed) != 1 {
2014 t.Fatalf("len(processed) expected: %v, actual: %#v", 1, processed)
2015 }
2016 syncPodRecords := processed[pod.UID]
2017 if len(syncPodRecords) != 2 {
2018 t.Fatalf("Pod processed expected %v times, got %#v", 1, syncPodRecords)
2019 }
2020 if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
2021 t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
2022 }
2023 if !syncPodRecords[1].terminated {
2024 t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true)
2025 }
2026 }
2027
2028 func Test_allowPodStart(t *testing.T) {
2029 testCases := []struct {
2030 desc string
2031 pod *v1.Pod
2032 podSyncStatuses map[types.UID]*podSyncStatus
2033 startedStaticPodsByFullname map[string]types.UID
2034 waitingToStartStaticPodsByFullname map[string][]types.UID
2035
2036 expectedStartedStaticPodsByFullname map[string]types.UID
2037 expectedWaitingToStartStaticPodsByFullname map[string][]types.UID
2038 allowed bool
2039 allowedEver bool
2040 }{
2041 {
2042
2043
2044 desc: "non-static pod",
2045 pod: newNamedPod("uid-0", "ns", "test", false),
2046 podSyncStatuses: map[types.UID]*podSyncStatus{
2047 "uid-0": {
2048 fullname: "test_",
2049 },
2050 "uid-1": {
2051 fullname: "test_",
2052 },
2053 },
2054 allowed: true,
2055 allowedEver: true,
2056 },
2057 {
2058
2059
2060 desc: "non-static pod when there is a started static pod with the same full name",
2061 pod: newNamedPod("uid-0", "ns", "test", false),
2062 podSyncStatuses: map[types.UID]*podSyncStatus{
2063 "uid-0": {
2064 fullname: "test_",
2065 },
2066 "uid-1": {
2067 fullname: "test_",
2068 },
2069 },
2070 startedStaticPodsByFullname: map[string]types.UID{
2071 "test_": types.UID("uid-1"),
2072 },
2073 expectedStartedStaticPodsByFullname: map[string]types.UID{
2074 "test_": types.UID("uid-1"),
2075 },
2076 allowed: true,
2077 allowedEver: true,
2078 },
2079 {
2080
2081
2082 desc: "static pod when there is a started non-static pod with the same full name",
2083 pod: newNamedPod("uid-0", "ns", "test", false),
2084 podSyncStatuses: map[types.UID]*podSyncStatus{
2085 "uid-0": {
2086 fullname: "test_",
2087 },
2088 "uid-1": {
2089 fullname: "test_",
2090 },
2091 },
2092 allowed: true,
2093 allowedEver: true,
2094 },
2095 {
2096 desc: "static pod when there are no started static pods with the same full name",
2097 pod: newStaticPod("uid-0", "foo"),
2098 podSyncStatuses: map[types.UID]*podSyncStatus{
2099 "uid-0": {
2100 fullname: "foo_",
2101 },
2102 "uid-1": {
2103 fullname: "bar_",
2104 },
2105 },
2106 startedStaticPodsByFullname: map[string]types.UID{
2107 "bar_": types.UID("uid-1"),
2108 },
2109 expectedStartedStaticPodsByFullname: map[string]types.UID{
2110 "foo_": types.UID("uid-0"),
2111 "bar_": types.UID("uid-1"),
2112 },
2113 allowed: true,
2114 allowedEver: true,
2115 },
2116 {
2117 desc: "static pod when there is a started static pod with the same full name",
2118 pod: newStaticPod("uid-0", "foo"),
2119 podSyncStatuses: map[types.UID]*podSyncStatus{
2120 "uid-0": {
2121 fullname: "foo_",
2122 },
2123 "uid-1": {
2124 fullname: "foo_",
2125 },
2126 },
2127 startedStaticPodsByFullname: map[string]types.UID{
2128 "foo_": types.UID("uid-1"),
2129 },
2130 expectedStartedStaticPodsByFullname: map[string]types.UID{
2131 "foo_": types.UID("uid-1"),
2132 },
2133 allowed: false,
2134 allowedEver: true,
2135 },
2136 {
2137 desc: "static pod if the static pod has already started",
2138 pod: newStaticPod("uid-0", "foo"),
2139 podSyncStatuses: map[types.UID]*podSyncStatus{
2140 "uid-0": {
2141 fullname: "foo_",
2142 },
2143 },
2144 startedStaticPodsByFullname: map[string]types.UID{
2145 "foo_": types.UID("uid-0"),
2146 },
2147 expectedStartedStaticPodsByFullname: map[string]types.UID{
2148 "foo_": types.UID("uid-0"),
2149 },
2150 allowed: true,
2151 allowedEver: true,
2152 },
2153 {
2154 desc: "static pod if the static pod is the first pod waiting to start",
2155 pod: newStaticPod("uid-0", "foo"),
2156 podSyncStatuses: map[types.UID]*podSyncStatus{
2157 "uid-0": {
2158 fullname: "foo_",
2159 },
2160 },
2161 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2162 "foo_": {
2163 types.UID("uid-0"),
2164 },
2165 },
2166 expectedStartedStaticPodsByFullname: map[string]types.UID{
2167 "foo_": types.UID("uid-0"),
2168 },
2169 expectedWaitingToStartStaticPodsByFullname: make(map[string][]types.UID),
2170 allowed: true,
2171 allowedEver: true,
2172 },
2173 {
2174 desc: "static pod if the static pod is not the first pod waiting to start",
2175 pod: newStaticPod("uid-0", "foo"),
2176 podSyncStatuses: map[types.UID]*podSyncStatus{
2177 "uid-0": {
2178 fullname: "foo_",
2179 },
2180 "uid-1": {
2181 fullname: "foo_",
2182 },
2183 },
2184 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2185 "foo_": {
2186 types.UID("uid-1"),
2187 types.UID("uid-0"),
2188 },
2189 },
2190 expectedStartedStaticPodsByFullname: make(map[string]types.UID),
2191 expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
2192 "foo_": {
2193 types.UID("uid-1"),
2194 types.UID("uid-0"),
2195 },
2196 },
2197 allowed: false,
2198 allowedEver: true,
2199 },
2200 {
2201 desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod",
2202 pod: newStaticPod("uid-0", "foo"),
2203 podSyncStatuses: map[types.UID]*podSyncStatus{
2204 "uid-0": {
2205 fullname: "foo_",
2206 },
2207 "uid-1": {
2208 fullname: "foo_",
2209 },
2210 },
2211 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2212 "foo_": {
2213 types.UID("uid-2"),
2214 types.UID("uid-2"),
2215 types.UID("uid-3"),
2216 types.UID("uid-0"),
2217 types.UID("uid-1"),
2218 },
2219 },
2220 expectedStartedStaticPodsByFullname: map[string]types.UID{
2221 "foo_": types.UID("uid-0"),
2222 },
2223 expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
2224 "foo_": {
2225 types.UID("uid-1"),
2226 },
2227 },
2228 allowed: true,
2229 allowedEver: true,
2230 },
2231 {
2232 desc: "static pod if the static pod is the first pod that is not termination requested and waiting to start",
2233 pod: newStaticPod("uid-0", "foo"),
2234 podSyncStatuses: map[types.UID]*podSyncStatus{
2235 "uid-0": {
2236 fullname: "foo_",
2237 },
2238 "uid-1": {
2239 fullname: "foo_",
2240 },
2241 "uid-2": {
2242 fullname: "foo_",
2243 terminatingAt: time.Now(),
2244 },
2245 "uid-3": {
2246 fullname: "foo_",
2247 terminatedAt: time.Now(),
2248 },
2249 },
2250 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2251 "foo_": {
2252 types.UID("uid-2"),
2253 types.UID("uid-3"),
2254 types.UID("uid-0"),
2255 types.UID("uid-1"),
2256 },
2257 },
2258 expectedStartedStaticPodsByFullname: map[string]types.UID{
2259 "foo_": types.UID("uid-0"),
2260 },
2261 expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
2262 "foo_": {
2263 types.UID("uid-1"),
2264 },
2265 },
2266 allowed: true,
2267 allowedEver: true,
2268 },
2269 {
2270 desc: "static pod if there is no sync status for the pod should be denied",
2271 pod: newStaticPod("uid-0", "foo"),
2272 podSyncStatuses: map[types.UID]*podSyncStatus{
2273 "uid-1": {
2274 fullname: "foo_",
2275 },
2276 "uid-2": {
2277 fullname: "foo_",
2278 terminatingAt: time.Now(),
2279 },
2280 "uid-3": {
2281 fullname: "foo_",
2282 terminatedAt: time.Now(),
2283 },
2284 },
2285 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2286 "foo_": {
2287 types.UID("uid-1"),
2288 },
2289 },
2290 expectedStartedStaticPodsByFullname: map[string]types.UID{},
2291 expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
2292 "foo_": {
2293 types.UID("uid-1"),
2294 },
2295 },
2296 allowed: false,
2297 allowedEver: false,
2298 },
2299 {
2300 desc: "static pod if the static pod is terminated should not be allowed",
2301 pod: newStaticPod("uid-0", "foo"),
2302 podSyncStatuses: map[types.UID]*podSyncStatus{
2303 "uid-0": {
2304 fullname: "foo_",
2305 terminatingAt: time.Now(),
2306 },
2307 },
2308 waitingToStartStaticPodsByFullname: map[string][]types.UID{
2309 "foo_": {
2310 types.UID("uid-2"),
2311 types.UID("uid-3"),
2312 types.UID("uid-0"),
2313 types.UID("uid-1"),
2314 },
2315 },
2316 expectedStartedStaticPodsByFullname: map[string]types.UID{},
2317 expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
2318 "foo_": {
2319 types.UID("uid-2"),
2320 types.UID("uid-3"),
2321 types.UID("uid-0"),
2322 types.UID("uid-1"),
2323 },
2324 },
2325 allowed: false,
2326 allowedEver: false,
2327 },
2328 }
2329
2330 for _, tc := range testCases {
2331 t.Run(tc.desc, func(t *testing.T) {
2332 podWorkers, _, _ := createPodWorkers()
2333 if tc.podSyncStatuses != nil {
2334 podWorkers.podSyncStatuses = tc.podSyncStatuses
2335 }
2336 if tc.startedStaticPodsByFullname != nil {
2337 podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
2338 }
2339 if tc.waitingToStartStaticPodsByFullname != nil {
2340 podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
2341 }
2342 allowed, allowedEver := podWorkers.allowPodStart(tc.pod)
2343 if allowed != tc.allowed {
2344 if tc.allowed {
2345 t.Errorf("Pod should be allowed")
2346 } else {
2347 t.Errorf("Pod should not be allowed")
2348 }
2349 }
2350
2351 if allowedEver != tc.allowedEver {
2352 if tc.allowedEver {
2353 t.Errorf("Pod should be allowed ever")
2354 } else {
2355 t.Errorf("Pod should not be allowed ever")
2356 }
2357 }
2358
2359
2360 if len(podWorkers.startedStaticPodsByFullname) != 0 ||
2361 len(podWorkers.startedStaticPodsByFullname) != len(tc.expectedStartedStaticPodsByFullname) {
2362 if !reflect.DeepEqual(
2363 podWorkers.startedStaticPodsByFullname,
2364 tc.expectedStartedStaticPodsByFullname) {
2365 t.Errorf("startedStaticPodsByFullname: expected %v, got %v",
2366 tc.expectedStartedStaticPodsByFullname,
2367 podWorkers.startedStaticPodsByFullname)
2368 }
2369 }
2370
2371
2372 if len(podWorkers.waitingToStartStaticPodsByFullname) != 0 ||
2373 len(podWorkers.waitingToStartStaticPodsByFullname) != len(tc.expectedWaitingToStartStaticPodsByFullname) {
2374 if !reflect.DeepEqual(
2375 podWorkers.waitingToStartStaticPodsByFullname,
2376 tc.expectedWaitingToStartStaticPodsByFullname) {
2377 t.Errorf("waitingToStartStaticPodsByFullname: expected %v, got %v",
2378 tc.expectedWaitingToStartStaticPodsByFullname,
2379 podWorkers.waitingToStartStaticPodsByFullname)
2380 }
2381 }
2382 })
2383 }
2384 }
2385
View as plain text