...

Source file src/k8s.io/kubernetes/test/integration/kubelet/watch_manager_test.go

Documentation: k8s.io/kubernetes/test/integration/kubelet

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/runtime/schema"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/apimachinery/pkg/watch"
    33  	"k8s.io/client-go/kubernetes"
    34  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    35  	"k8s.io/kubernetes/pkg/kubelet/util/manager"
    36  	"k8s.io/kubernetes/test/integration/framework"
    37  	testingclock "k8s.io/utils/clock/testing"
    38  )
    39  
    40  func TestWatchBasedManager(t *testing.T) {
    41  	testNamespace := "test-watch-based-manager"
    42  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
    43  	defer server.TearDownFn()
    44  
    45  	const n = 10
    46  	server.ClientConfig.QPS = 10000
    47  	server.ClientConfig.Burst = 10000
    48  	client, err := kubernetes.NewForConfig(server.ClientConfig)
    49  	if err != nil {
    50  		t.Fatalf("unexpected error: %v", err)
    51  	}
    52  	if _, err := client.CoreV1().Namespaces().Create(context.TODO(), (&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}), metav1.CreateOptions{}); err != nil {
    53  		t.Fatal(err)
    54  	}
    55  
    56  	listObj := func(namespace string, options metav1.ListOptions) (runtime.Object, error) {
    57  		return client.CoreV1().Secrets(namespace).List(context.TODO(), options)
    58  	}
    59  	watchObj := func(namespace string, options metav1.ListOptions) (watch.Interface, error) {
    60  		return client.CoreV1().Secrets(namespace).Watch(context.TODO(), options)
    61  	}
    62  	newObj := func() runtime.Object { return &v1.Secret{} }
    63  	// We want all watches to be up and running to stress test it.
    64  	// So don't treat any secret as immutable here.
    65  	isImmutable := func(_ runtime.Object) bool { return false }
    66  	fakeClock := testingclock.NewFakeClock(time.Now())
    67  
    68  	stopCh := make(chan struct{})
    69  	defer close(stopCh)
    70  	store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)
    71  
    72  	// create 1000 secrets in parallel
    73  	t.Log(time.Now(), "creating 1000 secrets")
    74  	wg := sync.WaitGroup{}
    75  	errCh := make(chan error, n)
    76  	for i := 0; i < n; i++ {
    77  		wg.Add(1)
    78  		go func(i int) {
    79  			defer wg.Done()
    80  			for j := 0; j < 100; j++ {
    81  				name := fmt.Sprintf("s%d", i*100+j)
    82  				if _, err := client.CoreV1().Secrets(testNamespace).Create(context.TODO(), &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}, metav1.CreateOptions{}); err != nil {
    83  					select {
    84  					case errCh <- err:
    85  					default:
    86  					}
    87  				}
    88  			}
    89  			fmt.Print(".")
    90  		}(i)
    91  	}
    92  
    93  	wg.Wait()
    94  	select {
    95  	case err := <-errCh:
    96  		t.Fatal(err)
    97  	default:
    98  	}
    99  	t.Log(time.Now(), "finished creating 1000 secrets")
   100  
   101  	// fetch all secrets
   102  	wg = sync.WaitGroup{}
   103  	errCh = make(chan error, n)
   104  	for i := 0; i < n; i++ {
   105  		wg.Add(1)
   106  		go func(i int) {
   107  			defer wg.Done()
   108  			for j := 0; j < 100; j++ {
   109  				name := fmt.Sprintf("s%d", i*100+j)
   110  				start := time.Now()
   111  				store.AddReference(testNamespace, name, types.UID(name))
   112  				err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
   113  					obj, err := store.Get(testNamespace, name)
   114  					if err != nil {
   115  						t.Logf("failed on %s, retrying: %v", name, err)
   116  						return false, nil
   117  					}
   118  					if obj.(*v1.Secret).Name != name {
   119  						return false, fmt.Errorf("wrong object: %v", obj.(*v1.Secret).Name)
   120  					}
   121  					return true, nil
   122  				})
   123  				if err != nil {
   124  					select {
   125  					case errCh <- fmt.Errorf("failed on :%s: %v", name, err):
   126  					default:
   127  					}
   128  				}
   129  				if d := time.Since(start); d > time.Second {
   130  					t.Logf("%s took %v", name, d)
   131  				}
   132  			}
   133  		}(i)
   134  	}
   135  
   136  	wg.Wait()
   137  	select {
   138  	case err = <-errCh:
   139  		t.Fatal(err)
   140  	default:
   141  	}
   142  }
   143  

View as plain text