...
1
16
17 package integration
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "testing"
24 "time"
25
26 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
28 "k8s.io/apiextensions-apiserver/test/integration/fixtures"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/apimachinery/pkg/watch"
33 "k8s.io/client-go/dynamic"
34 )
35
36 func TestChangeCRD(t *testing.T) {
37 tearDown, config, _, err := fixtures.StartDefaultServer(t)
38 if err != nil {
39 t.Fatal(err)
40 }
41 defer tearDown()
42 config.QPS = 1000
43 config.Burst = 1000
44 apiExtensionsClient, err := clientset.NewForConfig(config)
45 if err != nil {
46 t.Fatal(err)
47 }
48 dynamicClient, err := dynamic.NewForConfig(config)
49 if err != nil {
50 t.Fatal(err)
51 }
52
53 noxuDefinition := fixtures.NewNoxuV1CustomResourceDefinition(apiextensionsv1.NamespaceScoped)
54 noxuDefinition, err = fixtures.CreateNewV1CustomResourceDefinition(noxuDefinition, apiExtensionsClient, dynamicClient)
55 if err != nil {
56 t.Fatal(err)
57 }
58
59 ns := "default"
60 noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
61
62 stopChan := make(chan struct{})
63
64 wg := &sync.WaitGroup{}
65
66
67 wg.Add(1)
68 go func() {
69 defer wg.Done()
70 for {
71 select {
72 case <-stopChan:
73 return
74 default:
75 }
76
77 time.Sleep(10 * time.Millisecond)
78
79 noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
80 if err != nil {
81 t.Error(err)
82 continue
83 }
84 if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
85 v2 := noxuDefinitionToUpdate.Spec.Versions[0]
86 v2.Name = "v2"
87 v2.Served = true
88 v2.Storage = false
89 noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
90 } else {
91 noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
92 }
93 if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
94 t.Error(err)
95 continue
96 }
97 }
98 }()
99
100
101 for i := 0; i < 10; i++ {
102 wg.Add(1)
103 go func(i int) {
104 defer wg.Done()
105 noxuInstanceToCreate := fixtures.NewNoxuInstance(ns, fmt.Sprintf("foo-%d", i))
106 if _, err := noxuNamespacedResourceClient.Create(context.TODO(), noxuInstanceToCreate, metav1.CreateOptions{}); err != nil {
107 t.Error(err)
108 return
109 }
110 for {
111 time.Sleep(10 * time.Millisecond)
112 select {
113 case <-stopChan:
114 return
115 default:
116 if _, err := noxuNamespacedResourceClient.Get(context.TODO(), noxuInstanceToCreate.GetName(), metav1.GetOptions{}); err != nil {
117 t.Error(err)
118 continue
119 }
120 }
121 }
122 }(i)
123
124 wg.Add(1)
125 go func(i int) {
126 defer wg.Done()
127 for {
128 time.Sleep(10 * time.Millisecond)
129 select {
130 case <-stopChan:
131 return
132 default:
133 w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
134 if err != nil {
135 t.Errorf("unexpected error establishing watch: %v", err)
136 continue
137 }
138 for event := range w.ResultChan() {
139 switch event.Type {
140 case watch.Added, watch.Modified, watch.Deleted:
141
142 default:
143 t.Errorf("unexpected watch event: %#v", event)
144 }
145 }
146 }
147 }
148 }(i)
149 }
150
151
152 time.Sleep(5 * time.Second)
153
154
155 close(stopChan)
156
157
158 drained := make(chan struct{})
159 go func() {
160 defer close(drained)
161 wg.Wait()
162 }()
163
164 select {
165 case <-drained:
166 case <-time.After(wait.ForeverTestTimeout):
167 t.Error("timed out waiting for clients to complete")
168 }
169 }
170
View as plain text