...

Source file src/k8s.io/kubernetes/test/integration/apiserver/watchcache_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	clientset "k8s.io/client-go/kubernetes"
    30  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    31  	"k8s.io/kubernetes/pkg/controlplane"
    32  	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
    33  	"k8s.io/kubernetes/test/integration/framework"
    34  	"k8s.io/kubernetes/test/utils/ktesting"
    35  )
    36  
    37  // setup create kube-apiserver backed up by two separate etcds,
    38  // with one of them containing events and the other all other objects.
    39  func multiEtcdSetup(ctx context.Context, t *testing.T) (clientset.Interface, framework.TearDownFunc) {
    40  	etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
    41  	etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
    42  	if err != nil {
    43  		t.Fatalf("Couldn't start etcd: %v", err)
    44  	}
    45  
    46  	etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
    47  	if err != nil {
    48  		t.Fatalf("Couldn't start etcd: %v", err)
    49  	}
    50  
    51  	etcdOptions := framework.DefaultEtcdOptions()
    52  	// Overwrite etcd setup to our custom etcd instances.
    53  	etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL}
    54  	etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)}
    55  	etcdOptions.EnableWatchCache = true
    56  
    57  	clientSet, _, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
    58  		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
    59  			// Ensure we're using the same etcd across apiserver restarts.
    60  			opts.Etcd = etcdOptions
    61  		},
    62  		ModifyServerConfig: func(config *controlplane.Config) {
    63  			// Switch off endpoints reconciler to avoid unnecessary operations.
    64  			config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
    65  		},
    66  	})
    67  
    68  	closeFn := func() {
    69  		tearDownFn()
    70  		stopEtcd1()
    71  		stopEtcd0()
    72  	}
    73  
    74  	// Wait for apiserver to be stabilized.
    75  	// Everything but default service creation is checked in StartTestServer above by
    76  	// waiting for post start hooks, so we just wait for default service to exist.
    77  	// TODO(wojtek-t): Figure out less fragile way.
    78  	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
    79  		_, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{})
    80  		return err == nil, nil
    81  	}); err != nil {
    82  		t.Fatalf("Failed to wait for kubernetes service: %v:", err)
    83  	}
    84  	return clientSet, closeFn
    85  }
    86  
    87  func TestWatchCacheUpdatedByEtcd(t *testing.T) {
    88  	tCtx := ktesting.Init(t)
    89  	c, closeFn := multiEtcdSetup(tCtx, t)
    90  	defer closeFn()
    91  
    92  	makeConfigMap := func(name string) *v1.ConfigMap {
    93  		return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}}
    94  	}
    95  	makeSecret := func(name string) *v1.Secret {
    96  		return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}
    97  	}
    98  	makeEvent := func(name string) *v1.Event {
    99  		return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}}
   100  	}
   101  
   102  	cm, err := c.CoreV1().ConfigMaps("default").Create(tCtx, makeConfigMap("name"), metav1.CreateOptions{})
   103  	if err != nil {
   104  		t.Errorf("Couldn't create configmap: %v", err)
   105  	}
   106  	ev, err := c.CoreV1().Events("default").Create(tCtx, makeEvent("name"), metav1.CreateOptions{})
   107  	if err != nil {
   108  		t.Errorf("Couldn't create event: %v", err)
   109  	}
   110  
   111  	listOptions := metav1.ListOptions{
   112  		ResourceVersion:      "0",
   113  		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   114  	}
   115  
   116  	// Wait until listing from cache returns resource version of corresponding
   117  	// resources (being the last updates).
   118  	t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion)
   119  	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
   120  		res, err := c.CoreV1().ConfigMaps("default").List(tCtx, listOptions)
   121  		if err != nil {
   122  			return false, nil
   123  		}
   124  		return res.ResourceVersion == cm.ResourceVersion, nil
   125  	}); err != nil {
   126  		t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
   127  	}
   128  	t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion)
   129  	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
   130  		res, err := c.CoreV1().Events("default").List(tCtx, listOptions)
   131  		if err != nil {
   132  			return false, nil
   133  		}
   134  		return res.ResourceVersion == ev.ResourceVersion, nil
   135  	}); err != nil {
   136  		t.Errorf("Failed to wait for events watchcache synced: %v", err)
   137  	}
   138  
   139  	// Create a secret, that is stored in the same etcd as configmap, but
   140  	// different than events.
   141  	se, err := c.CoreV1().Secrets("default").Create(tCtx, makeSecret("name"), metav1.CreateOptions{})
   142  	if err != nil {
   143  		t.Errorf("Couldn't create secret: %v", err)
   144  	}
   145  
   146  	t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion)
   147  	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
   148  		res, err := c.CoreV1().ConfigMaps("default").List(tCtx, listOptions)
   149  		if err != nil {
   150  			return false, nil
   151  		}
   152  		return res.ResourceVersion == se.ResourceVersion, nil
   153  	}); err != nil {
   154  		t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
   155  	}
   156  	t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion)
   157  	if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) {
   158  		res, err := c.CoreV1().Events("default").List(tCtx, listOptions)
   159  		if err != nil {
   160  			return false, nil
   161  		}
   162  		return res.ResourceVersion == se.ResourceVersion, nil
   163  	}); err == nil || err != wait.ErrWaitTimeout {
   164  		t.Errorf("Events watchcache unexpected synced: %v", err)
   165  	}
   166  }
   167  
   168  func BenchmarkListFromWatchCache(b *testing.B) {
   169  	tCtx := ktesting.Init(b)
   170  	c, _, tearDownFn := framework.StartTestServer(tCtx, b, framework.TestServerSetup{
   171  		ModifyServerConfig: func(config *controlplane.Config) {
   172  			// Switch off endpoints reconciler to avoid unnecessary operations.
   173  			config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
   174  		},
   175  	})
   176  	defer tearDownFn()
   177  
   178  	namespaces, secretsPerNamespace := 100, 1000
   179  	wg := sync.WaitGroup{}
   180  
   181  	errCh := make(chan error, namespaces)
   182  	for i := 0; i < namespaces; i++ {
   183  		wg.Add(1)
   184  		index := i
   185  		go func() {
   186  			defer wg.Done()
   187  
   188  			ns := &v1.Namespace{
   189  				ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("namespace-%d", index)},
   190  			}
   191  			ns, err := c.CoreV1().Namespaces().Create(tCtx, ns, metav1.CreateOptions{})
   192  			if err != nil {
   193  				errCh <- err
   194  				return
   195  			}
   196  
   197  			for j := 0; j < secretsPerNamespace; j++ {
   198  				secret := &v1.Secret{
   199  					ObjectMeta: metav1.ObjectMeta{
   200  						Name: fmt.Sprintf("secret-%d", j),
   201  					},
   202  				}
   203  				_, err := c.CoreV1().Secrets(ns.Name).Create(tCtx, secret, metav1.CreateOptions{})
   204  				if err != nil {
   205  					errCh <- err
   206  					return
   207  				}
   208  			}
   209  		}()
   210  	}
   211  
   212  	wg.Wait()
   213  	close(errCh)
   214  	for err := range errCh {
   215  		b.Error(err)
   216  	}
   217  
   218  	b.ResetTimer()
   219  
   220  	opts := metav1.ListOptions{
   221  		ResourceVersion: "0",
   222  	}
   223  	for i := 0; i < b.N; i++ {
   224  		secrets, err := c.CoreV1().Secrets("").List(tCtx, opts)
   225  		if err != nil {
   226  			b.Errorf("failed to list secrets: %v", err)
   227  		}
   228  		b.Logf("Number of secrets: %d", len(secrets.Items))
   229  	}
   230  }
   231  

View as plain text