...

Source file src/k8s.io/kubernetes/pkg/kubelet/pod_workers_test.go

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"context"
    21  	"reflect"
    22  	"strconv"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/status"
    30  	v1 "k8s.io/api/core/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	"k8s.io/client-go/tools/record"
    35  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    36  	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
    37  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    38  	"k8s.io/kubernetes/pkg/kubelet/util/queue"
    39  	"k8s.io/utils/clock"
    40  	clocktesting "k8s.io/utils/clock/testing"
    41  )
    42  
    43  // fakePodWorkers runs the sync pod function serially, so we can have
    44  // deterministic behaviour in testing.
    45  type fakePodWorkers struct {
    46  	lock      sync.Mutex
    47  	syncPodFn syncPodFnType
    48  	cache     kubecontainer.Cache
    49  	t         TestingInterface
    50  
    51  	triggeredDeletion []types.UID
    52  	triggeredTerminal []types.UID
    53  
    54  	statusLock            sync.Mutex
    55  	running               map[types.UID]bool
    56  	terminating           map[types.UID]bool
    57  	terminated            map[types.UID]bool
    58  	terminationRequested  map[types.UID]bool
    59  	finished              map[types.UID]bool
    60  	removeRuntime         map[types.UID]bool
    61  	removeContent         map[types.UID]bool
    62  	terminatingStaticPods map[string]bool
    63  }
    64  
    65  func (f *fakePodWorkers) UpdatePod(options UpdatePodOptions) {
    66  	f.lock.Lock()
    67  	defer f.lock.Unlock()
    68  	var uid types.UID
    69  	switch {
    70  	case options.Pod != nil:
    71  		uid = options.Pod.UID
    72  	case options.RunningPod != nil:
    73  		uid = options.RunningPod.ID
    74  	default:
    75  		return
    76  	}
    77  	status, err := f.cache.Get(uid)
    78  	if err != nil {
    79  		f.t.Errorf("Unexpected error: %v", err)
    80  	}
    81  	switch options.UpdateType {
    82  	case kubetypes.SyncPodKill:
    83  		f.triggeredDeletion = append(f.triggeredDeletion, uid)
    84  	default:
    85  		isTerminal, err := f.syncPodFn(context.Background(), options.UpdateType, options.Pod, options.MirrorPod, status)
    86  		if err != nil {
    87  			f.t.Errorf("Unexpected error: %v", err)
    88  		}
    89  		if isTerminal {
    90  			f.triggeredTerminal = append(f.triggeredTerminal, uid)
    91  		}
    92  	}
    93  }
    94  
    95  func (f *fakePodWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorkerSync {
    96  	return map[types.UID]PodWorkerSync{}
    97  }
    98  
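// The statusLock-guarded maps on fakePodWorkers let tests script, per pod
// UID, the answers returned by the accessor methods below.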
    99  func (f *fakePodWorkers) IsPodKnownTerminated(uid types.UID) bool {
   100  	f.statusLock.Lock()
   101  	defer f.statusLock.Unlock()
   102  	return f.terminated[uid]
   103  }
   104  func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool {
   105  	f.statusLock.Lock()
   106  	defer f.statusLock.Unlock()
   107  	return f.running[uid]
   108  }
   109  func (f *fakePodWorkers) ShouldPodBeFinished(uid types.UID) bool {
   110  	f.statusLock.Lock()
   111  	defer f.statusLock.Unlock()
   112  	return f.finished[uid]
   113  }
   114  func (f *fakePodWorkers) IsPodTerminationRequested(uid types.UID) bool {
   115  	f.statusLock.Lock()
   116  	defer f.statusLock.Unlock()
   117  	return f.terminationRequested[uid]
   118  }
   119  func (f *fakePodWorkers) ShouldPodContainersBeTerminating(uid types.UID) bool {
   120  	f.statusLock.Lock()
   121  	defer f.statusLock.Unlock()
   122  	return f.terminating[uid]
   123  }
   124  func (f *fakePodWorkers) ShouldPodRuntimeBeRemoved(uid types.UID) bool {
   125  	f.statusLock.Lock()
   126  	defer f.statusLock.Unlock()
   127  	return f.removeRuntime[uid]
   128  }
   129  func (f *fakePodWorkers) setPodRuntimeBeRemoved(uid types.UID) {
   130  	f.statusLock.Lock()
   131  	defer f.statusLock.Unlock()
   132  	f.removeRuntime = map[types.UID]bool{uid: true}
   133  }
   134  func (f *fakePodWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
   135  	f.statusLock.Lock()
   136  	defer f.statusLock.Unlock()
   137  	return f.removeContent[uid]
   138  }
   139  func (f *fakePodWorkers) IsPodForMirrorPodTerminatingByFullName(podFullname string) bool {
   140  	f.statusLock.Lock()
   141  	defer f.statusLock.Unlock()
   142  	return f.terminatingStaticPods[podFullname]
   143  }
   144  
   145  type TestingInterface interface {
   146  	Errorf(format string, args ...interface{})
   147  }
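// A minimal usage sketch (illustrative; it assumes the helpers defined in
// this file): *testing.T satisfies TestingInterface, so a test can hand the
// fake a stub sync function and a cache and drive it synchronously, with no
// draining required because syncPodFn runs inline:
//
//	fakeRuntime := &containertest.FakeRuntime{}
//	fw := &fakePodWorkers{
//		syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, status *kubecontainer.PodStatus) (bool, error) {
//			return false, nil // report the pod as not yet terminal
//		},
//		cache: containertest.NewFakeCache(fakeRuntime),
//		t:     t,
//	}
//	fw.UpdatePod(UpdatePodOptions{
//		UpdateType: kubetypes.SyncPodCreate,
//		Pod:        newNamedPod("1", "ns", "pod1", false),
//	})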
   148  
   149  func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
   150  	pod := newNamedPod(uid, "ns", name, false)
   151  	pod.Status.Phase = phase
   152  	return pod
   153  }
   154  
   155  func newStaticPod(uid, name string) *v1.Pod {
   156  	thirty := int64(30)
   157  	return &v1.Pod{
   158  		ObjectMeta: metav1.ObjectMeta{
   159  			UID:  types.UID(uid),
   160  			Name: name,
   161  			Annotations: map[string]string{
   162  				kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
   163  			},
   164  		},
   165  		Spec: v1.PodSpec{
   166  			TerminationGracePeriodSeconds: &thirty,
   167  		},
   168  	}
   169  }
   170  
   171  func newNamedPod(uid, namespace, name string, isStatic bool) *v1.Pod {
   172  	thirty := int64(30)
   173  	pod := &v1.Pod{
   174  		ObjectMeta: metav1.ObjectMeta{
   175  			UID:       types.UID(uid),
   176  			Namespace: namespace,
   177  			Name:      name,
   178  		},
   179  		Spec: v1.PodSpec{
   180  			TerminationGracePeriodSeconds: &thirty,
   181  		},
   182  	}
   183  	if isStatic {
   184  		pod.Annotations = map[string]string{
   185  			kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
   186  		}
   187  	}
   188  	return pod
   189  }
   190  
   191  // syncPodRecord is a record of a sync pod call
   192  type syncPodRecord struct {
   193  	name        string
   194  	updateType  kubetypes.SyncPodType
   195  	runningPod  *kubecontainer.Pod
   196  	terminated  bool
   197  	gracePeriod *int64
   198  }
   199  
   200  type FakeQueueItem struct {
   201  	UID   types.UID
   202  	Delay time.Duration
   203  }
   204  
   205  type fakeQueue struct {
   206  	lock         sync.Mutex
   207  	queue        []FakeQueueItem
   208  	currentStart int
   209  }
   210  
   211  func (q *fakeQueue) Empty() bool {
   212  	q.lock.Lock()
   213  	defer q.lock.Unlock()
   214  	return (len(q.queue) - q.currentStart) == 0
   215  }
   216  
   217  func (q *fakeQueue) Items() []FakeQueueItem {
   218  	q.lock.Lock()
   219  	defer q.lock.Unlock()
   220  	return append(make([]FakeQueueItem, 0, len(q.queue)), q.queue...)
   221  }
   222  
   223  func (q *fakeQueue) Set() sets.String {
   224  	q.lock.Lock()
   225  	defer q.lock.Unlock()
   226  	work := sets.NewString()
   227  	for _, item := range q.queue[q.currentStart:] {
   228  		work.Insert(string(item.UID))
   229  	}
   230  	return work
   231  }
   232  
   233  func (q *fakeQueue) Enqueue(uid types.UID, delay time.Duration) {
   234  	q.lock.Lock()
   235  	defer q.lock.Unlock()
   236  	q.queue = append(q.queue, FakeQueueItem{UID: uid, Delay: delay})
   237  }
   238  
   239  func (q *fakeQueue) GetWork() []types.UID {
   240  	q.lock.Lock()
   241  	defer q.lock.Unlock()
   242  	work := make([]types.UID, 0, len(q.queue)-q.currentStart)
   243  	for _, item := range q.queue[q.currentStart:] {
   244  		work = append(work, item.UID)
   245  	}
   246  	q.currentStart = len(q.queue)
   247  	return work
   248  }
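// A short sketch of the fakeQueue contract relied on by the tests below
// (illustrative): GetWork drains by advancing currentStart rather than
// truncating the slice, so Items still reports the full enqueue history:
//
//	q := &fakeQueue{}
//	q.Enqueue("a", 0)
//	q.Enqueue("b", time.Second)
//	_ = q.Set()     // {"a", "b"}: everything not yet handed out
//	_ = q.GetWork() // ["a", "b"]; the queue is now Empty()
//	_ = q.Items()   // still both items, delays included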
   249  
   250  type timeIncrementingWorkers struct {
   251  	lock    sync.Mutex
   252  	w       *podWorkers
   253  	runtime *containertest.FakeRuntime
   254  	holds   map[types.UID]chan struct{}
   255  }
   256  
   257  // UpdatePod increments the clock after UpdatePod is called, but before the workers
   258  // are invoked, and then drains all workers before returning. The provided functions
   259  // are invoked while holding the lock to prevent workers from receiving updates.
   260  func (w *timeIncrementingWorkers) UpdatePod(options UpdatePodOptions, afterFns ...func()) {
   261  	func() {
   262  		w.lock.Lock()
   263  		defer w.lock.Unlock()
   264  		w.w.UpdatePod(options)
   265  		w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
   266  		for _, fn := range afterFns {
   267  			fn()
   268  		}
   269  	}()
   270  	w.drainUnpausedWorkers()
   271  }
   272  
   273  // SyncKnownPods increments the clock after SyncKnownPods is called, but before the workers
   274  // are invoked, and then drains all workers before returning.
   275  func (w *timeIncrementingWorkers) SyncKnownPods(desiredPods []*v1.Pod) (knownPods map[types.UID]PodWorkerSync) {
   276  	func() {
   277  		w.lock.Lock()
   278  		defer w.lock.Unlock()
   279  		knownPods = w.w.SyncKnownPods(desiredPods)
   280  		w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
   281  	}()
   282  	w.drainUnpausedWorkers()
   283  	return
   284  }
   285  
   286  func (w *timeIncrementingWorkers) PauseWorkers(uids ...types.UID) {
   287  	w.lock.Lock()
   288  	defer w.lock.Unlock()
   289  	if w.holds == nil {
   290  		w.holds = make(map[types.UID]chan struct{})
   291  	}
   292  	for _, uid := range uids {
   293  		if _, ok := w.holds[uid]; !ok {
   294  			w.holds[uid] = make(chan struct{})
   295  		}
   296  	}
   297  }
   298  
   299  func (w *timeIncrementingWorkers) ReleaseWorkers(uids ...types.UID) {
   300  	w.lock.Lock()
   301  	defer w.lock.Unlock()
   302  	w.ReleaseWorkersUnderLock(uids...)
   303  }
   304  
   305  func (w *timeIncrementingWorkers) ReleaseWorkersUnderLock(uids ...types.UID) {
   306  	for _, uid := range uids {
   307  		if ch, ok := w.holds[uid]; ok {
   308  			close(ch)
   309  			delete(w.holds, uid)
   310  		}
   311  	}
   312  }
   313  
   314  func (w *timeIncrementingWorkers) waitForPod(uid types.UID) {
   315  	w.lock.Lock()
   316  	ch, ok := w.holds[uid]
   317  	w.lock.Unlock()
   318  	if !ok {
   319  		return
   320  	}
   321  	<-ch
   322  }
   323  
   324  func (w *timeIncrementingWorkers) drainUnpausedWorkers() {
   325  	pausedWorkers := make(map[types.UID]struct{})
   326  	for {
   327  		for uid := range pausedWorkers {
   328  			delete(pausedWorkers, uid)
   329  		}
   330  		stillWorking := false
   331  
   332  		// ignore held workers
   333  		w.lock.Lock()
   334  		for uid := range w.holds {
   335  			pausedWorkers[uid] = struct{}{}
   336  		}
   337  		w.lock.Unlock()
   338  
   339  		// check for at least one still working non-paused worker
   340  		w.w.podLock.Lock()
   341  		for uid, worker := range w.w.podSyncStatuses {
   342  			if _, ok := pausedWorkers[uid]; ok {
   343  				continue
   344  			}
   345  			if worker.working {
   346  				stillWorking = true
   347  				break
   348  			}
   349  		}
   350  		w.w.podLock.Unlock()
   351  
   352  		if !stillWorking {
   353  			break
   354  		}
   355  		time.Sleep(time.Millisecond)
   356  	}
   357  }
   358  
   359  func (w *timeIncrementingWorkers) tick() {
   360  	w.lock.Lock()
   361  	defer w.lock.Unlock()
   362  	w.w.clock.(*clocktesting.FakePassiveClock).SetTime(w.w.clock.Now().Add(time.Second))
   363  }
   364  
   365  // createTimeIncrementingPodWorkers guarantees that each call to UpdatePod and each worker goroutine invocation advances the clock by one second,
   366  // although multiple workers will advance the clock in an unpredictable order. Use it to observe
   367  // successive internal updates to each pod's state when only a single pod is being updated.
   368  func createTimeIncrementingPodWorkers() (*timeIncrementingWorkers, map[types.UID][]syncPodRecord) {
   369  	nested, runtime, processed := createPodWorkers()
   370  	w := &timeIncrementingWorkers{
   371  		w:       nested,
   372  		runtime: runtime,
   373  	}
   374  	nested.workerChannelFn = func(uid types.UID, in chan struct{}) <-chan struct{} {
   375  		ch := make(chan struct{})
   376  		go func() {
   377  			defer close(ch)
   378  			// TODO: this is an eager loop; we might want to lazily read
   379  			// from in only once ch is empty
   380  			for range in {
   381  				w.waitForPod(uid)
   382  				w.tick()
   383  				ch <- struct{}{}
   384  			}
   385  		}()
   386  		return ch
   387  	}
   388  	return w, processed
   389  }
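// A usage sketch (illustrative): because every UpdatePod advances the fake
// clock before the worker runs, and each worker invocation ticks it again,
// the timestamps recorded in podSyncStatuses land on distinct, predictable
// seconds (see the expectations in TestUpdatePod below):
//
//	w, processed := createTimeIncrementingPodWorkers()
//	w.UpdatePod(UpdatePodOptions{
//		UpdateType: kubetypes.SyncPodCreate,
//		Pod:        newNamedPod("1", "ns", "pod1", false),
//	})
//	// e.g. syncedAt == time.Unix(1, 0), startedAt == time.Unix(3, 0)
//	_ = processed[types.UID("1")]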
   390  
   391  func createPodWorkers() (*podWorkers, *containertest.FakeRuntime, map[types.UID][]syncPodRecord) {
   392  	lock := sync.Mutex{}
   393  	processed := make(map[types.UID][]syncPodRecord)
   394  	fakeRecorder := &record.FakeRecorder{}
   395  	fakeRuntime := &containertest.FakeRuntime{}
   396  	fakeCache := containertest.NewFakeCache(fakeRuntime)
   397  	fakeQueue := &fakeQueue{}
   398  	clock := clocktesting.NewFakePassiveClock(time.Unix(1, 0))
   399  	w := newPodWorkers(
   400  		&podSyncerFuncs{
   401  			syncPod: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
   402  				func() {
   403  					lock.Lock()
   404  					defer lock.Unlock()
   405  					pod := pod
   406  					processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
   407  						name:       pod.Name,
   408  						updateType: updateType,
   409  					})
   410  				}()
   411  				return false, nil
   412  			},
   413  			syncTerminatingPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
   414  				func() {
   415  					lock.Lock()
   416  					defer lock.Unlock()
   417  					processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
   418  						name:        pod.Name,
   419  						updateType:  kubetypes.SyncPodKill,
   420  						gracePeriod: gracePeriod,
   421  					})
   422  				}()
   423  				return nil
   424  			},
   425  			syncTerminatingRuntimePod: func(ctx context.Context, runningPod *kubecontainer.Pod) error {
   426  				func() {
   427  					lock.Lock()
   428  					defer lock.Unlock()
   429  					processed[runningPod.ID] = append(processed[runningPod.ID], syncPodRecord{
   430  						name:       runningPod.Name,
   431  						updateType: kubetypes.SyncPodKill,
   432  						runningPod: runningPod,
   433  					})
   434  				}()
   435  				return nil
   436  			},
   437  			syncTerminatedPod: func(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
   438  				func() {
   439  					lock.Lock()
   440  					defer lock.Unlock()
   441  					processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
   442  						name:       pod.Name,
   443  						terminated: true,
   444  					})
   445  				}()
   446  				return nil
   447  			},
   448  		},
   449  		fakeRecorder,
   450  		fakeQueue,
   451  		time.Second,
   452  		time.Millisecond,
   453  		fakeCache,
   454  	)
   455  	workers := w.(*podWorkers)
   456  	workers.clock = clock
   457  	return workers, fakeRuntime, processed
   458  }
   459  
   460  func drainWorkers(podWorkers *podWorkers, numPods int) {
   461  	for {
   462  		stillWorking := false
   463  		podWorkers.podLock.Lock()
   464  		for i := 0; i < numPods; i++ {
   465  			if s, ok := podWorkers.podSyncStatuses[types.UID(strconv.Itoa(i))]; ok && s.working {
   466  				stillWorking = true
   467  				break
   468  			}
   469  		}
   470  		podWorkers.podLock.Unlock()
   471  		if !stillWorking {
   472  			break
   473  		}
   474  		time.Sleep(50 * time.Millisecond)
   475  	}
   476  }
   477  
   478  func drainWorkersExcept(podWorkers *podWorkers, uids ...types.UID) {
   479  	set := sets.NewString()
   480  	for _, uid := range uids {
   481  		set.Insert(string(uid))
   482  	}
   483  	for {
   484  		stillWorking := false
   485  		podWorkers.podLock.Lock()
   486  		for k, v := range podWorkers.podSyncStatuses {
   487  			if set.Has(string(k)) {
   488  				continue
   489  			}
   490  			if v.working {
   491  				stillWorking = true
   492  				break
   493  			}
   494  		}
   495  		podWorkers.podLock.Unlock()
   496  		if !stillWorking {
   497  			break
   498  		}
   499  		time.Sleep(50 * time.Millisecond)
   500  	}
   501  }
   502  
   503  func drainAllWorkers(podWorkers *podWorkers) {
   504  	for {
   505  		stillWorking := false
   506  		podWorkers.podLock.Lock()
   507  		for _, worker := range podWorkers.podSyncStatuses {
   508  			if worker.working {
   509  				stillWorking = true
   510  				break
   511  			}
   512  		}
   513  		podWorkers.podLock.Unlock()
   514  		if !stillWorking {
   515  			break
   516  		}
   517  		time.Sleep(50 * time.Millisecond)
   518  	}
   519  }
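// The three drain helpers above share the same poll-until-idle shape; a
// generalization could look like the following sketch (not part of the
// upstream file, shown only to make the pattern explicit):
//
//	func drainMatchingWorkers(podWorkers *podWorkers, match func(types.UID) bool) {
//		for {
//			stillWorking := false
//			podWorkers.podLock.Lock()
//			for uid, status := range podWorkers.podSyncStatuses {
//				if match(uid) && status.working {
//					stillWorking = true
//					break
//				}
//			}
//			podWorkers.podLock.Unlock()
//			if !stillWorking {
//				return
//			}
//			time.Sleep(50 * time.Millisecond)
//		}
//	}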
   520  
   521  func TestUpdatePodParallel(t *testing.T) {
   522  	podWorkers, _, processed := createPodWorkers()
   523  
   524  	numPods := 20
   525  	for i := 0; i < numPods; i++ {
   526  		for j := i; j < numPods; j++ {
   527  			podWorkers.UpdatePod(UpdatePodOptions{
   528  				Pod:        newNamedPod(strconv.Itoa(j), "ns", strconv.Itoa(i), false),
   529  				UpdateType: kubetypes.SyncPodCreate,
   530  			})
   531  		}
   532  	}
   533  	drainWorkers(podWorkers, numPods)
   534  
   535  	if len(processed) != numPods {
   536  		t.Fatalf("Not all pods processed: %v", len(processed))
   537  	}
   538  	for i := 0; i < numPods; i++ {
   539  		uid := types.UID(strconv.Itoa(i))
   540  		events := processed[uid]
   541  		if len(events) < 1 || len(events) > i+1 {
   542  			t.Errorf("Pod %v processed %v times", i, len(events))
   543  			continue
   544  		}
   545  
   546  		// PodWorker guarantees the last event will be processed
   547  		last := len(events) - 1
   548  		if events[last].name != strconv.Itoa(i) {
   549  			t.Errorf("Pod %v: incorrect order %v, %#v", i, last, events)
   550  		}
   551  	}
   552  }
   553  
   554  func TestUpdatePod(t *testing.T) {
   555  	one := int64(1)
   556  	hasContext := func(status *podSyncStatus) *podSyncStatus {
   557  		status.ctx, status.cancelFn = context.Background(), func() {}
   558  		return status
   559  	}
   560  	withLabel := func(pod *v1.Pod, label, value string) *v1.Pod {
   561  		if pod.Labels == nil {
   562  			pod.Labels = make(map[string]string)
   563  		}
   564  		pod.Labels[label] = value
   565  		return pod
   566  	}
   567  	withDeletionTimestamp := func(pod *v1.Pod, ts time.Time, gracePeriod *int64) *v1.Pod {
   568  		pod.DeletionTimestamp = &metav1.Time{Time: ts}
   569  		pod.DeletionGracePeriodSeconds = gracePeriod
   570  		return pod
   571  	}
   572  	intp := func(i int64) *int64 {
   573  		return &i
   574  	}
   575  	expectPodSyncStatus := func(t *testing.T, expected, status *podSyncStatus) {
   576  		t.Helper()
   577  		// handle special non-comparable fields
   578  		if status != nil {
   579  			if e, a := expected.ctx != nil, status.ctx != nil; e != a {
   580  				t.Errorf("expected context %t, has context %t", e, a)
   581  			} else {
   582  				expected.ctx, status.ctx = nil, nil
   583  			}
   584  			if e, a := expected.cancelFn != nil, status.cancelFn != nil; e != a {
   585  				t.Errorf("expected cancelFn %t, has cancelFn %t", e, a)
   586  			} else {
   587  				expected.cancelFn, status.cancelFn = nil, nil
   588  			}
   589  		}
   590  		if e, a := expected, status; !reflect.DeepEqual(e, a) {
   591  			t.Fatalf("unexpected status: %s", cmp.Diff(e, a, cmp.AllowUnexported(podSyncStatus{})))
   592  		}
   593  	}
   594  	for _, tc := range []struct {
   595  		name          string
   596  		update        UpdatePodOptions
   597  		runtimeStatus *kubecontainer.PodStatus
   598  		prepare       func(t *testing.T, w *timeIncrementingWorkers) (afterUpdateFn func())
   599  
   600  		expect                *podSyncStatus
   601  		expectBeforeWorker    *podSyncStatus
   602  		expectKnownTerminated bool
   603  	}{
   604  		{
   605  			name: "a new pod is recorded and started",
   606  			update: UpdatePodOptions{
   607  				UpdateType: kubetypes.SyncPodCreate,
   608  				Pod:        newNamedPod("1", "ns", "running-pod", false),
   609  			},
   610  			expect: hasContext(&podSyncStatus{
   611  				fullname:  "running-pod_ns",
   612  				syncedAt:  time.Unix(1, 0),
   613  				startedAt: time.Unix(3, 0),
   614  				activeUpdate: &UpdatePodOptions{
   615  					Pod: newNamedPod("1", "ns", "running-pod", false),
   616  				},
   617  			}),
   618  		},
   619  		{
   620  			name: "a new pod is recorded and started unless it is a duplicate of an existing terminating pod UID",
   621  			update: UpdatePodOptions{
   622  				UpdateType: kubetypes.SyncPodCreate,
   623  				Pod:        withLabel(newNamedPod("1", "ns", "running-pod", false), "updated", "value"),
   624  			},
   625  			prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
   626  				w.UpdatePod(UpdatePodOptions{
   627  					UpdateType: kubetypes.SyncPodCreate,
   628  					Pod:        newNamedPod("1", "ns", "running-pod", false),
   629  				})
   630  				w.PauseWorkers("1")
   631  				w.UpdatePod(UpdatePodOptions{
   632  					UpdateType: kubetypes.SyncPodKill,
   633  					Pod:        newNamedPod("1", "ns", "running-pod", false),
   634  				})
   635  				return func() { w.ReleaseWorkersUnderLock("1") }
   636  			},
   637  			expect: hasContext(&podSyncStatus{
   638  				fullname:           "running-pod_ns",
   639  				syncedAt:           time.Unix(1, 0),
   640  				startedAt:          time.Unix(3, 0),
   641  				terminatingAt:      time.Unix(3, 0),
   642  				terminatedAt:       time.Unix(6, 0),
   643  				gracePeriod:        30,
   644  				startedTerminating: true,
   645  				restartRequested:   true, // because we received a create during termination
   646  				finished:           true,
   647  				activeUpdate: &UpdatePodOptions{
   648  					Pod:            newNamedPod("1", "ns", "running-pod", false),
   649  					KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(30)},
   650  				},
   651  			}),
   652  			expectKnownTerminated: true,
   653  		},
   654  		{
   655  			name: "a new pod is recorded and started and running pod is ignored",
   656  			update: UpdatePodOptions{
   657  				UpdateType: kubetypes.SyncPodCreate,
   658  				Pod:        newNamedPod("1", "ns", "running-pod", false),
   659  				RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
   660  			},
   661  			expect: hasContext(&podSyncStatus{
   662  				fullname:  "running-pod_ns",
   663  				syncedAt:  time.Unix(1, 0),
   664  				startedAt: time.Unix(3, 0),
   665  				activeUpdate: &UpdatePodOptions{
   666  					Pod: newNamedPod("1", "ns", "running-pod", false),
   667  				},
   668  			}),
   669  		},
   670  		{
   671  			name: "a running pod is terminated when an update contains a deletionTimestamp",
   672  			update: UpdatePodOptions{
   673  				UpdateType: kubetypes.SyncPodUpdate,
   674  				Pod:        withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)),
   675  			},
   676  			prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
   677  				w.UpdatePod(UpdatePodOptions{
   678  					UpdateType: kubetypes.SyncPodCreate,
   679  					Pod:        newNamedPod("1", "ns", "running-pod", false),
   680  				})
   681  				return nil
   682  			},
   683  			expect: hasContext(&podSyncStatus{
   684  				fullname:           "running-pod_ns",
   685  				syncedAt:           time.Unix(1, 0),
   686  				startedAt:          time.Unix(3, 0),
   687  				terminatingAt:      time.Unix(3, 0),
   688  				terminatedAt:       time.Unix(5, 0),
   689  				gracePeriod:        15,
   690  				startedTerminating: true,
   691  				finished:           true,
   692  				deleted:            true,
   693  				activeUpdate: &UpdatePodOptions{
   694  					Pod:            withDeletionTimestamp(newNamedPod("1", "ns", "running-pod", false), time.Unix(1, 0), intp(15)),
   695  					KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(15)},
   696  				},
   697  			}),
   698  			expectKnownTerminated: true,
   699  		},
   700  		{
   701  			name: "a running pod is terminated when an eviction is requested",
   702  			update: UpdatePodOptions{
   703  				UpdateType:     kubetypes.SyncPodKill,
   704  				Pod:            newNamedPod("1", "ns", "running-pod", false),
   705  				KillPodOptions: &KillPodOptions{Evict: true},
   706  			},
   707  			prepare: func(t *testing.T, w *timeIncrementingWorkers) func() {
   708  				w.UpdatePod(UpdatePodOptions{
   709  					UpdateType: kubetypes.SyncPodCreate,
   710  					Pod:        newNamedPod("1", "ns", "running-pod", false),
   711  				})
   712  				return nil
   713  			},
   714  			expect: hasContext(&podSyncStatus{
   715  				fullname:           "running-pod_ns",
   716  				syncedAt:           time.Unix(1, 0),
   717  				startedAt:          time.Unix(3, 0),
   718  				terminatingAt:      time.Unix(3, 0),
   719  				terminatedAt:       time.Unix(5, 0),
   720  				gracePeriod:        30,
   721  				startedTerminating: true,
   722  				finished:           true,
   723  				evicted:            true,
   724  				activeUpdate: &UpdatePodOptions{
   725  					Pod: newNamedPod("1", "ns", "running-pod", false),
   726  					KillPodOptions: &KillPodOptions{
   727  						PodTerminationGracePeriodSecondsOverride: intp(30),
   728  						Evict:                                    true,
   729  					},
   730  				},
   731  			}),
   732  			expectKnownTerminated: true,
   733  		},
   734  		{
   735  			name: "a pod that is terminal and has never started must be terminated if the runtime does not have a cached terminal state",
   736  			update: UpdatePodOptions{
   737  				UpdateType: kubetypes.SyncPodCreate,
   738  				Pod:        newPodWithPhase("1", "done-pod", v1.PodSucceeded),
   739  			},
   740  			expect: hasContext(&podSyncStatus{
   741  				fullname:      "done-pod_ns",
   742  				syncedAt:      time.Unix(1, 0),
   743  				terminatingAt: time.Unix(1, 0),
   744  				startedAt:     time.Unix(3, 0),
   745  				terminatedAt:  time.Unix(3, 0),
   746  				activeUpdate: &UpdatePodOptions{
   747  					Pod:            newPodWithPhase("1", "done-pod", v1.PodSucceeded),
   748  					KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: intp(30)},
   749  				},
   750  				gracePeriod:        30,
   751  				startedTerminating: true,
   752  				finished:           true,
   753  			}),
   754  			expectKnownTerminated: true,
   755  		},
   756  		{
   757  			name: "a pod that is terminal and has never started advances to finished if the runtime has a cached terminal state",
   758  			update: UpdatePodOptions{
   759  				UpdateType: kubetypes.SyncPodCreate,
   760  				Pod:        newPodWithPhase("1", "done-pod", v1.PodSucceeded),
   761  			},
   762  			runtimeStatus: &kubecontainer.PodStatus{ /* we know about this pod */ },
   763  			expectBeforeWorker: &podSyncStatus{
   764  				fullname:      "done-pod_ns",
   765  				syncedAt:      time.Unix(1, 0),
   766  				terminatingAt: time.Unix(1, 0),
   767  				terminatedAt:  time.Unix(1, 0),
   768  				pendingUpdate: &UpdatePodOptions{
   769  					UpdateType: kubetypes.SyncPodCreate,
   770  					Pod:        newPodWithPhase("1", "done-pod", v1.PodSucceeded),
   771  				},
   772  				finished:           false, // Should be marked as not finished initially (to ensure `SyncTerminatedPod` will run) and status will progress to terminated.
   773  				startedTerminating: true,
   774  				working:            true,
   775  			},
   776  			expect: hasContext(&podSyncStatus{
   777  				fullname:           "done-pod_ns",
   778  				syncedAt:           time.Unix(1, 0),
   779  				terminatingAt:      time.Unix(1, 0),
   780  				terminatedAt:       time.Unix(1, 0),
   781  				startedAt:          time.Unix(3, 0),
   782  				startedTerminating: true,
   783  				finished:           true,
   784  				activeUpdate: &UpdatePodOptions{
   785  					UpdateType: kubetypes.SyncPodSync,
   786  					Pod:        newPodWithPhase("1", "done-pod", v1.PodSucceeded),
   787  				},
   788  
   789  				// if we have never seen the pod before, a restart makes no sense
   790  				restartRequested: false,
   791  			}),
   792  			expectKnownTerminated: true,
   793  		},
   794  		{
   795  			name: "an orphaned running pod we have not seen is marked terminating and advances to finished and then is removed",
   796  			update: UpdatePodOptions{
   797  				UpdateType: kubetypes.SyncPodKill,
   798  				RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
   799  			},
   800  			expectBeforeWorker: &podSyncStatus{
   801  				fullname:      "orphaned-pod_ns",
   802  				syncedAt:      time.Unix(1, 0),
   803  				terminatingAt: time.Unix(1, 0),
   804  				pendingUpdate: &UpdatePodOptions{
   805  					UpdateType:     kubetypes.SyncPodKill,
   806  					RunningPod:     &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
   807  					KillPodOptions: &KillPodOptions{PodTerminationGracePeriodSecondsOverride: &one},
   808  				},
   809  				gracePeriod:     1,
   810  				deleted:         true,
   811  				observedRuntime: true,
   812  				working:         true,
   813  			},
   814  			// Once a running pod is fully terminated, we stop tracking it in history, and so it
   815  			// is deliberately expected not to be known outside the pod worker since the source of
   816  			// the pod is also not in the desired pod set.
   817  			expectKnownTerminated: false,
   818  		},
   819  		{
   820  			name: "an orphaned running pod with a non-kill update type does nothing",
   821  			update: UpdatePodOptions{
   822  				UpdateType: kubetypes.SyncPodCreate,
   823  				RunningPod: &kubecontainer.Pod{ID: "1", Name: "orphaned-pod", Namespace: "ns"},
   824  			},
   825  			expect: nil,
   826  		},
   827  	} {
   828  		t.Run(tc.name, func(t *testing.T) {
   829  			var uid types.UID
   830  			switch {
   831  			case tc.update.Pod != nil:
   832  				uid = tc.update.Pod.UID
   833  			case tc.update.RunningPod != nil:
   834  				uid = tc.update.RunningPod.ID
   835  			default:
   836  				t.Fatalf("unable to find uid for update")
   837  			}
   838  
   839  			var fns []func()
   840  
   841  			podWorkers, _ := createTimeIncrementingPodWorkers()
   842  
   843  			if tc.expectBeforeWorker != nil {
   844  				fns = append(fns, func() {
   845  					expectPodSyncStatus(t, tc.expectBeforeWorker, podWorkers.w.podSyncStatuses[uid])
   846  				})
   847  			}
   848  
   849  			if tc.prepare != nil {
   850  				if fn := tc.prepare(t, podWorkers); fn != nil {
   851  					fns = append(fns, fn)
   852  				}
   853  			}
   854  
   855  			// set up an initial pod status for the UpdatePod invocation which is
   856  			// reset before workers call the podCache
   857  			if tc.runtimeStatus != nil {
   858  				podWorkers.runtime.PodStatus = *tc.runtimeStatus
   859  				podWorkers.runtime.Err = nil
   860  			} else {
   861  				podWorkers.runtime.PodStatus = kubecontainer.PodStatus{}
   862  				podWorkers.runtime.Err = status.Error(codes.NotFound, "No such pod")
   863  			}
   864  			fns = append(fns, func() {
   865  				podWorkers.runtime.PodStatus = kubecontainer.PodStatus{}
   866  				podWorkers.runtime.Err = nil
   867  			})
   868  
   869  			podWorkers.UpdatePod(tc.update, fns...)
   870  
   871  			if podWorkers.w.IsPodKnownTerminated(uid) != tc.expectKnownTerminated {
   872  				t.Errorf("podWorker.IsPodKnownTerminated expected to be %t", tc.expectKnownTerminated)
   873  			}
   874  
   875  			expectPodSyncStatus(t, tc.expect, podWorkers.w.podSyncStatuses[uid])
   876  
   877  			// TODO: validate processed records for the pod based on the test case, which reduces
   878  			// the amount of testing we need to do in kubelet_pods_test.go
   879  		})
   880  	}
   881  }
   882  
   883  func TestUpdatePodForRuntimePod(t *testing.T) {
   884  	podWorkers, _, processed := createPodWorkers()
   885  
   886  	// ignores running pod of wrong sync type
   887  	podWorkers.UpdatePod(UpdatePodOptions{
   888  		UpdateType: kubetypes.SyncPodCreate,
   889  		RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
   890  	})
   891  	drainAllWorkers(podWorkers)
   892  	if len(processed) != 0 {
   893  		t.Fatalf("Not all pods processed: %v", len(processed))
   894  	}
   895  
   896  	// creates synthetic pod
   897  	podWorkers.UpdatePod(UpdatePodOptions{
   898  		UpdateType: kubetypes.SyncPodKill,
   899  		RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
   900  	})
   901  	drainAllWorkers(podWorkers)
   902  	if len(processed) != 1 {
   903  		t.Fatalf("Not all pods processed: %v", processed)
   904  	}
   905  	updates := processed["1"]
   906  	if len(updates) != 1 {
   907  		t.Fatalf("unexpected updates: %v", updates)
   908  	}
   909  	if updates[0].runningPod == nil || updates[0].updateType != kubetypes.SyncPodKill || updates[0].name != "1" {
   910  		t.Fatalf("unexpected update: %v", updates)
   911  	}
   912  }
   913  
   914  func TestUpdatePodForTerminatedRuntimePod(t *testing.T) {
   915  	podWorkers, _, processed := createPodWorkers()
   916  
   917  	now := time.Now()
   918  	podWorkers.podSyncStatuses[types.UID("1")] = &podSyncStatus{
   919  		startedTerminating: true,
   920  		terminatedAt:       now.Add(-time.Second),
   921  		terminatingAt:      now.Add(-2 * time.Second),
   922  		gracePeriod:        1,
   923  	}
   924  
   925  	// the pod worker should ignore this runtime pod because its termination already completed
   926  	podWorkers.UpdatePod(UpdatePodOptions{
   927  		UpdateType: kubetypes.SyncPodKill,
   928  		RunningPod: &kubecontainer.Pod{ID: "1", Name: "1", Namespace: "test"},
   929  	})
   930  	drainAllWorkers(podWorkers)
   931  	if len(processed) != 0 {
   932  		t.Fatalf("Not all pods processed: %v", processed)
   933  	}
   934  	updates := processed["1"]
   935  	if len(updates) != 0 {
   936  		t.Fatalf("unexpected updates: %v", updates)
   937  	}
   938  }
   939  
   940  func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
   941  	podWorkers, _, processed := createPodWorkers()
   942  	numPods := 20
   943  	for i := 0; i < numPods; i++ {
   944  		pod := newNamedPod(strconv.Itoa(i), "ns", strconv.Itoa(i), false)
   945  		podWorkers.UpdatePod(UpdatePodOptions{
   946  			Pod:        pod,
   947  			UpdateType: kubetypes.SyncPodCreate,
   948  		})
   949  		podWorkers.UpdatePod(UpdatePodOptions{
   950  			Pod:        pod,
   951  			UpdateType: kubetypes.SyncPodKill,
   952  		})
   953  		podWorkers.UpdatePod(UpdatePodOptions{
   954  			Pod:        pod,
   955  			UpdateType: kubetypes.SyncPodUpdate,
   956  		})
   957  	}
   958  	drainWorkers(podWorkers, numPods)
   959  	if len(processed) != numPods {
   960  		t.Errorf("Not all pods processed: %v", len(processed))
   961  		return
   962  	}
   963  	for i := 0; i < numPods; i++ {
   964  		uid := types.UID(strconv.Itoa(i))
   965  	// each pod should be processed two or three times, either (kill, terminate) or (create, kill, terminate), because
   966  	// we buffer pending updates and the pod worker may compress the create and kill
   967  		syncPodRecords := processed[uid]
   968  		var match bool
   969  		grace := int64(30)
   970  		for _, possible := range [][]syncPodRecord{
   971  			{{name: string(uid), updateType: kubetypes.SyncPodKill, gracePeriod: &grace}, {name: string(uid), terminated: true}},
   972  			{{name: string(uid), updateType: kubetypes.SyncPodCreate}, {name: string(uid), updateType: kubetypes.SyncPodKill, gracePeriod: &grace}, {name: string(uid), terminated: true}},
   973  		} {
   974  			if reflect.DeepEqual(possible, syncPodRecords) {
   975  				match = true
   976  				break
   977  			}
   978  		}
   979  		if !match {
   980  			t.Fatalf("unexpected history for pod %v: %#v", i, syncPodRecords)
   981  		}
   982  	}
   983  }
   984  
   985  func newUIDSet(uids ...types.UID) sets.String {
   986  	set := sets.NewString()
   987  	for _, uid := range uids {
   988  		set.Insert(string(uid))
   989  	}
   990  	return set
   991  }
   992  
   993  type terminalPhaseSync struct {
   994  	lock     sync.Mutex
   995  	fn       syncPodFnType
   996  	terminal sets.String
   997  }
   998  
   999  func (s *terminalPhaseSync) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod *v1.Pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
  1000  	isTerminal, err := s.fn(ctx, updateType, pod, mirrorPod, podStatus)
  1001  	if err != nil {
  1002  		return false, err
  1003  	}
  1004  	if !isTerminal {
  1005  		s.lock.Lock()
  1006  		defer s.lock.Unlock()
  1007  		isTerminal = s.terminal.Has(string(pod.UID))
  1008  	}
  1009  	return isTerminal, nil
  1010  }
  1011  
  1012  func (s *terminalPhaseSync) SetTerminal(uid types.UID) {
  1013  	s.lock.Lock()
  1014  	defer s.lock.Unlock()
  1015  	s.terminal.Insert(string(uid))
  1016  }
  1017  
  1018  func newTerminalPhaseSync(fn syncPodFnType) *terminalPhaseSync {
  1019  	return &terminalPhaseSync{
  1020  		fn:       fn,
  1021  		terminal: sets.NewString(),
  1022  	}
  1023  }
  1024  
  1025  func TestTerminalPhaseTransition(t *testing.T) {
  1026  	podWorkers, _, _ := createPodWorkers()
  1027  	var channels WorkChannel
  1028  	podWorkers.workerChannelFn = channels.Intercept
  1029  	terminalPhaseSyncer := newTerminalPhaseSync(podWorkers.podSyncer.(*podSyncerFuncs).syncPod)
  1030  	podWorkers.podSyncer.(*podSyncerFuncs).syncPod = terminalPhaseSyncer.SyncPod
  1031  
  1032  	// start pod
  1033  	podWorkers.UpdatePod(UpdatePodOptions{
  1034  		Pod:        newNamedPod("1", "test1", "pod1", false),
  1035  		UpdateType: kubetypes.SyncPodUpdate,
  1036  	})
  1037  	drainAllWorkers(podWorkers)
  1038  
  1039  	// should observe pod running
  1040  	pod1 := podWorkers.podSyncStatuses[types.UID("1")]
  1041  	if pod1.IsTerminated() {
  1042  		t.Fatalf("unexpected pod state: %#v", pod1)
  1043  	}
  1044  
  1045  	// send another update to the pod
  1046  	podWorkers.UpdatePod(UpdatePodOptions{
  1047  		Pod:        newNamedPod("1", "test1", "pod1", false),
  1048  		UpdateType: kubetypes.SyncPodUpdate,
  1049  	})
  1050  	drainAllWorkers(podWorkers)
  1051  
  1052  	// should observe pod still running
  1053  	pod1 = podWorkers.podSyncStatuses[types.UID("1")]
  1054  	if pod1.IsTerminated() {
  1055  		t.Fatalf("unexpected pod state: %#v", pod1)
  1056  	}
  1057  
  1058  	// the next sync should result in a transition to terminal
  1059  	terminalPhaseSyncer.SetTerminal(types.UID("1"))
  1060  	podWorkers.UpdatePod(UpdatePodOptions{
  1061  		Pod:        newNamedPod("1", "test1", "pod1", false),
  1062  		UpdateType: kubetypes.SyncPodUpdate,
  1063  	})
  1064  	drainAllWorkers(podWorkers)
  1065  
  1066  	// should observe pod terminating
  1067  	pod1 = podWorkers.podSyncStatuses[types.UID("1")]
  1068  	if !pod1.IsTerminationRequested() || !pod1.IsTerminated() {
  1069  		t.Fatalf("unexpected pod state: %#v", pod1)
  1070  	}
  1071  }
  1072  
  1073  func TestStaticPodExclusion(t *testing.T) {
  1074  	if testing.Short() {
  1075  		t.Skip("skipping test in short mode.")
  1076  	}
  1077  
  1078  	podWorkers, _, processed := createPodWorkers()
  1079  	var channels WorkChannel
  1080  	podWorkers.workerChannelFn = channels.Intercept
  1081  
  1082  	testPod := newNamedPod("2-static", "test1", "pod1", true)
  1083  	if !kubetypes.IsStaticPod(testPod) {
  1084  		t.Fatalf("unable to test static pod")
  1085  	}
  1086  
  1087  	// start two pods with the same name, one static, one apiserver
  1088  	podWorkers.UpdatePod(UpdatePodOptions{
  1089  		Pod:        newNamedPod("1-normal", "test1", "pod1", false),
  1090  		UpdateType: kubetypes.SyncPodUpdate,
  1091  	})
  1092  	podWorkers.UpdatePod(UpdatePodOptions{
  1093  		Pod:        newNamedPod("2-static", "test1", "pod1", true),
  1094  		UpdateType: kubetypes.SyncPodUpdate,
  1095  	})
  1096  	drainAllWorkers(podWorkers)
  1097  
  1098  	// should observe both pods running
  1099  	pod1 := podWorkers.podSyncStatuses[types.UID("1-normal")]
  1100  	if pod1.IsTerminated() {
  1101  		t.Fatalf("unexpected pod state: %#v", pod1)
  1102  	}
  1103  	pod2 := podWorkers.podSyncStatuses[types.UID("2-static")]
  1104  	if pod2.IsTerminated() {
  1105  		t.Fatalf("unexpected pod state: %#v", pod2)
  1106  	}
  1107  
  1108  	if len(processed) != 2 {
  1109  		t.Fatalf("unexpected synced pods: %#v", processed)
  1110  	}
  1111  	if e, a :=
  1112  		[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
  1113  		processed[types.UID("2-static")]; !reflect.DeepEqual(e, a) {
  1114  		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(e, a))
  1115  	}
  1116  	if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1117  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1118  	}
  1119  
  1120  	// attempt to start a second and third static pod, which should not start
  1121  	podWorkers.UpdatePod(UpdatePodOptions{
  1122  		Pod:        newNamedPod("3-static", "test1", "pod1", true),
  1123  		UpdateType: kubetypes.SyncPodUpdate,
  1124  	})
  1125  	podWorkers.UpdatePod(UpdatePodOptions{
  1126  		Pod:        newNamedPod("4-static", "test1", "pod1", true),
  1127  		UpdateType: kubetypes.SyncPodUpdate,
  1128  	})
  1129  	drainAllWorkers(podWorkers)
  1130  
  1131  	// should observe all four pods as not terminated, but the new static pods shouldn't have synced
  1132  	pod1 = podWorkers.podSyncStatuses[types.UID("1-normal")]
  1133  	if pod1.IsTerminated() {
  1134  		t.Fatalf("unexpected pod state: %#v", pod1)
  1135  	}
  1136  	pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
  1137  	if pod2.IsTerminated() {
  1138  		t.Fatalf("unexpected pod state: %#v", pod2)
  1139  	}
  1140  	pod3 := podWorkers.podSyncStatuses[types.UID("3-static")]
  1141  	if pod3.IsTerminated() {
  1142  		t.Fatalf("unexpected pod state: %#v", pod3)
  1143  	}
  1144  	pod4 := podWorkers.podSyncStatuses[types.UID("4-static")]
  1145  	if pod4.IsTerminated() {
  1146  		t.Fatalf("unexpected pod state: %#v", pod4)
  1147  	}
  1148  
  1149  	if len(processed) != 2 {
  1150  		t.Fatalf("unexpected synced pods: %#v", processed)
  1151  	}
  1152  	if expected, actual :=
  1153  		[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
  1154  		processed[types.UID("2-static")]; !reflect.DeepEqual(expected, actual) {
  1155  		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
  1156  	}
  1157  	if expected, actual :=
  1158  		[]syncPodRecord(nil),
  1159  		processed[types.UID("3-static")]; !reflect.DeepEqual(expected, actual) {
  1160  		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
  1161  	}
  1162  	if expected, actual :=
  1163  		[]syncPodRecord(nil),
  1164  		processed[types.UID("4-static")]; !reflect.DeepEqual(expected, actual) {
  1165  		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
  1166  	}
  1167  	if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1168  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1169  	}
  1170  	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1171  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1172  	}
  1173  	// verify all are enqueued
  1174  	if e, a := sets.NewString("1-normal", "2-static", "4-static", "3-static"), podWorkers.workQueue.(*fakeQueue).Set(); !e.Equal(a) {
  1175  		t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
  1176  	}
  1177  
  1178  	// send a basic update for 3-static
  1179  	podWorkers.workQueue.GetWork()
  1180  	podWorkers.UpdatePod(UpdatePodOptions{
  1181  		Pod:        newNamedPod("3-static", "test1", "pod1", true),
  1182  		UpdateType: kubetypes.SyncPodUpdate,
  1183  	})
  1184  	drainAllWorkers(podWorkers)
  1185  
  1186  	// 3-static should not be started because 2-static is still running
  1187  	if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1188  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1189  	}
  1190  	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1191  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1192  	}
  1193  	// the queue should include a single item for 3-static (indicating we need to retry later)
  1194  	if e, a := sets.NewString("3-static"), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
  1195  		t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
  1196  	}
  1197  
  1198  	// mark 3-static as deleted while 2-static is still running
  1199  	podWorkers.workQueue.GetWork()
  1200  	podWorkers.UpdatePod(UpdatePodOptions{
  1201  		Pod:        newNamedPod("3-static", "test1", "pod1", true),
  1202  		UpdateType: kubetypes.SyncPodKill,
  1203  	})
  1204  	drainAllWorkers(podWorkers)
  1205  
  1206  	// should observe 3-static as terminated because it never started, but the rest of its state should be unchanged
  1207  	pod3 = podWorkers.podSyncStatuses[types.UID("3-static")]
  1208  	if !pod3.IsTerminated() {
  1209  		t.Fatalf("unexpected pod state: %#v", pod3)
  1210  	}
  1211  	// the queue should be empty because the worker is now done
  1212  	if e, a := sets.NewString(), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
  1213  		t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
  1214  	}
  1215  	// 2-static is still running
  1216  	if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1217  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1218  	}
  1219  	// 3-static and 4-static are both still queued
  1220  	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1221  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1222  	}
  1223  
  1224  	// terminate 2-static
  1225  	podWorkers.UpdatePod(UpdatePodOptions{
  1226  		Pod:        newNamedPod("2-static", "test1", "pod1", true),
  1227  		UpdateType: kubetypes.SyncPodKill,
  1228  	})
  1229  	drainAllWorkers(podWorkers)
  1230  
  1231  	// should observe 2-static as terminated, and 2-static should no longer be reported as the started static pod
  1232  	pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
  1233  	if !pod2.IsTerminated() {
  1234  		t.Fatalf("unexpected pod state: %#v", pod2)
  1235  	}
  1236  	if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1237  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1238  	}
  1239  	if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1240  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1241  	}
  1242  
  1243  	// simulate a periodic event from the work queue for 4-static
  1244  	podWorkers.UpdatePod(UpdatePodOptions{
  1245  		Pod:        newNamedPod("4-static", "test1", "pod1", true),
  1246  		UpdateType: kubetypes.SyncPodUpdate,
  1247  	})
  1248  	drainAllWorkers(podWorkers)
  1249  
  1250  	// 4-static should be started because 3-static has already terminated
  1251  	pod4 = podWorkers.podSyncStatuses[types.UID("4-static")]
  1252  	if pod4.IsTerminated() {
  1253  		t.Fatalf("unexpected pod state: %#v", pod4)
  1254  	}
  1255  	if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1256  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1257  	}
  1258  	if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1259  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1260  	}
  1261  
  1262  	// initiate a sync with all pods remaining
  1263  	state := podWorkers.SyncKnownPods([]*v1.Pod{
  1264  		newNamedPod("1-normal", "test1", "pod1", false),
  1265  		newNamedPod("2-static", "test1", "pod1", true),
  1266  		newNamedPod("3-static", "test1", "pod1", true),
  1267  		newNamedPod("4-static", "test1", "pod1", true),
  1268  	})
  1269  	drainAllWorkers(podWorkers)
  1270  
  1271  	// 2-static and 3-static should both be listed as terminated
  1272  	if e, a := map[types.UID]PodWorkerSync{
  1273  		"1-normal": {State: SyncPod, HasConfig: true},
  1274  		"2-static": {State: TerminatedPod, HasConfig: true, Static: true},
  1275  		"3-static": {State: TerminatedPod},
  1276  		"4-static": {State: SyncPod, HasConfig: true, Static: true},
  1277  	}, state; !reflect.DeepEqual(e, a) {
  1278  		t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
  1279  	}
  1280  	// 3-static is still in the config, so it should still be in our status
  1281  	if status, ok := podWorkers.podSyncStatuses["3-static"]; !ok || status.terminatedAt.IsZero() || !status.finished || status.working {
  1282  		t.Fatalf("unexpected post termination status: %#v", status)
  1283  	}
  1284  
  1285  	// initiate a sync with 3-static removed
  1286  	state = podWorkers.SyncKnownPods([]*v1.Pod{
  1287  		newNamedPod("1-normal", "test1", "pod1", false),
  1288  		newNamedPod("2-static", "test1", "pod1", true),
  1289  		newNamedPod("4-static", "test1", "pod1", true),
  1290  	})
  1291  	drainAllWorkers(podWorkers)
  1292  
  1293  	// expect sync to put 3-static into final state and remove the status
  1294  	if e, a := map[types.UID]PodWorkerSync{
  1295  		"1-normal": {State: SyncPod, HasConfig: true},
  1296  		"2-static": {State: TerminatedPod, HasConfig: true, Static: true},
  1297  		"4-static": {State: SyncPod, HasConfig: true, Static: true},
  1298  	}, state; !reflect.DeepEqual(e, a) {
  1299  		t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
  1300  	}
  1301  	if status, ok := podWorkers.podSyncStatuses["3-static"]; ok {
  1302  		t.Fatalf("unexpected post termination status: %#v", status)
  1303  	}
  1304  
  1305  	// start a static pod, kill it, then add another one, but ensure the pod worker
  1306  	// for pod 5 doesn't see the kill event (so it remains waiting to start)
  1307  	podWorkers.UpdatePod(UpdatePodOptions{
  1308  		Pod:        newNamedPod("5-static", "test1", "pod1", true),
  1309  		UpdateType: kubetypes.SyncPodUpdate,
  1310  	})
  1311  	// Wait for the previous work to be delivered to the worker
  1312  	drainAllWorkers(podWorkers)
  1313  	channels.Channel("5-static").Hold()
  1314  	podWorkers.UpdatePod(UpdatePodOptions{
  1315  		Pod:        newNamedPod("5-static", "test1", "pod1", true),
  1316  		UpdateType: kubetypes.SyncPodKill,
  1317  	})
  1318  	podWorkers.UpdatePod(UpdatePodOptions{
  1319  		Pod:        newNamedPod("6-static", "test1", "pod1", true),
  1320  		UpdateType: kubetypes.SyncPodUpdate,
  1321  	})
  1322  	drainWorkersExcept(podWorkers, "5-static")
  1323  
  1324  	// pod 5 should have termination requested, but hasn't cleaned up
  1325  	pod5 := podWorkers.podSyncStatuses[types.UID("5-static")]
  1326  	if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
  1327  		t.Fatalf("unexpected status for pod 5: %#v", pod5)
  1328  	}
  1329  	if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1330  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1331  	}
  1332  	if e, a := map[string][]types.UID{"pod1_test1": {"5-static", "6-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1333  		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
  1334  	}
  1335  
  1336  	// terminate 4-static and wake 6-static
  1337  	podWorkers.UpdatePod(UpdatePodOptions{
  1338  		Pod:        newNamedPod("4-static", "test1", "pod1", true),
  1339  		UpdateType: kubetypes.SyncPodKill,
  1340  	})
  1341  	drainWorkersExcept(podWorkers, "5-static")
  1342  	podWorkers.UpdatePod(UpdatePodOptions{
  1343  		Pod:        newNamedPod("6-static", "test1", "pod1", true),
  1344  		UpdateType: kubetypes.SyncPodUpdate,
  1345  	})
  1346  	drainWorkersExcept(podWorkers, "5-static")
  1347  
  1348  	// 5-static should still be waiting, 6-static should have started and synced
  1349  	pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
  1350  	if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
  1351  		t.Fatalf("unexpected status for pod 5: %#v", pod5)
  1352  	}
  1353  	if e, a := map[string]types.UID{"pod1_test1": "6-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
  1354  		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
  1355  	}
  1356  	// no static pods should be waiting
	if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
	}
	// prove 6-static synced
	if expected, actual :=
		[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
		processed[types.UID("6-static")]; !reflect.DeepEqual(expected, actual) {
		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
	}

	// ensure 5-static exits when we deliver the event out of order
	channels.Channel("5-static").Release()
	drainAllWorkers(podWorkers)
	pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
	if !pod5.IsTerminated() {
		t.Fatalf("unexpected status for pod 5: %#v", pod5)
	}

	// start three more static pods, kill the previous static pod blocking start,
	// and simulate the second pod of three (8) getting to run first
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("7-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodUpdate,
	})
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("8-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodUpdate,
	})
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("9-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodUpdate,
	})
	drainAllWorkers(podWorkers)
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("6-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodKill,
	})
	drainAllWorkers(podWorkers)
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("6-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodCreate,
	})
	drainAllWorkers(podWorkers)
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("8-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodUpdate,
	})
	drainAllWorkers(podWorkers)

	// 6 should have been detected as restartable
	if status := podWorkers.podSyncStatuses["6-static"]; !status.restartRequested {
		t.Fatalf("unexpected restarted static pod: %#v", status)
	}
	// 7, 8, and 9 should all still be waiting with no syncs
	if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
		t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
	}
	// only 7-static can start now, but it hasn't received an event
	if e, a := map[string][]types.UID{"pod1_test1": {"7-static", "8-static", "9-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
		t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
	}
	// none of the new pods have synced
	if expected, actual :=
		[]syncPodRecord(nil),
		processed[types.UID("7-static")]; !reflect.DeepEqual(expected, actual) {
		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
	}
	if expected, actual :=
		[]syncPodRecord(nil),
		processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
	}
	if expected, actual :=
		[]syncPodRecord(nil),
		processed[types.UID("9-static")]; !reflect.DeepEqual(expected, actual) {
		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
	}

	// terminate 7-static and wake 8-static
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("7-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodKill,
	})
	drainAllWorkers(podWorkers)
	podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        newNamedPod("8-static", "test1", "pod1", true),
		UpdateType: kubetypes.SyncPodUpdate,
	})
	drainAllWorkers(podWorkers)

	// 8 should have synced
	if expected, actual :=
		[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
		processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
		t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
	}

	// initiate a sync with all but 8-static pods undesired
	state = podWorkers.SyncKnownPods([]*v1.Pod{
		newNamedPod("8-static", "test1", "pod1", true),
	})
	drainAllWorkers(podWorkers)
	if e, a := map[types.UID]PodWorkerSync{
		"1-normal": {State: TerminatingPod, Orphan: true, HasConfig: true},
		"8-static": {State: SyncPod, HasConfig: true, Static: true},
	}, state; !reflect.DeepEqual(e, a) {
		t.Fatalf("unexpected pod worker sync state: %s", cmp.Diff(e, a))
	}
}

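// WorkChannelItem gates the flow of pod work notifications: while paused it
// queues incoming work instead of forwarding it to out.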
type WorkChannelItem struct {
	out   chan struct{}
	lock  sync.Mutex
	pause bool
	queue int
}

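// Handle forwards one unit of work to out, or queues it while the item is
// paused.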
func (item *WorkChannelItem) Handle() {
	item.lock.Lock()
	defer item.lock.Unlock()
	if item.pause {
		item.queue++
		return
	}
	item.out <- struct{}{}
}

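// Hold pauses delivery; subsequent Handle calls are queued until Release.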
func (item *WorkChannelItem) Hold() {
	item.lock.Lock()
	defer item.lock.Unlock()
	item.pause = true
}

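// Close shuts down the output channel; it is safe to call more than once.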
func (item *WorkChannelItem) Close() {
	item.lock.Lock()
	defer item.lock.Unlock()
	if item.out != nil {
		close(item.out)
		item.out = nil
	}
}

// Release resumes delivery and blocks until all queued work has been passed
// on the channel
func (item *WorkChannelItem) Release() {
	item.lock.Lock()
	defer item.lock.Unlock()
	item.pause = false
	for i := 0; i < item.queue; i++ {
		item.out <- struct{}{}
	}
	item.queue = 0
}

// WorkChannel intercepts podWork channels between the pod worker and its child
// goroutines and allows tests to pause or release the flow of podWork to the
// workers.
type WorkChannel struct {
	lock     sync.Mutex
	channels map[types.UID]*WorkChannelItem
}

func (w *WorkChannel) Channel(uid types.UID) *WorkChannelItem {
	w.lock.Lock()
	defer w.lock.Unlock()
	if w.channels == nil {
		w.channels = make(map[types.UID]*WorkChannelItem)
	}
	channel, ok := w.channels[uid]
	if !ok {
		channel = &WorkChannelItem{
			out: make(chan struct{}, 1),
		}
		w.channels[uid] = channel
	}
	return channel
}

func (w *WorkChannel) Intercept(uid types.UID, ch chan struct{}) (outCh <-chan struct{}) {
	channel := w.Channel(uid)
	w.lock.Lock()
	defer w.lock.Unlock()
	go func() {
		defer func() {
			channel.Close()
			w.lock.Lock()
			defer w.lock.Unlock()
			delete(w.channels, uid)
		}()
		for range ch {
			channel.Handle()
		}
	}()
	return channel.out
}

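// A minimal usage sketch (the UID and source channel here are hypothetical,
// mirroring the static pod tests above): route a worker's update channel
// through the WorkChannel, hold delivery while arranging state, then release
// the queued work.
//
//	channels := &WorkChannel{}
//	out := channels.Intercept("5-static", ch) // workers now read from out
//	channels.Channel("5-static").Hold()       // queue instead of forwarding
//	// ... drive UpdatePod calls ...
//	channels.Channel("5-static").Release()    // deliver everything queued

// TestSyncKnownPods verifies that SyncKnownPods prunes workers for pods that
// config no longer desires, keeps state for workers that have not finished
// terminating, and clears that state once termination completes.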
func TestSyncKnownPods(t *testing.T) {
	podWorkers, _, _ := createPodWorkers()

	numPods := 20
	for i := 0; i < numPods; i++ {
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        newNamedPod(strconv.Itoa(i), "ns", "name", false),
			UpdateType: kubetypes.SyncPodUpdate,
		})
	}
	drainWorkers(podWorkers, numPods)

	if len(podWorkers.podUpdates) != numPods {
		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
	}

	desiredPods := map[types.UID]sets.Empty{}
	desiredPods[types.UID("2")] = sets.Empty{}
	desiredPods[types.UID("14")] = sets.Empty{}
	desiredPodList := []*v1.Pod{newNamedPod("2", "ns", "name", false), newNamedPod("14", "ns", "name", false)}

	// kill all but the requested pods
	for i := 0; i < numPods; i++ {
		pod := newNamedPod(strconv.Itoa(i), "ns", "name", false)
		if _, ok := desiredPods[pod.UID]; ok {
			continue
		}
		if (i % 2) == 0 {
			now := metav1.Now()
			pod.DeletionTimestamp = &now
		}
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodKill,
		})
	}
	drainWorkers(podWorkers, numPods)

	if !podWorkers.ShouldPodContainersBeTerminating(types.UID("0")) {
		t.Errorf("Expected pod to be terminating")
	}
	if !podWorkers.ShouldPodContainersBeTerminating(types.UID("1")) {
		t.Errorf("Expected pod to be terminating")
	}
	if podWorkers.ShouldPodContainersBeTerminating(types.UID("2")) {
		t.Errorf("Expected pod to not be terminating")
	}
	if !podWorkers.IsPodTerminationRequested(types.UID("0")) {
		t.Errorf("Expected pod to be terminating")
	}
	if podWorkers.IsPodTerminationRequested(types.UID("2")) {
		t.Errorf("Expected pod to not be terminating")
	}

	if podWorkers.CouldHaveRunningContainers(types.UID("0")) {
		t.Errorf("Expected pod to be terminated (deleted and terminated)")
	}
	if podWorkers.CouldHaveRunningContainers(types.UID("1")) {
		t.Errorf("Expected pod to be terminated")
	}
	if !podWorkers.CouldHaveRunningContainers(types.UID("2")) {
		t.Errorf("Expected pod to not be terminated")
	}

	if !podWorkers.ShouldPodContentBeRemoved(types.UID("0")) {
		t.Errorf("Expected pod to be suitable for removal (deleted and terminated)")
	}
	if podWorkers.ShouldPodContentBeRemoved(types.UID("1")) {
		t.Errorf("Expected pod to not be suitable for removal (terminated but not deleted)")
	}
	if podWorkers.ShouldPodContentBeRemoved(types.UID("2")) {
		t.Errorf("Expected pod to not be suitable for removal (not terminated)")
	}

	if podWorkers.ShouldPodContainersBeTerminating(types.UID("abc")) {
		t.Errorf("Expected pod to not be known to be terminating (does not exist but not yet synced)")
	}
	if !podWorkers.CouldHaveRunningContainers(types.UID("abc")) {
		t.Errorf("Expected pod to potentially have running containers (does not exist but not yet synced)")
	}
	if podWorkers.ShouldPodContentBeRemoved(types.UID("abc")) {
		t.Errorf("Expected pod to not be suitable for removal (does not exist but not yet synced)")
	}

	podWorkers.SyncKnownPods(desiredPodList)
	if len(podWorkers.podUpdates) != 2 {
		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
	}
	if _, exists := podWorkers.podUpdates[types.UID("2")]; !exists {
		t.Errorf("No updates channel for pod 2")
	}
	if _, exists := podWorkers.podUpdates[types.UID("14")]; !exists {
		t.Errorf("No updates channel for pod 14")
	}
	if podWorkers.IsPodTerminationRequested(types.UID("2")) {
		t.Errorf("Expected pod termination request to be cleared after sync")
	}

	if !podWorkers.ShouldPodContainersBeTerminating(types.UID("abc")) {
		t.Errorf("Expected pod to be expected to terminate containers (does not exist and synced at least once)")
	}
	if podWorkers.CouldHaveRunningContainers(types.UID("abc")) {
		t.Errorf("Expected pod to be known not to have running containers (does not exist and synced at least once)")
	}
	if !podWorkers.ShouldPodContentBeRemoved(types.UID("abc")) {
		t.Errorf("Expected pod to be suitable for removal (does not exist and synced at least once)")
	}

	// verify workers that are not terminated stay open even if config no longer
	// sees them
	podWorkers.SyncKnownPods(nil)
	drainAllWorkers(podWorkers)
	if len(podWorkers.podUpdates) != 0 {
		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
	}
	if len(podWorkers.podSyncStatuses) != 2 {
		t.Errorf("Incorrect number of tracked statuses: %#v", podWorkers.podSyncStatuses)
	}

	for uid := range desiredPods {
		pod := newNamedPod(string(uid), "ns", "name", false)
		podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			UpdateType: kubetypes.SyncPodKill,
		})
	}
	drainWorkers(podWorkers, numPods)

	// verify once those pods terminate (via some other flow) the workers are cleared
	podWorkers.SyncKnownPods(nil)
	if len(podWorkers.podUpdates) != 0 {
		t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
	}
	if len(podWorkers.podSyncStatuses) != 0 {
		t.Errorf("Incorrect number of tracked statuses: %#v", podWorkers.podSyncStatuses)
	}
}

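// Test_removeTerminatedWorker exercises when a pod worker's tracked state may
// be dropped: finished workers and orphaned workers that never started are
// removed, while orphaned workers that are terminating or terminated are
// kept, and requeued with a pending update when one is available.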
func Test_removeTerminatedWorker(t *testing.T) {
	podUID := types.UID("pod-uid")

	testCases := []struct {
		desc                               string
		orphan                             bool
		podSyncStatus                      *podSyncStatus
		startedStaticPodsByFullname        map[string]types.UID
		waitingToStartStaticPodsByFullname map[string][]types.UID
		removed                            bool
		expectGracePeriod                  int64
		expectPending                      *UpdatePodOptions
	}{
		{
			desc: "finished worker",
			podSyncStatus: &podSyncStatus{
				finished: true,
			},
			removed: true,
		},
		{
			desc: "waiting to start worker because of another started pod with the same fullname",
			podSyncStatus: &podSyncStatus{
				finished: false,
				fullname: "fake-fullname",
			},
			startedStaticPodsByFullname: map[string]types.UID{
				"fake-fullname": "another-pod-uid",
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"fake-fullname": {podUID},
			},
			removed: false,
		},
		{
			desc: "not yet started worker",
			podSyncStatus: &podSyncStatus{
				finished: false,
				fullname: "fake-fullname",
			},
			startedStaticPodsByFullname: make(map[string]types.UID),
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"fake-fullname": {podUID},
			},
			removed: false,
		},
		{
			desc: "orphaned not started worker",
			podSyncStatus: &podSyncStatus{
				finished: false,
				fullname: "fake-fullname",
			},
			orphan:  true,
			removed: true,
		},
		{
			desc: "orphaned started worker",
			podSyncStatus: &podSyncStatus{
				startedAt: time.Unix(1, 0),
				finished:  false,
				fullname:  "fake-fullname",
			},
			orphan:  true,
			removed: false,
		},
		{
			desc: "orphaned terminating worker with no activeUpdate",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				finished:      false,
				fullname:      "fake-fullname",
			},
			orphan:  true,
			removed: false,
		},
		{
			desc: "orphaned terminating worker",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				finished:      false,
				fullname:      "fake-fullname",
				activeUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
				},
			},
			orphan:  true,
			removed: false,
			expectPending: &UpdatePodOptions{
				Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
			},
		},
		{
			desc: "orphaned terminating worker with pendingUpdate",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				finished:      false,
				fullname:      "fake-fullname",
				working:       true,
				pendingUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
				},
				activeUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
				},
			},
			orphan:  true,
			removed: false,
			expectPending: &UpdatePodOptions{
				Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
			},
		},
		{
			desc: "orphaned terminated worker with no activeUpdate",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				terminatedAt:  time.Unix(3, 0),
				finished:      false,
				fullname:      "fake-fullname",
			},
			orphan:  true,
			removed: false,
		},
		{
			desc: "orphaned terminated worker",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				terminatedAt:  time.Unix(3, 0),
				finished:      false,
				fullname:      "fake-fullname",
				activeUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
				},
			},
			orphan:  true,
			removed: false,
			expectPending: &UpdatePodOptions{
				Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
			},
		},
		{
			desc: "orphaned terminated worker with pendingUpdate",
			podSyncStatus: &podSyncStatus{
				startedAt:     time.Unix(1, 0),
				terminatingAt: time.Unix(2, 0),
				terminatedAt:  time.Unix(3, 0),
				finished:      false,
				working:       true,
				fullname:      "fake-fullname",
				pendingUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
				},
				activeUpdate: &UpdatePodOptions{
					Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "1"}},
				},
			},
			orphan:  true,
			removed: false,
			expectPending: &UpdatePodOptions{
				Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: podUID, Name: "2"}},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			podWorkers, _, _ := createPodWorkers()
			podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus
			podWorkers.podUpdates[podUID] = make(chan struct{}, 1)
			if tc.podSyncStatus.working {
				podWorkers.podUpdates[podUID] <- struct{}{}
			}
			podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
			podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname

			podWorkers.removeTerminatedWorker(podUID, podWorkers.podSyncStatuses[podUID], tc.orphan)
			status, exists := podWorkers.podSyncStatuses[podUID]
			if tc.removed && exists {
				t.Fatalf("Expected pod worker to be removed")
			}
			if !tc.removed && !exists {
				t.Fatalf("Expected pod worker to not be removed")
			}
			if tc.removed {
				return
			}
			if tc.expectGracePeriod > 0 && status.gracePeriod != tc.expectGracePeriod {
				t.Errorf("Unexpected grace period %d", status.gracePeriod)
			}
			if !reflect.DeepEqual(tc.expectPending, status.pendingUpdate) {
				t.Errorf("Unexpected pending: %s", cmp.Diff(tc.expectPending, status.pendingUpdate))
			}
			if tc.expectPending != nil {
				if !status.working {
					t.Errorf("Should be working")
				}
				if len(podWorkers.podUpdates[podUID]) != 1 {
					t.Errorf("Should have one entry in podUpdates")
				}
			}
		})
	}
}

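// simpleFakeKubelet records the arguments of its most recent SyncPod call so
// that TestFakePodWorkers can compare what the real and fake pod workers pass
// to the sync function.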
type simpleFakeKubelet struct {
	pod       *v1.Pod
	mirrorPod *v1.Pod
	podStatus *kubecontainer.PodStatus
	wg        sync.WaitGroup
}

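// SyncPod captures the sync arguments and reports the pod as not terminal.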
func (kl *simpleFakeKubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
	kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
	return false, nil
}

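// SyncPodWithWaitGroup is SyncPod plus a WaitGroup signal so callers can
// block until the real pod workers have invoked the sync function.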
func (kl *simpleFakeKubelet) SyncPodWithWaitGroup(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
	kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, podStatus
	kl.wg.Done()
	return false, nil
}

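// The termination hooks below are deliberate no-ops: the assertions in
// TestFakePodWorkers only concern the SyncPod path.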
func (kl *simpleFakeKubelet) SyncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
	return nil
}

func (kl *simpleFakeKubelet) SyncTerminatingRuntimePod(ctx context.Context, runningPod *kubecontainer.Pod) error {
	return nil
}

func (kl *simpleFakeKubelet) SyncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
	return nil
}

// TestFakePodWorkers verifies that the fakePodWorkers behaves the same way
// as the real podWorkers in how it invokes the syncPodFn.
func TestFakePodWorkers(t *testing.T) {
	fakeRecorder := &record.FakeRecorder{}
	fakeRuntime := &containertest.FakeRuntime{}
	fakeCache := containertest.NewFakeCache(fakeRuntime)

	kubeletForRealWorkers := &simpleFakeKubelet{}
	kubeletForFakeWorkers := &simpleFakeKubelet{}
	realPodSyncer := newPodSyncerFuncs(kubeletForRealWorkers)
	realPodSyncer.syncPod = kubeletForRealWorkers.SyncPodWithWaitGroup

	realPodWorkers := newPodWorkers(
		realPodSyncer,
		fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
	fakePodWorkers := &fakePodWorkers{
		syncPodFn: kubeletForFakeWorkers.SyncPod,
		cache:     fakeCache,
		t:         t,
	}

	tests := []struct {
		pod       *v1.Pod
		mirrorPod *v1.Pod
	}{
		{
			&v1.Pod{},
			&v1.Pod{},
		},
		{
			podWithUIDNameNs("12345678", "foo", "new"),
			podWithUIDNameNs("12345678", "fooMirror", "new"),
		},
		{
			podWithUIDNameNs("98765", "bar", "new"),
			podWithUIDNameNs("98765", "barMirror", "new"),
		},
	}

	for i, tt := range tests {
		kubeletForRealWorkers.wg.Add(1)
		realPodWorkers.UpdatePod(UpdatePodOptions{
			Pod:        tt.pod,
			MirrorPod:  tt.mirrorPod,
			UpdateType: kubetypes.SyncPodUpdate,
		})
		fakePodWorkers.UpdatePod(UpdatePodOptions{
			Pod:        tt.pod,
			MirrorPod:  tt.mirrorPod,
			UpdateType: kubetypes.SyncPodUpdate,
		})

		kubeletForRealWorkers.wg.Wait()

		if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
		}

		if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
		}

		if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
			t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
		}
	}
}

// TestKillPodNowFunc verifies that the blocking kill pod function works with
// pod workers as expected.
func TestKillPodNowFunc(t *testing.T) {
	fakeRecorder := &record.FakeRecorder{}
	podWorkers, _, processed := createPodWorkers()
	killPodFunc := killPodNow(podWorkers, fakeRecorder)
	pod := newNamedPod("test", "ns", "test", false)
	gracePeriodOverride := int64(0)
	err := killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
		status.Phase = v1.PodFailed
		status.Reason = "reason"
		status.Message = "message"
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	drainAllWorkers(podWorkers)
	if len(processed) != 1 {
		t.Fatalf("len(processed) expected: %v, actual: %#v", 1, processed)
	}
	syncPodRecords := processed[pod.UID]
	if len(syncPodRecords) != 2 {
		t.Fatalf("Pod processed expected %v times, got %#v", 2, syncPodRecords)
	}
	if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
		t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
	}
	if !syncPodRecords[1].terminated {
		t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true)
	}
}

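// Test_allowPodStart verifies the admission rules for starting pods: regular
// pods are always allowed, while a static pod may start only when no other
// static pod with the same full name has already started and it is the first
// non-terminating pod waiting for that full name.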
func Test_allowPodStart(t *testing.T) {
	testCases := []struct {
		desc                               string
		pod                                *v1.Pod
		podSyncStatuses                    map[types.UID]*podSyncStatus
		startedStaticPodsByFullname        map[string]types.UID
		waitingToStartStaticPodsByFullname map[string][]types.UID

		expectedStartedStaticPodsByFullname        map[string]types.UID
		expectedWaitingToStartStaticPodsByFullname map[string][]types.UID
		allowed                                    bool
		allowedEver                                bool
	}{
		{
			// TODO: Do we want to allow non-static pods with the same full name?
			// Note that it may disable the force deletion of pods.
			desc: "non-static pod",
			pod:  newNamedPod("uid-0", "ns", "test", false),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "test_",
				},
				"uid-1": {
					fullname: "test_",
				},
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			// TODO: Do we want to allow a non-static pod with the same full name
			// as the started static pod?
			desc: "non-static pod when there is a started static pod with the same full name",
			pod:  newNamedPod("uid-0", "ns", "test", false),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "test_",
				},
				"uid-1": {
					fullname: "test_",
				},
			},
			startedStaticPodsByFullname: map[string]types.UID{
				"test_": types.UID("uid-1"),
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"test_": types.UID("uid-1"),
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			// TODO: Do we want to allow a static pod with the same full name as the
			// started non-static pod?
			desc: "static pod when there is a started non-static pod with the same full name",
			pod:  newNamedPod("uid-0", "ns", "test", false),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "test_",
				},
				"uid-1": {
					fullname: "test_",
				},
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod when there are no started static pods with the same full name",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
				"uid-1": {
					fullname: "bar_",
				},
			},
			startedStaticPodsByFullname: map[string]types.UID{
				"bar_": types.UID("uid-1"),
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
				"bar_": types.UID("uid-1"),
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod when there is a started static pod with the same full name",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
				"uid-1": {
					fullname: "foo_",
				},
			},
			startedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-1"),
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-1"),
			},
			allowed:     false,
			allowedEver: true,
		},
		{
			desc: "static pod if the static pod has already started",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
			},
			startedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod if the static pod is the first pod waiting to start",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-0"),
				},
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
			},
			expectedWaitingToStartStaticPodsByFullname: make(map[string][]types.UID),
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod if the static pod is not the first pod waiting to start",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
				"uid-1": {
					fullname: "foo_",
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
					types.UID("uid-0"),
				},
			},
			expectedStartedStaticPodsByFullname: make(map[string]types.UID),
			expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
					types.UID("uid-0"),
				},
			},
			allowed:     false,
			allowedEver: true,
		},
		{
			desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
				"uid-1": {
					fullname: "foo_",
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-2"),
					types.UID("uid-2"),
					types.UID("uid-3"),
					types.UID("uid-0"),
					types.UID("uid-1"),
				},
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
			},
			expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
				},
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod if the static pod is the first pod that is not termination requested and waiting to start",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname: "foo_",
				},
				"uid-1": {
					fullname: "foo_",
				},
				"uid-2": {
					fullname:      "foo_",
					terminatingAt: time.Now(),
				},
				"uid-3": {
					fullname:     "foo_",
					terminatedAt: time.Now(),
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-2"),
					types.UID("uid-3"),
					types.UID("uid-0"),
					types.UID("uid-1"),
				},
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{
				"foo_": types.UID("uid-0"),
			},
			expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
				},
			},
			allowed:     true,
			allowedEver: true,
		},
		{
			desc: "static pod if there is no sync status for the pod should be denied",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-1": {
					fullname: "foo_",
				},
				"uid-2": {
					fullname:      "foo_",
					terminatingAt: time.Now(),
				},
				"uid-3": {
					fullname:     "foo_",
					terminatedAt: time.Now(),
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
				},
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{},
			expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-1"),
				},
			},
			allowed:     false,
			allowedEver: false,
		},
		{
			desc: "static pod if the static pod is terminated should not be allowed",
			pod:  newStaticPod("uid-0", "foo"),
			podSyncStatuses: map[types.UID]*podSyncStatus{
				"uid-0": {
					fullname:      "foo_",
					terminatingAt: time.Now(),
				},
			},
			waitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-2"),
					types.UID("uid-3"),
					types.UID("uid-0"),
					types.UID("uid-1"),
				},
			},
			expectedStartedStaticPodsByFullname: map[string]types.UID{},
			expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
				"foo_": {
					types.UID("uid-2"),
					types.UID("uid-3"),
					types.UID("uid-0"),
					types.UID("uid-1"),
				},
			},
			allowed:     false,
			allowedEver: false,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			podWorkers, _, _ := createPodWorkers()
			if tc.podSyncStatuses != nil {
				podWorkers.podSyncStatuses = tc.podSyncStatuses
			}
			if tc.startedStaticPodsByFullname != nil {
				podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
			}
			if tc.waitingToStartStaticPodsByFullname != nil {
				podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
			}
			allowed, allowedEver := podWorkers.allowPodStart(tc.pod)
			if allowed != tc.allowed {
				if tc.allowed {
					t.Errorf("Pod should be allowed")
				} else {
					t.Errorf("Pod should not be allowed")
				}
			}

			if allowedEver != tc.allowedEver {
				if tc.allowedEver {
					t.Errorf("Pod should be allowed ever")
				} else {
					t.Errorf("Pod should not be allowed ever")
				}
			}

			// compare unless both the actual and expected maps are empty
			if len(podWorkers.startedStaticPodsByFullname) != 0 ||
				len(podWorkers.startedStaticPodsByFullname) != len(tc.expectedStartedStaticPodsByFullname) {
				if !reflect.DeepEqual(
					podWorkers.startedStaticPodsByFullname,
					tc.expectedStartedStaticPodsByFullname) {
					t.Errorf("startedStaticPodsByFullname: expected %v, got %v",
						tc.expectedStartedStaticPodsByFullname,
						podWorkers.startedStaticPodsByFullname)
				}
			}

			// compare unless both the actual and expected maps are empty
			if len(podWorkers.waitingToStartStaticPodsByFullname) != 0 ||
				len(podWorkers.waitingToStartStaticPodsByFullname) != len(tc.expectedWaitingToStartStaticPodsByFullname) {
				if !reflect.DeepEqual(
					podWorkers.waitingToStartStaticPodsByFullname,
					tc.expectedWaitingToStartStaticPodsByFullname) {
					t.Errorf("waitingToStartStaticPodsByFullname: expected %v, got %v",
						tc.expectedWaitingToStartStaticPodsByFullname,
						podWorkers.waitingToStartStaticPodsByFullname)
				}
			}
		})
	}
}