...

Source file src/k8s.io/client-go/tools/watch/until_test.go

Documentation: k8s.io/client-go/tools/watch

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package watch
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"reflect"
    24  	"strings"
    25  	"testing"
    26  	"time"
    27  
    28  	corev1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/runtime/schema"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/apimachinery/pkg/watch"
    34  	fakeclient "k8s.io/client-go/kubernetes/fake"
    35  	"k8s.io/client-go/tools/cache"
    36  )
    37  
    38  type fakePod struct {
    39  }
    40  
    41  func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
    42  func (obj *fakePod) DeepCopyObject() runtime.Object   { panic("DeepCopyObject not supported by fakePod") }
    43  
    44  func TestUntil(t *testing.T) {
    45  	fw := watch.NewFake()
    46  	go func() {
    47  		var obj *fakePod
    48  		fw.Add(obj)
    49  		fw.Modify(obj)
    50  	}()
    51  	conditions := []ConditionFunc{
    52  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
    53  		func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
    54  	}
    55  
    56  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    57  	defer cancel()
    58  
    59  	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
    60  	if err != nil {
    61  		t.Fatalf("expected nil error, got %#v", err)
    62  	}
    63  	if lastEvent == nil {
    64  		t.Fatal("expected an event")
    65  	}
    66  	if lastEvent.Type != watch.Modified {
    67  		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
    68  	}
    69  	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
    70  		t.Fatalf("expected a pod event, got %#v", got)
    71  	}
    72  }
    73  
    74  func TestUntilMultipleConditions(t *testing.T) {
    75  	fw := watch.NewFake()
    76  	go func() {
    77  		var obj *fakePod
    78  		fw.Add(obj)
    79  	}()
    80  	conditions := []ConditionFunc{
    81  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
    82  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
    83  	}
    84  
    85  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    86  	defer cancel()
    87  
    88  	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
    89  	if err != nil {
    90  		t.Fatalf("expected nil error, got %#v", err)
    91  	}
    92  	if lastEvent == nil {
    93  		t.Fatal("expected an event")
    94  	}
    95  	if lastEvent.Type != watch.Added {
    96  		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
    97  	}
    98  	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
    99  		t.Fatalf("expected a pod event, got %#v", got)
   100  	}
   101  }
   102  
   103  func TestUntilMultipleConditionsFail(t *testing.T) {
   104  	fw := watch.NewFake()
   105  	go func() {
   106  		var obj *fakePod
   107  		fw.Add(obj)
   108  	}()
   109  	conditions := []ConditionFunc{
   110  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
   111  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
   112  		func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
   113  	}
   114  
   115  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   116  	defer cancel()
   117  
   118  	lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
   119  	if err != wait.ErrWaitTimeout {
   120  		t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
   121  	}
   122  	if lastEvent == nil {
   123  		t.Fatal("expected an event")
   124  	}
   125  	if lastEvent.Type != watch.Added {
   126  		t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
   127  	}
   128  	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
   129  		t.Fatalf("expected a pod event, got %#v", got)
   130  	}
   131  }
   132  
   133  func TestUntilTimeout(t *testing.T) {
   134  	fw := watch.NewFake()
   135  	go func() {
   136  		var obj *fakePod
   137  		fw.Add(obj)
   138  		fw.Modify(obj)
   139  	}()
   140  	conditions := []ConditionFunc{
   141  		func(event watch.Event) (bool, error) {
   142  			return event.Type == watch.Added, nil
   143  		},
   144  		func(event watch.Event) (bool, error) {
   145  			return event.Type == watch.Modified, nil
   146  		},
   147  	}
   148  
   149  	lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
   150  	if err != nil {
   151  		t.Fatalf("expected nil error, got %#v", err)
   152  	}
   153  	if lastEvent == nil {
   154  		t.Fatal("expected an event")
   155  	}
   156  	if lastEvent.Type != watch.Modified {
   157  		t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
   158  	}
   159  	if got, isPod := lastEvent.Object.(*fakePod); !isPod {
   160  		t.Fatalf("expected a pod event, got %#v", got)
   161  	}
   162  }
   163  
   164  func TestUntilErrorCondition(t *testing.T) {
   165  	fw := watch.NewFake()
   166  	go func() {
   167  		var obj *fakePod
   168  		fw.Add(obj)
   169  	}()
   170  	expected := "something bad"
   171  	conditions := []ConditionFunc{
   172  		func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
   173  		func(event watch.Event) (bool, error) { return false, errors.New(expected) },
   174  	}
   175  
   176  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
   177  	defer cancel()
   178  
   179  	_, err := UntilWithoutRetry(ctx, fw, conditions...)
   180  	if err == nil {
   181  		t.Fatal("expected an error")
   182  	}
   183  	if !strings.Contains(err.Error(), expected) {
   184  		t.Fatalf("expected %q in error string, got %q", expected, err.Error())
   185  	}
   186  }
   187  
   188  func TestUntilWithSync(t *testing.T) {
   189  	// FIXME: test preconditions
   190  	tt := []struct {
   191  		name             string
   192  		lw               *cache.ListWatch
   193  		preconditionFunc PreconditionFunc
   194  		conditionFunc    ConditionFunc
   195  		expectedErr      error
   196  		expectedEvent    *watch.Event
   197  	}{
   198  		{
   199  			name: "doesn't wait for sync with no precondition",
   200  			lw: &cache.ListWatch{
   201  				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   202  					select {}
   203  				},
   204  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   205  					select {}
   206  				},
   207  			},
   208  			preconditionFunc: nil,
   209  			conditionFunc: func(e watch.Event) (bool, error) {
   210  				return true, nil
   211  			},
   212  			expectedErr:   wait.ErrWaitTimeout,
   213  			expectedEvent: nil,
   214  		},
   215  		{
   216  			name: "waits indefinitely with precondition if it can't sync",
   217  			lw: &cache.ListWatch{
   218  				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   219  					select {}
   220  				},
   221  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   222  					select {}
   223  				},
   224  			},
   225  			preconditionFunc: func(store cache.Store) (bool, error) {
   226  				return true, nil
   227  			},
   228  			conditionFunc: func(e watch.Event) (bool, error) {
   229  				return true, nil
   230  			},
   231  			expectedErr:   fmt.Errorf("UntilWithSync: unable to sync caches: %w", context.DeadlineExceeded),
   232  			expectedEvent: nil,
   233  		},
   234  		{
   235  			name: "precondition can stop the loop",
   236  			lw: func() *cache.ListWatch {
   237  				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
   238  
   239  				return &cache.ListWatch{
   240  					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   241  						return fakeclient.CoreV1().Secrets("").List(context.TODO(), options)
   242  					},
   243  					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   244  						return fakeclient.CoreV1().Secrets("").Watch(context.TODO(), options)
   245  					},
   246  				}
   247  			}(),
   248  			preconditionFunc: func(store cache.Store) (bool, error) {
   249  				_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
   250  				if err != nil {
   251  					return true, err
   252  				}
   253  				if exists {
   254  					return true, nil
   255  				}
   256  				return false, nil
   257  			},
   258  			conditionFunc: func(e watch.Event) (bool, error) {
   259  				return true, errors.New("should never reach this")
   260  			},
   261  			expectedErr:   nil,
   262  			expectedEvent: nil,
   263  		},
   264  		{
   265  			name: "precondition lets it proceed to regular condition",
   266  			lw: func() *cache.ListWatch {
   267  				fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
   268  
   269  				return &cache.ListWatch{
   270  					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   271  						return fakeclient.CoreV1().Secrets("").List(context.TODO(), options)
   272  					},
   273  					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   274  						return fakeclient.CoreV1().Secrets("").Watch(context.TODO(), options)
   275  					},
   276  				}
   277  			}(),
   278  			preconditionFunc: func(store cache.Store) (bool, error) {
   279  				return false, nil
   280  			},
   281  			conditionFunc: func(e watch.Event) (bool, error) {
   282  				if e.Type == watch.Added {
   283  					return true, nil
   284  				}
   285  				panic("no other events are expected")
   286  			},
   287  			expectedErr:   nil,
   288  			expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
   289  		},
   290  	}
   291  
   292  	for _, tc := range tt {
   293  		t.Run(tc.name, func(t *testing.T) {
   294  			// Informer waits for caches to sync by polling in 100ms intervals,
   295  			// timeout needs to be reasonably higher
   296  			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
   297  			defer cancel()
   298  
   299  			event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)
   300  
   301  			if !reflect.DeepEqual(err, tc.expectedErr) {
   302  				t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
   303  			}
   304  
   305  			if !reflect.DeepEqual(event, tc.expectedEvent) {
   306  				t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
   307  			}
   308  		})
   309  	}
   310  }
   311  

View as plain text