...

Source file src/k8s.io/kubernetes/test/integration/apimachinery/watchlist_test.go

Documentation: k8s.io/kubernetes/test/integration/apimachinery

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apimachinery
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"testing"
    24  	"time"
    25  
    26  	"github.com/stretchr/testify/require"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	v1 "k8s.io/api/core/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/apimachinery/pkg/watch"
    34  	"k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/tools/cache"
    36  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    37  	"k8s.io/kubernetes/test/integration/framework"
    38  	"k8s.io/utils/ptr"
    39  )
    40  
    41  func TestReflectorWatchListFallback(t *testing.T) {
    42  	ctx := context.TODO()
    43  
    44  	t.Log("Starting etcd that will be used by two different instances of kube-apiserver")
    45  	etcdURL, etcdTearDownFn, err := framework.RunCustomEtcd("etcd_watchlist", []string{"--experimental-watch-progress-notify-interval", "1s"}, nil)
    46  	require.NoError(t, err)
    47  	defer etcdTearDownFn()
    48  	etcdOptions := framework.DefaultEtcdOptions()
    49  	etcdOptions.StorageConfig.Transport.ServerList = []string{etcdURL}
    50  
    51  	t.Log("Starting the first server with the WatchList feature enabled")
    52  	server1 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=true"}, &etcdOptions.StorageConfig)
    53  	defer server1.TearDownFn()
    54  	clientSet, err := kubernetes.NewForConfig(server1.ClientConfig)
    55  	require.NoError(t, err)
    56  
    57  	ns := framework.CreateNamespaceOrDie(clientSet, "reflector-fallback-watchlist", t)
    58  	defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
    59  
    60  	t.Logf("Adding 5 secrets to %s namespace", ns.Name)
    61  	for i := 1; i <= 5; i++ {
    62  		_, err := clientSet.CoreV1().Secrets(ns.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
    63  		require.NoError(t, err)
    64  	}
    65  
    66  	t.Log("Creating a secret reflector that will use the WatchList feature to synchronise the store")
    67  	store := &wrappedStore{Store: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)}
    68  	lw := &wrappedListWatch{&cache.ListWatch{}}
    69  	lw.SetClient(ctx, clientSet, ns)
    70  	target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0))
    71  	target.UseWatchList = ptr.To(true)
    72  
    73  	t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)")
    74  	reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background())
    75  	defer reflectorCtxCancel()
    76  	store.setCancelOnReplace(reflectorCtxCancel)
    77  	err = target.ListAndWatch(reflectorCtx.Done())
    78  	require.NoError(t, err)
    79  
    80  	t.Log("Verifying if the secret reflector was properly synchronised")
    81  	verifyStore(t, ctx, clientSet, store, ns)
    82  
    83  	t.Log("Starting the second server with the WatchList feature disabled")
    84  	server2 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=false"}, &etcdOptions.StorageConfig)
    85  	defer server2.TearDownFn()
    86  	clientSet2, err := kubernetes.NewForConfig(server2.ClientConfig)
    87  	require.NoError(t, err)
    88  
    89  	t.Log("Pointing the ListWatcher used by the secret reflector to the second server (with the WatchList feature disabled)")
    90  	lw.SetClient(ctx, clientSet2, ns)
    91  	reflectorCtx, reflectorCtxCancel = context.WithCancel(context.Background())
    92  	defer reflectorCtxCancel()
    93  	store.setCancelOnReplace(reflectorCtxCancel)
    94  	err = target.ListAndWatch(reflectorCtx.Done())
    95  	require.NoError(t, err)
    96  
    97  	t.Log("Verifying if the secret reflector was properly synchronised")
    98  	verifyStore(t, ctx, clientSet, store, ns)
    99  }
   100  
   101  // TODO(#115478): refactor with e2e/apimachinery/watchlist
   102  func verifyStore(t *testing.T, ctx context.Context, clientSet kubernetes.Interface, store cache.Store, namespace *v1.Namespace) {
   103  	t.Logf("Listing secrets directly from the server from %s namespace", namespace.Name)
   104  	expectedSecretsList, err := clientSet.CoreV1().Secrets(namespace.Name).List(ctx, metav1.ListOptions{})
   105  	require.NoError(t, err)
   106  	expectedSecrets := expectedSecretsList.Items
   107  
   108  	err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
   109  		t.Log("Comparing secrets retrieved directly from the server with the ones that have been streamed to the secret reflector")
   110  		rawStreamedSecrets := store.List()
   111  		streamedSecrets := make([]v1.Secret, 0, len(rawStreamedSecrets))
   112  		for _, rawSecret := range rawStreamedSecrets {
   113  			streamedSecrets = append(streamedSecrets, *rawSecret.(*v1.Secret))
   114  		}
   115  		sort.Sort(byName(expectedSecrets))
   116  		sort.Sort(byName(streamedSecrets))
   117  		return cmp.Equal(expectedSecrets, streamedSecrets), nil
   118  	})
   119  	require.NoError(t, err)
   120  }
   121  
   122  type byName []v1.Secret
   123  
   124  func (a byName) Len() int           { return len(a) }
   125  func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
   126  func (a byName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
   127  
   128  func newSecret(name string) *v1.Secret {
   129  	return &v1.Secret{
   130  		ObjectMeta: metav1.ObjectMeta{Name: name},
   131  	}
   132  }
   133  
   134  type wrappedStore struct {
   135  	cache.Store
   136  	ctxCancel context.CancelFunc
   137  }
   138  
   139  func (s *wrappedStore) Replace(items []interface{}, rv string) error {
   140  	s.ctxCancel()
   141  	return s.Store.Replace(items, rv)
   142  }
   143  
   144  func (s *wrappedStore) setCancelOnReplace(ctxCancel context.CancelFunc) {
   145  	s.ctxCancel = ctxCancel
   146  }
   147  
   148  type wrappedListWatch struct {
   149  	*cache.ListWatch
   150  }
   151  
   152  func (lw *wrappedListWatch) SetClient(ctx context.Context, clientSet kubernetes.Interface, ns *v1.Namespace) {
   153  	lw.ListFunc = func(options metav1.ListOptions) (runtime.Object, error) {
   154  		return clientSet.CoreV1().Secrets(ns.Name).List(ctx, options)
   155  	}
   156  	lw.WatchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
   157  		return clientSet.CoreV1().Secrets(ns.Name).Watch(ctx, options)
   158  	}
   159  }
   160  

View as plain text