1
16
17 package kubelet
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "testing"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/runtime/schema"
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/apimachinery/pkg/watch"
33 "k8s.io/client-go/kubernetes"
34 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
35 "k8s.io/kubernetes/pkg/kubelet/util/manager"
36 "k8s.io/kubernetes/test/integration/framework"
37 testingclock "k8s.io/utils/clock/testing"
38 )
39
40 func TestWatchBasedManager(t *testing.T) {
41 testNamespace := "test-watch-based-manager"
42 server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
43 defer server.TearDownFn()
44
45 const n = 10
46 server.ClientConfig.QPS = 10000
47 server.ClientConfig.Burst = 10000
48 client, err := kubernetes.NewForConfig(server.ClientConfig)
49 if err != nil {
50 t.Fatalf("unexpected error: %v", err)
51 }
52 if _, err := client.CoreV1().Namespaces().Create(context.TODO(), (&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}), metav1.CreateOptions{}); err != nil {
53 t.Fatal(err)
54 }
55
56 listObj := func(namespace string, options metav1.ListOptions) (runtime.Object, error) {
57 return client.CoreV1().Secrets(namespace).List(context.TODO(), options)
58 }
59 watchObj := func(namespace string, options metav1.ListOptions) (watch.Interface, error) {
60 return client.CoreV1().Secrets(namespace).Watch(context.TODO(), options)
61 }
62 newObj := func() runtime.Object { return &v1.Secret{} }
63
64
65 isImmutable := func(_ runtime.Object) bool { return false }
66 fakeClock := testingclock.NewFakeClock(time.Now())
67
68 stopCh := make(chan struct{})
69 defer close(stopCh)
70 store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)
71
72
73 t.Log(time.Now(), "creating 1000 secrets")
74 wg := sync.WaitGroup{}
75 errCh := make(chan error, n)
76 for i := 0; i < n; i++ {
77 wg.Add(1)
78 go func(i int) {
79 defer wg.Done()
80 for j := 0; j < 100; j++ {
81 name := fmt.Sprintf("s%d", i*100+j)
82 if _, err := client.CoreV1().Secrets(testNamespace).Create(context.TODO(), &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}, metav1.CreateOptions{}); err != nil {
83 select {
84 case errCh <- err:
85 default:
86 }
87 }
88 }
89 fmt.Print(".")
90 }(i)
91 }
92
93 wg.Wait()
94 select {
95 case err := <-errCh:
96 t.Fatal(err)
97 default:
98 }
99 t.Log(time.Now(), "finished creating 1000 secrets")
100
101
102 wg = sync.WaitGroup{}
103 errCh = make(chan error, n)
104 for i := 0; i < n; i++ {
105 wg.Add(1)
106 go func(i int) {
107 defer wg.Done()
108 for j := 0; j < 100; j++ {
109 name := fmt.Sprintf("s%d", i*100+j)
110 start := time.Now()
111 store.AddReference(testNamespace, name, types.UID(name))
112 err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
113 obj, err := store.Get(testNamespace, name)
114 if err != nil {
115 t.Logf("failed on %s, retrying: %v", name, err)
116 return false, nil
117 }
118 if obj.(*v1.Secret).Name != name {
119 return false, fmt.Errorf("wrong object: %v", obj.(*v1.Secret).Name)
120 }
121 return true, nil
122 })
123 if err != nil {
124 select {
125 case errCh <- fmt.Errorf("failed on :%s: %v", name, err):
126 default:
127 }
128 }
129 if d := time.Since(start); d > time.Second {
130 t.Logf("%s took %v", name, d)
131 }
132 }
133 }(i)
134 }
135
136 wg.Wait()
137 select {
138 case err = <-errCh:
139 t.Fatal(err)
140 default:
141 }
142 }
143
View as plain text