
Source file src/k8s.io/client-go/tools/watch/informerwatcher_test.go

Documentation: k8s.io/client-go/tools/watch

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package watch
    19  import (
    20  	"context"
    21  	"reflect"
    22  	goruntime "runtime"
    23  	"sort"
    24  	"testing"
    25  	"time"
    27  	"github.com/google/go-cmp/cmp"
    29  	corev1 "k8s.io/api/core/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/apimachinery/pkg/util/dump"
    35  	"k8s.io/apimachinery/pkg/watch"
    36  	fakeclientset "k8s.io/client-go/kubernetes/fake"
    37  	testcore "k8s.io/client-go/testing"
    38  	"k8s.io/client-go/tools/cache"
    39  )
    41  // TestEventProcessorExit is expected to timeout if the event processor fails
    42  // to exit when stopped.
    43  func TestEventProcessorExit(t *testing.T) {
    44  	event := watch.Event{}
    46  	tests := []struct {
    47  		name  string
    48  		write func(e *eventProcessor)
    49  	}{
    50  		{
    51  			name: "exit on blocked read",
    52  			write: func(e *eventProcessor) {
    53  				e.push(event)
    54  			},
    55  		},
    56  		{
    57  			name: "exit on blocked write",
    58  			write: func(e *eventProcessor) {
    59  				e.push(event)
    60  				e.push(event)
    61  			},
    62  		},
    63  	}
    64  	for _, test := range tests {
    65  		t.Run(test.name, func(t *testing.T) {
    66  			out := make(chan watch.Event)
    67  			e := newEventProcessor(out)
    69  			test.write(e)
    71  			exited := make(chan struct{})
    72  			go func() {
    73  				e.run()
    74  				close(exited)
    75  			}()
    77  			<-out
    78  			e.stop()
    79  			goruntime.Gosched()
    80  			<-exited
    81  		})
    82  	}
    83  }
    85  type apiInt int
    87  func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
    88  func (apiInt) DeepCopyObject() runtime.Object   { return nil }
    90  func TestEventProcessorOrdersEvents(t *testing.T) {
    91  	out := make(chan watch.Event)
    92  	e := newEventProcessor(out)
    93  	go e.run()
    95  	numProcessed := 0
    96  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    97  	go func() {
    98  		for i := 0; i < 1000; i++ {
    99  			e := <-out
   100  			if got, want := int(e.Object.(apiInt)), i; got != want {
   101  				t.Errorf("unexpected event: got=%d, want=%d", got, want)
   102  			}
   103  			numProcessed++
   104  		}
   105  		cancel()
   106  	}()
   108  	for i := 0; i < 1000; i++ {
   109  		e.push(watch.Event{Object: apiInt(i)})
   110  	}
   112  	<-ctx.Done()
   113  	e.stop()
   115  	if numProcessed != 1000 {
   116  		t.Errorf("unexpected number of events processed: %d", numProcessed)
   117  	}
   119  }
   121  type byEventTypeAndName []watch.Event
   123  func (a byEventTypeAndName) Len() int      { return len(a) }
   124  func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
   125  func (a byEventTypeAndName) Less(i, j int) bool {
   126  	if a[i].Type < a[j].Type {
   127  		return true
   128  	}
   130  	if a[i].Type > a[j].Type {
   131  		return false
   132  	}
   134  	return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
   135  }
   137  func TestNewInformerWatcher(t *testing.T) {
   138  	// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
   139  	tt := []struct {
   140  		name    string
   141  		objects []runtime.Object
   142  		events  []watch.Event
   143  	}{
   144  		{
   145  			name: "basic test",
   146  			objects: []runtime.Object{
   147  				&corev1.Secret{
   148  					ObjectMeta: metav1.ObjectMeta{
   149  						Name: "pod-1",
   150  					},
   151  					StringData: map[string]string{
   152  						"foo-1": "initial",
   153  					},
   154  				},
   155  				&corev1.Secret{
   156  					ObjectMeta: metav1.ObjectMeta{
   157  						Name: "pod-2",
   158  					},
   159  					StringData: map[string]string{
   160  						"foo-2": "initial",
   161  					},
   162  				},
   163  				&corev1.Secret{
   164  					ObjectMeta: metav1.ObjectMeta{
   165  						Name: "pod-3",
   166  					},
   167  					StringData: map[string]string{
   168  						"foo-3": "initial",
   169  					},
   170  				},
   171  			},
   172  			events: []watch.Event{
   173  				{
   174  					Type: watch.Added,
   175  					Object: &corev1.Secret{
   176  						ObjectMeta: metav1.ObjectMeta{
   177  							Name: "pod-4",
   178  						},
   179  						StringData: map[string]string{
   180  							"foo-4": "initial",
   181  						},
   182  					},
   183  				},
   184  				{
   185  					Type: watch.Modified,
   186  					Object: &corev1.Secret{
   187  						ObjectMeta: metav1.ObjectMeta{
   188  							Name: "pod-2",
   189  						},
   190  						StringData: map[string]string{
   191  							"foo-2": "new",
   192  						},
   193  					},
   194  				},
   195  				{
   196  					Type: watch.Deleted,
   197  					Object: &corev1.Secret{
   198  						ObjectMeta: metav1.ObjectMeta{
   199  							Name: "pod-3",
   200  						},
   201  					},
   202  				},
   203  			},
   204  		},
   205  	}
   207  	for _, tc := range tt {
   208  		t.Run(tc.name, func(t *testing.T) {
   209  			var expected []watch.Event
   210  			for _, o := range tc.objects {
   211  				expected = append(expected, watch.Event{
   212  					Type:   watch.Added,
   213  					Object: o.DeepCopyObject(),
   214  				})
   215  			}
   216  			for _, e := range tc.events {
   217  				expected = append(expected, *e.DeepCopy())
   218  			}
   220  			fake := fakeclientset.NewSimpleClientset(tc.objects...)
   221  			fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
   222  			fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
   224  			for _, e := range tc.events {
   225  				fakeWatch.Action(e.Type, e.Object)
   226  			}
   228  			lw := &cache.ListWatch{
   229  				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   230  					return fake.CoreV1().Secrets("").List(context.TODO(), options)
   231  				},
   232  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   233  					return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
   234  				},
   235  			}
   236  			_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
   238  			var result []watch.Event
   239  		loop:
   240  			for {
   241  				var event watch.Event
   242  				var ok bool
   243  				select {
   244  				case event, ok = <-w.ResultChan():
   245  					if !ok {
   246  						t.Errorf("Failed to read event: channel is already closed!")
   247  						return
   248  					}
   250  					result = append(result, *event.DeepCopy())
   251  				case <-time.After(time.Second * 1):
   252  					// All the events are buffered -> this means we are done
   253  					// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
   254  					break loop
   255  				}
   256  			}
   258  			// Informers don't guarantee event order so we need to sort these arrays to compare them
   259  			sort.Sort(byEventTypeAndName(expected))
   260  			sort.Sort(byEventTypeAndName(result))
   262  			if !reflect.DeepEqual(expected, result) {
   263  				t.Errorf("\nexpected: %s,\ngot:      %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result))
   264  				return
   265  			}
   267  			// Fill in some data to test watch closing while there are some events to be read
   268  			for _, e := range tc.events {
   269  				fakeWatch.Action(e.Type, e.Object)
   270  			}
   272  			// Stop before reading all the data to make sure the informer can deal with closed channel
   273  			w.Stop()
   275  			<-done
   276  		})
   277  	}
   279  }
   281  // TestInformerWatcherDeletedFinalStateUnknown tests the code path when `DeleteFunc`
   282  // in `NewIndexerInformerWatcher` receives a `cache.DeletedFinalStateUnknown`
   283  // object from the underlying `DeltaFIFO`. The triggering condition is described
   284  // at https://github.com/kubernetes/kubernetes/blob/dc39ab2417bfddcec37be4011131c59921fdbe98/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go#L736-L739.
   285  //
   286  // Code from @liggitt
   287  func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
   288  	listCalls := 0
   289  	watchCalls := 0
   290  	lw := &cache.ListWatch{
   291  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   292  			retval := &corev1.SecretList{}
   293  			if listCalls == 0 {
   294  				// Return a list with items in it
   295  				retval.ResourceVersion = "1"
   296  				retval.Items = []corev1.Secret{{ObjectMeta: metav1.ObjectMeta{Name: "secret1", Namespace: "ns1", ResourceVersion: "123"}}}
   297  			} else {
   298  				// Return empty lists after the first call
   299  				retval.ResourceVersion = "2"
   300  			}
   301  			listCalls++
   302  			return retval, nil
   303  		},
   304  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   305  			w := watch.NewRaceFreeFake()
   306  			if options.ResourceVersion == "1" {
   307  				go func() {
   308  					// Close with a "Gone" error when trying to start a watch from the first list
   309  					w.Error(&apierrors.NewGone("gone").ErrStatus)
   310  					w.Stop()
   311  				}()
   312  			}
   313  			watchCalls++
   314  			return w, nil
   315  		},
   316  	}
   317  	_, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
   318  	defer w.Stop()
   320  	// Expect secret add
   321  	select {
   322  	case event, ok := <-w.ResultChan():
   323  		if !ok {
   324  			t.Fatal("unexpected close")
   325  		}
   326  		if event.Type != watch.Added {
   327  			t.Fatalf("expected Added event, got %#v", event)
   328  		}
   329  		if event.Object.(*corev1.Secret).ResourceVersion != "123" {
   330  			t.Fatalf("expected added Secret with rv=123, got %#v", event.Object)
   331  		}
   332  	case <-time.After(time.Second * 10):
   333  		t.Fatal("timeout")
   334  	}
   336  	// Expect secret delete because the relist was missing the secret
   337  	select {
   338  	case event, ok := <-w.ResultChan():
   339  		if !ok {
   340  			t.Fatal("unexpected close")
   341  		}
   342  		if event.Type != watch.Deleted {
   343  			t.Fatalf("expected Deleted event, got %#v", event)
   344  		}
   345  		if event.Object.(*corev1.Secret).ResourceVersion != "123" {
   346  			t.Fatalf("expected deleted Secret with rv=123, got %#v", event.Object)
   347  		}
   348  	case <-time.After(time.Second * 10):
   349  		t.Fatal("timeout")
   350  	}
   352  	w.Stop()
   353  	select {
   354  	case <-done:
   355  	case <-time.After(time.Second * 10):
   356  		t.Fatal("timeout")
   357  	}
   359  	if listCalls < 2 {
   360  		t.Fatalf("expected at least 2 list calls, got %d", listCalls)
   361  	}
   362  	if watchCalls < 1 {
   363  		t.Fatalf("expected at least 1 watch call, got %d", watchCalls)
   364  	}
   365  }

View as plain text