1
16
17 package apiserver
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "net/http"
24 "net/http/httptest"
25 "strconv"
26 "strings"
27 "sync"
28 "sync/atomic"
29 "testing"
30 "time"
31
32 "github.com/google/go-cmp/cmp"
33 fuzz "github.com/google/gofuzz"
34 "github.com/stretchr/testify/require"
35
36 apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
37 apidiscoveryv2beta1 "k8s.io/api/apidiscovery/v2beta1"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 apidiscoveryv2scheme "k8s.io/apiserver/pkg/apis/apidiscovery/v2"
42 "k8s.io/apiserver/pkg/endpoints"
43 "k8s.io/apiserver/pkg/endpoints/discovery"
44 discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
45 scheme "k8s.io/client-go/kubernetes/scheme"
46 "k8s.io/client-go/tools/cache"
47 "k8s.io/client-go/util/workqueue"
48 apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
49 )
50
51 func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
52 dm := NewDiscoveryManager(rm).(*discoveryManager)
53 dm.dirtyAPIServiceQueue = newCompleterWorkqueue(dm.dirtyAPIServiceQueue)
54
55 return dm
56 }
57
58
59
60 func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
61 return cache.WaitForCacheSync(stopCh, func() bool {
62 return dm.dirtyAPIServiceQueue.(*completerWorkqueue).isComplete()
63 })
64 }
65
66
67 func TestBasic(t *testing.T) {
68 service1 := discoveryendpoint.NewResourceManager("apis")
69 service2 := discoveryendpoint.NewResourceManager("apis")
70 service3 := discoveryendpoint.NewResourceManager("apis")
71 apiGroup1 := fuzzAPIGroups(2, 5, 25)
72 apiGroup2 := fuzzAPIGroups(2, 5, 50)
73 apiGroup3 := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
74 {
75 ObjectMeta: metav1.ObjectMeta{Name: "weird.example.com"},
76 Versions: []apidiscoveryv2.APIVersionDiscovery{
77 {
78 Version: "v1",
79 Freshness: "Current",
80 Resources: []apidiscoveryv2.APIResourceDiscovery{
81 {
82 Resource: "parent-missing-kind",
83 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
84 {Subresource: "subresource-missing-kind"},
85 },
86 },
87 {
88 Resource: "parent-empty-kind",
89 ResponseKind: &metav1.GroupVersionKind{},
90 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
91 {Subresource: "subresource-empty-kind", ResponseKind: &metav1.GroupVersionKind{}},
92 },
93 },
94 {
95 Resource: "parent-with-kind",
96 ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
97 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
98 {Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
99 },
100 },
101 },
102 },
103 },
104 },
105 }}
106 apiGroup3WithFixup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
107 {
108 ObjectMeta: metav1.ObjectMeta{Name: "weird.example.com"},
109 Versions: []apidiscoveryv2.APIVersionDiscovery{
110 {
111 Version: "v1",
112 Freshness: "Current",
113 Resources: []apidiscoveryv2.APIResourceDiscovery{
114 {
115 Resource: "parent-missing-kind",
116 ResponseKind: &metav1.GroupVersionKind{},
117 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
118 {Subresource: "subresource-missing-kind", ResponseKind: &metav1.GroupVersionKind{}},
119 },
120 },
121 {
122 Resource: "parent-empty-kind",
123 ResponseKind: &metav1.GroupVersionKind{},
124 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
125 {Subresource: "subresource-empty-kind", ResponseKind: &metav1.GroupVersionKind{}},
126 },
127 },
128 {
129 Resource: "parent-with-kind",
130 ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
131 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
132 {Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
133 },
134 },
135 },
136 },
137 },
138 },
139 }}
140 service1.SetGroups(apiGroup1.Items)
141 service2.SetGroups(apiGroup2.Items)
142 service3.SetGroups(apiGroup3.Items)
143 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
144 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
145
146 for _, g := range apiGroup1.Items {
147 versionPriority := int32(len(g.Versions) + 1)
148 for _, v := range g.Versions {
149 versionPriority--
150 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
151 ObjectMeta: metav1.ObjectMeta{
152 Name: v.Version + "." + g.Name,
153 },
154 Spec: apiregistrationv1.APIServiceSpec{
155 Group: g.Name,
156 Version: v.Version,
157 VersionPriority: versionPriority,
158 Service: &apiregistrationv1.ServiceReference{
159 Name: "service1",
160 },
161 },
162 }, service1)
163 }
164 }
165
166 for _, g := range apiGroup2.Items {
167 versionPriority := int32(len(g.Versions) + 1)
168 for _, v := range g.Versions {
169 versionPriority--
170 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
171 ObjectMeta: metav1.ObjectMeta{
172 Name: v.Version + "." + g.Name,
173 },
174 Spec: apiregistrationv1.APIServiceSpec{
175 Group: g.Name,
176 Version: v.Version,
177 VersionPriority: versionPriority,
178 Service: &apiregistrationv1.ServiceReference{
179 Name: "service2",
180 },
181 },
182 }, service2)
183 }
184 }
185
186 for _, g := range apiGroup3.Items {
187 versionPriority := int32(len(g.Versions) + 1)
188 for _, v := range g.Versions {
189 versionPriority--
190 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
191 ObjectMeta: metav1.ObjectMeta{
192 Name: v.Version + "." + g.Name,
193 },
194 Spec: apiregistrationv1.APIServiceSpec{
195 Group: g.Name,
196 Version: v.Version,
197 VersionPriority: versionPriority,
198 Service: &apiregistrationv1.ServiceReference{
199 Name: "service3",
200 },
201 },
202 }, service3)
203 }
204 }
205
206 testCtx, testCancel := context.WithCancel(context.Background())
207 defer testCancel()
208
209 go aggregatedManager.Run(testCtx.Done(), nil)
210
211 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
212
213 response, _, parsed := fetchPath(aggregatedResourceManager, "")
214 if response.StatusCode != 200 {
215 t.Fatalf("unexpected status code %d", response.StatusCode)
216 }
217 checkAPIGroups(t, apiGroup1, parsed)
218 checkAPIGroups(t, apiGroup2, parsed)
219 checkAPIGroups(t, apiGroup3WithFixup, parsed)
220 }
221
222 func checkAPIGroups(t *testing.T, api apidiscoveryv2.APIGroupDiscoveryList, response *apidiscoveryv2.APIGroupDiscoveryList) {
223 t.Helper()
224 if len(response.Items) < len(api.Items) {
225 t.Errorf("expected to check for at least %d groups, only have %d groups in response", len(api.Items), len(response.Items))
226 }
227 for _, knownGroup := range api.Items {
228 found := false
229 for _, possibleGroup := range response.Items {
230 if knownGroup.Name == possibleGroup.Name {
231 t.Logf("found %s", knownGroup.Name)
232 found = true
233 diff := cmp.Diff(knownGroup, possibleGroup)
234 if len(diff) > 0 {
235 t.Error(diff)
236 }
237 }
238 }
239 if found == false {
240 t.Errorf("could not find %s", knownGroup.Name)
241 }
242 }
243 }
244
245
246
247 func TestInitialRunHasAllAPIServices(t *testing.T) {
248 neverReturnCh := make(chan struct{})
249 defer close(neverReturnCh)
250 service := discoveryendpoint.NewResourceManager("apis")
251 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
252
253 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
254
255 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
256 ObjectMeta: metav1.ObjectMeta{
257 Name: "v1.stable.example.com",
258 },
259 Spec: apiregistrationv1.APIServiceSpec{
260 Group: "stable.example.com",
261 Version: "v1",
262 Service: &apiregistrationv1.ServiceReference{
263 Name: "test-service",
264 },
265 },
266 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
267 <-neverReturnCh
268 service.ServeHTTP(w, r)
269 }))
270 testCtx, cancel := context.WithCancel(context.Background())
271 defer cancel()
272
273 initialSyncedCh := make(chan struct{})
274 go aggregatedManager.Run(testCtx.Done(), initialSyncedCh)
275 select {
276 case <-initialSyncedCh:
277 case <-time.After(10 * time.Second):
278 t.Fatal("timed out waiting for initial sync")
279 }
280
281 response, _, parsed := fetchPath(aggregatedResourceManager, "")
282 if response.StatusCode != 200 {
283 t.Fatalf("unexpected status code %d", response.StatusCode)
284 }
285
286 apiGroup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
287 {
288 ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
289 Versions: []apidiscoveryv2.APIVersionDiscovery{
290 {
291 Version: "v1",
292 Freshness: "Stale",
293 },
294 },
295 },
296 }}
297
298 checkAPIGroups(t, apiGroup, parsed)
299 }
300
301 func TestServiceGC(t *testing.T) {
302 service := discoveryendpoint.NewResourceManager("apis")
303
304 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
305 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
306 testCtx, cancel := context.WithCancel(context.Background())
307 defer cancel()
308
309 go aggregatedManager.Run(testCtx.Done(), nil)
310
311 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
312 ObjectMeta: metav1.ObjectMeta{
313 Name: "v1.stable.example.com",
314 },
315 Spec: apiregistrationv1.APIServiceSpec{
316 Group: "stable.example.com",
317 Version: "v1",
318 Service: &apiregistrationv1.ServiceReference{
319 Name: "test-service",
320 },
321 },
322 }, service)
323
324 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
325
326
327 getCacheLen := func() int {
328 aggregatedManager.resultsLock.Lock()
329 defer aggregatedManager.resultsLock.Unlock()
330 return len(aggregatedManager.cachedResults)
331 }
332
333 require.Equal(t, 1, getCacheLen())
334
335
336 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
337 ObjectMeta: metav1.ObjectMeta{
338 Name: "v1.stable.example.com",
339 },
340 Spec: apiregistrationv1.APIServiceSpec{
341 Group: "stable.example.com",
342 Version: "v1",
343 Service: &apiregistrationv1.ServiceReference{
344 Name: "test-service-changed",
345 },
346 },
347 }, service)
348
349 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
350 require.Equal(t, 1, getCacheLen())
351 }
352
353
354
355 func TestV2Beta1Skew(t *testing.T) {
356 apiGroup := apidiscoveryv2.APIGroupDiscoveryList{Items: []apidiscoveryv2.APIGroupDiscovery{
357 {
358 ObjectMeta: metav1.ObjectMeta{Name: "stable.example.com"},
359 Versions: []apidiscoveryv2.APIVersionDiscovery{
360 {
361 Version: "v1",
362 Freshness: "Current",
363 Resources: []apidiscoveryv2.APIResourceDiscovery{
364 {
365 Resource: "parent-with-kind",
366 ResponseKind: &metav1.GroupVersionKind{Kind: "ParentWithKind"},
367 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
368 {Subresource: "subresource-with-kind", ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithKind"}},
369 },
370 },
371 },
372 },
373 },
374 },
375 }}
376
377 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
378
379 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
380
381 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
382 ObjectMeta: metav1.ObjectMeta{
383 Name: "v1.stable.example.com",
384 },
385 Spec: apiregistrationv1.APIServiceSpec{
386 Group: "stable.example.com",
387 Version: "v1",
388 Service: &apiregistrationv1.ServiceReference{
389 Name: "test-service",
390 },
391 },
392 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
393
394 v2b := apidiscoveryv2beta1.APIGroupDiscoveryList{}
395 err := apidiscoveryv2scheme.Convertv2APIGroupDiscoveryListTov2beta1APIGroupDiscoveryList(&apiGroup, &v2b, nil)
396 require.NoError(t, err)
397 converted, err := json.Marshal(v2b)
398 require.NoError(t, err)
399 w.Header().Set("Content-Type", "application/json;"+"g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
400 w.WriteHeader(200)
401 _, err = w.Write(converted)
402 require.NoError(t, err)
403 }))
404 testCtx, cancel := context.WithCancel(context.Background())
405 defer cancel()
406
407 go aggregatedManager.Run(testCtx.Done(), nil)
408 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
409
410 response, _, parsed := fetchPath(aggregatedResourceManager, "")
411 if response.StatusCode != 200 {
412 t.Fatalf("unexpected status code %d", response.StatusCode)
413 }
414
415 checkAPIGroups(t, apiGroup, parsed)
416 }
417
418
419
420 func TestDirty(t *testing.T) {
421 var pinged atomic.Bool
422 service := discoveryendpoint.NewResourceManager("apis")
423 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
424
425 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
426
427 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
428 ObjectMeta: metav1.ObjectMeta{
429 Name: "v1.stable.example.com",
430 },
431 Spec: apiregistrationv1.APIServiceSpec{
432 Group: "stable.example.com",
433 Version: "v1",
434 Service: &apiregistrationv1.ServiceReference{
435 Name: "test-service",
436 },
437 },
438 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
439 pinged.Store(true)
440 service.ServeHTTP(w, r)
441 }))
442 testCtx, cancel := context.WithCancel(context.Background())
443 defer cancel()
444
445 go aggregatedManager.Run(testCtx.Done(), nil)
446 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
447
448
449 if !pinged.Load() {
450 t.Errorf("service handler never pinged")
451 }
452 }
453
454
455
456 func TestWaitForSync(t *testing.T) {
457 pinged := atomic.Bool{}
458 service := discoveryendpoint.NewResourceManager("apis")
459 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
460
461 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
462
463 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
464 ObjectMeta: metav1.ObjectMeta{
465 Name: "v1.stable.example.com",
466 },
467 Spec: apiregistrationv1.APIServiceSpec{
468 Group: "stable.example.com",
469 Version: "v1",
470 Service: &apiregistrationv1.ServiceReference{
471 Name: "test-service",
472 },
473 },
474 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
475 time.Sleep(3 * time.Second)
476 pinged.Store(true)
477 service.ServeHTTP(w, r)
478 }))
479 testCtx, cancel := context.WithCancel(context.Background())
480 defer cancel()
481
482 go aggregatedManager.Run(testCtx.Done(), nil)
483 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
484
485
486 if !pinged.Load() {
487 t.Errorf("service handler never pinged")
488 }
489 }
490
491
492
493 func TestRemoveAPIService(t *testing.T) {
494 aggyService := discoveryendpoint.NewResourceManager("apis")
495 service := discoveryendpoint.NewResourceManager("apis")
496 apiGroup := fuzzAPIGroups(2, 3, 10)
497 service.SetGroups(apiGroup.Items)
498
499 var apiServices []*apiregistrationv1.APIService
500 for _, g := range apiGroup.Items {
501 for _, v := range g.Versions {
502 apiservice := &apiregistrationv1.APIService{
503 ObjectMeta: metav1.ObjectMeta{
504 Name: v.Version + "." + g.Name,
505 },
506 Spec: apiregistrationv1.APIServiceSpec{
507 Group: g.Name,
508 Version: v.Version,
509 Service: &apiregistrationv1.ServiceReference{
510 Namespace: "serviceNamespace",
511 Name: "serviceName",
512 },
513 },
514 }
515
516 apiServices = append(apiServices, apiservice)
517 }
518 }
519
520 aggregatedManager := newDiscoveryManager(aggyService)
521
522 for _, s := range apiServices {
523 aggregatedManager.AddAPIService(s, service)
524 }
525
526 testCtx, testCancel := context.WithCancel(context.Background())
527 defer testCancel()
528
529 go aggregatedManager.Run(testCtx.Done(), nil)
530
531 for _, s := range apiServices {
532 aggregatedManager.RemoveAPIService(s.Name)
533 }
534
535 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
536
537 response, _, parsed := fetchPath(aggyService, "")
538 if response.StatusCode != 200 {
539 t.Fatalf("unexpected status code %d", response.StatusCode)
540 }
541 if len(parsed.Items) > 0 {
542 t.Errorf("expected to find no groups after service deletion (got %d groups)", len(parsed.Items))
543 }
544 }
545
546 func TestLegacyFallbackNoCache(t *testing.T) {
547 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
548 rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
549
550 legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
551 Name: "stable.example.com",
552 PreferredVersion: metav1.GroupVersionForDiscovery{
553 GroupVersion: "stable.example.com/v1",
554 Version: "v1",
555 },
556 Versions: []metav1.GroupVersionForDiscovery{
557 {
558 GroupVersion: "stable.example.com/v1",
559 Version: "v1",
560 },
561 {
562 GroupVersion: "stable.example.com/v1beta1",
563 Version: "v1beta1",
564 },
565 {
566 GroupVersion: "stable.example.com/v1alpha1",
567 Version: "v1alpha1",
568 },
569 {
570 GroupVersion: "stable.example.com/v2alpha1",
571 Version: "v2alpha1",
572 },
573 },
574 })
575
576 generateVersionResource := func(version string) metav1.APIResource {
577 return metav1.APIResource{
578 Name: "foos",
579 SingularName: "foo",
580 Group: "stable.example.com",
581 Version: version,
582 Namespaced: false,
583 Kind: "Foo",
584 Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"},
585 Categories: []string{"all"},
586 }
587 }
588
589 resources := map[string]metav1.APIResource{
590 "v1": generateVersionResource("v1"),
591 "v1beta1": generateVersionResource("v1beta1"),
592 "v1alpha1": generateVersionResource("v1alpha1"),
593 }
594
595 legacyResourceHandlerV1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
596 Group: "stable.example.com",
597 Version: "v1",
598 }, discovery.APIResourceListerFunc(func() []metav1.APIResource {
599 return []metav1.APIResource{
600 resources["v1"],
601 }
602 }))
603
604 legacyResourceHandlerV1Beta1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
605 Group: "stable.example.com",
606 Version: "v1beta1",
607 }, discovery.APIResourceListerFunc(func() []metav1.APIResource {
608 return []metav1.APIResource{
609 resources["v1beta1"],
610 }
611 }))
612
613 legacyResourceHandlerV1Alpha1 := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
614 Group: "stable.example.com",
615 Version: "v1alpha1",
616 }, discovery.APIResourceListerFunc(func() []metav1.APIResource {
617 return []metav1.APIResource{
618 resources["v1alpha1"],
619 }
620 }))
621
622 handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
623 if r.URL.Path == "/apis/stable.example.com" {
624 legacyGroupHandler.ServeHTTP(w, r)
625 } else if r.URL.Path == "/apis/stable.example.com/v1" {
626
627 legacyResourceHandlerV1.ServeHTTP(w, r)
628 } else if r.URL.Path == "/apis/stable.example.com/v1beta1" {
629
630 legacyResourceHandlerV1Beta1.ServeHTTP(w, r)
631 } else if r.URL.Path == "/apis/stable.example.com/v1alpha1" {
632 legacyResourceHandlerV1Alpha1.ServeHTTP(w, r)
633 } else if r.URL.Path == "/apis/stable.example.com/v2alpha1" {
634
635 json.NewEncoder(w).Encode(&metav1.APIResourceList{
636 GroupVersion: "stable.example.com/v2alpha1",
637 APIResources: []metav1.APIResource{
638 {Name: "parent-without-kind"},
639 {Name: "missing-parent/subresource-without-parent", Kind: "SubresourceWithoutParent"},
640 {Name: "parent-without-kind/subresource", Kind: "Subresource"},
641 {Name: "parent-without-kind/subresource-without-kind"},
642 },
643 })
644 } else if r.URL.Path == "/apis" {
645 rootAPIsHandler.ServeHTTP(w, r)
646 } else {
647
648 t.Fatalf("unexpected request sent to %v", r.URL.Path)
649 }
650 })
651
652 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
653 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
654 ObjectMeta: metav1.ObjectMeta{
655 Name: "v1.stable.example.com",
656 },
657 Spec: apiregistrationv1.APIServiceSpec{
658 Group: "stable.example.com",
659 Version: "v1",
660 Service: &apiregistrationv1.ServiceReference{
661 Name: "test-service",
662 },
663 },
664 }, handlerFunc)
665 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
666 ObjectMeta: metav1.ObjectMeta{
667 Name: "v1beta1.stable.example.com",
668 },
669 Spec: apiregistrationv1.APIServiceSpec{
670 Group: "stable.example.com",
671 Version: "v1beta1",
672 Service: &apiregistrationv1.ServiceReference{
673 Name: "test-service",
674 },
675 },
676 }, handlerFunc)
677 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
678 ObjectMeta: metav1.ObjectMeta{
679 Name: "v1alpha1.stable.example.com",
680 },
681 Spec: apiregistrationv1.APIServiceSpec{
682 Group: "stable.example.com",
683 Version: "v1alpha1",
684 Service: &apiregistrationv1.ServiceReference{
685 Name: "test-service",
686 },
687 },
688 }, handlerFunc)
689 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
690 ObjectMeta: metav1.ObjectMeta{
691 Name: "v2alpha1.stable.example.com",
692 },
693 Spec: apiregistrationv1.APIServiceSpec{
694 Group: "stable.example.com",
695 Version: "v2alpha1",
696 Service: &apiregistrationv1.ServiceReference{
697 Name: "test-service",
698 },
699 },
700 }, handlerFunc)
701
702 testCtx, cancel := context.WithCancel(context.Background())
703 defer cancel()
704
705 go aggregatedManager.Run(testCtx.Done(), nil)
706 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
707
708
709
710 _, _, doc := fetchPath(aggregatedResourceManager, "")
711
712 mustConvert := func(r []metav1.APIResource) []apidiscoveryv2.APIResourceDiscovery {
713 converted, err := endpoints.ConvertGroupVersionIntoToDiscovery(r)
714 require.NoError(t, err)
715 return converted
716 }
717 expectAggregatedDiscovery := []apidiscoveryv2.APIGroupDiscovery{{
718 ObjectMeta: metav1.ObjectMeta{
719 Name: "stable.example.com",
720 },
721 Versions: []apidiscoveryv2.APIVersionDiscovery{
722 {
723 Version: "v1",
724 Resources: mustConvert([]metav1.APIResource{resources["v1"]}),
725 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
726 },
727 {
728 Version: "v1beta1",
729 Resources: mustConvert([]metav1.APIResource{resources["v1beta1"]}),
730 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
731 },
732 {
733 Version: "v2alpha1",
734 Resources: []apidiscoveryv2.APIResourceDiscovery{
735 {
736 Resource: "parent-without-kind",
737 ResponseKind: &metav1.GroupVersionKind{},
738 Scope: "Cluster",
739 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
740 {
741 Subresource: "subresource",
742 ResponseKind: &metav1.GroupVersionKind{Kind: "Subresource"},
743 },
744 {
745 Subresource: "subresource-without-kind",
746 ResponseKind: &metav1.GroupVersionKind{},
747 },
748 },
749 },
750 {
751 Resource: "missing-parent",
752 ResponseKind: &metav1.GroupVersionKind{},
753 Scope: "Cluster",
754 Subresources: []apidiscoveryv2.APISubresourceDiscovery{
755 {
756 Subresource: "subresource-without-parent",
757 ResponseKind: &metav1.GroupVersionKind{Kind: "SubresourceWithoutParent"},
758 },
759 },
760 },
761 },
762 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
763 },
764 {
765 Version: "v1alpha1",
766 Resources: mustConvert([]metav1.APIResource{resources["v1alpha1"]}),
767 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
768 },
769 },
770 }}
771 require.Equal(t, doc.Items, expectAggregatedDiscovery)
772 }
773
774 func testLegacyFallbackWithCustomRootHandler(t *testing.T, rootHandlerFn func(http.ResponseWriter, *http.Request)) {
775 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
776
777 legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
778 Name: "stable.example.com",
779 PreferredVersion: metav1.GroupVersionForDiscovery{
780 GroupVersion: "stable.example.com/v1",
781 Version: "v1",
782 },
783 Versions: []metav1.GroupVersionForDiscovery{
784 {
785 GroupVersion: "stable.example.com/v1",
786 Version: "v1",
787 },
788 {
789 GroupVersion: "stable.example.com/v1beta1",
790 Version: "v1beta1",
791 },
792 },
793 })
794
795 resource := metav1.APIResource{
796 Name: "foos",
797 SingularName: "foo",
798 Group: "stable.example.com",
799 Version: "v1",
800 Namespaced: false,
801 Kind: "Foo",
802 Verbs: []string{"get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"},
803 Categories: []string{"all"},
804 }
805
806 legacyResourceHandler := discovery.NewAPIVersionHandler(scheme.Codecs, schema.GroupVersion{
807 Group: "stable.example.com",
808 Version: "v1",
809 }, discovery.APIResourceListerFunc(func() []metav1.APIResource {
810 return []metav1.APIResource{
811 resource,
812 }
813 }))
814
815 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
816 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
817 ObjectMeta: metav1.ObjectMeta{
818 Name: "v1.stable.example.com",
819 },
820 Spec: apiregistrationv1.APIServiceSpec{
821 Group: "stable.example.com",
822 Version: "v1",
823 Service: &apiregistrationv1.ServiceReference{
824 Name: "test-service",
825 },
826 },
827 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
828 if r.URL.Path == "/apis/stable.example.com" {
829 legacyGroupHandler.ServeHTTP(w, r)
830 } else if r.URL.Path == "/apis/stable.example.com/v1" {
831
832 legacyResourceHandler.ServeHTTP(w, r)
833 } else if r.URL.Path == "/apis" {
834 rootHandlerFn(w, r)
835 } else {
836
837 t.Fatalf("unexpected request sent to %v", r.URL.Path)
838 }
839 }))
840 testCtx, cancel := context.WithCancel(context.Background())
841 defer cancel()
842
843 go aggregatedManager.Run(testCtx.Done(), nil)
844 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
845
846
847
848 _, _, doc := fetchPath(aggregatedResourceManager, "")
849
850 converted, err := endpoints.ConvertGroupVersionIntoToDiscovery([]metav1.APIResource{resource})
851 require.NoError(t, err)
852 require.Equal(t, []apidiscoveryv2.APIGroupDiscovery{
853 {
854 ObjectMeta: metav1.ObjectMeta{
855 Name: resource.Group,
856 },
857 Versions: []apidiscoveryv2.APIVersionDiscovery{
858 {
859 Version: resource.Version,
860 Resources: converted,
861 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
862 },
863 },
864 },
865 }, doc.Items)
866 }
867 func TestLegacyFallback(t *testing.T) {
868 rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
869 testCases := []struct {
870 name string
871 rootHandler func(http.ResponseWriter, *http.Request)
872 }{
873 {
874 name: "Default root handler (406)",
875 rootHandler: rootAPIsHandler.ServeHTTP,
876 },
877 {
878 name: "Root handler with non 200 status code",
879 rootHandler: func(w http.ResponseWriter, r *http.Request) {
880 w.WriteHeader(404)
881 },
882 },
883 {
884 name: "Root handler with 200 response code no content type",
885 rootHandler: func(w http.ResponseWriter, r *http.Request) {
886 w.Header().Set("Content-Type", "application/json")
887 w.WriteHeader(200)
888 },
889 },
890 {
891 name: "Root handler with 200 response code incorrect content type",
892 rootHandler: func(w http.ResponseWriter, r *http.Request) {
893 w.Header().Set("Content-Type", "application/json;g=apidiscovery.k8s.io;v=v1alpha1;as=APIGroupDiscoveryList")
894 w.WriteHeader(200)
895 },
896 },
897 }
898 for _, tc := range testCases {
899 testLegacyFallbackWithCustomRootHandler(t, tc.rootHandler)
900 }
901 }
902
903 func TestAPIServiceStale(t *testing.T) {
904 aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
905 aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
906 aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
907 ObjectMeta: metav1.ObjectMeta{
908 Name: "v1.stable.example.com",
909 },
910 Spec: apiregistrationv1.APIServiceSpec{
911 Group: "stable.example.com",
912 Version: "v1",
913 Service: &apiregistrationv1.ServiceReference{
914 Name: "test-service",
915 },
916 },
917 }, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
918 w.WriteHeader(503)
919 }))
920 testCtx, cancel := context.WithCancel(context.Background())
921 defer cancel()
922
923 go aggregatedManager.Run(testCtx.Done(), nil)
924 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
925
926
927
928 _, _, doc := fetchPath(aggregatedResourceManager, "")
929 require.Equal(t, []apidiscoveryv2.APIGroupDiscovery{
930 {
931 ObjectMeta: metav1.ObjectMeta{
932 Name: "stable.example.com",
933 },
934 Versions: []apidiscoveryv2.APIVersionDiscovery{
935 {
936 Version: "v1",
937 Freshness: apidiscoveryv2.DiscoveryFreshnessStale,
938 },
939 },
940 },
941 }, doc.Items)
942 }
943
944
945
946
947 func TestNotModified(t *testing.T) {
948 aggyService := discoveryendpoint.NewResourceManager("apis")
949 service := discoveryendpoint.NewResourceManager("apis")
950 apiGroup := fuzzAPIGroups(2, 3, 10)
951 service.SetGroups(apiGroup.Items)
952
953 var apiServices []*apiregistrationv1.APIService
954 for _, g := range apiGroup.Items {
955 for _, v := range g.Versions {
956 apiservice := &apiregistrationv1.APIService{
957 ObjectMeta: metav1.ObjectMeta{
958 Name: v.Version + "." + g.Name,
959 },
960 Spec: apiregistrationv1.APIServiceSpec{
961 Group: g.Name,
962 Version: v.Version,
963 Service: &apiregistrationv1.ServiceReference{
964 Namespace: "serviceNamespace",
965 Name: "serviceName",
966 },
967 },
968 }
969
970 apiServices = append(apiServices, apiservice)
971 }
972 }
973
974 aggregatedManager := newDiscoveryManager(aggyService)
975
976
977
978 for _, s := range apiServices[:len(apiServices)-1] {
979 aggregatedManager.AddAPIService(s, service)
980 }
981
982 testCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
983 defer cancel()
984
985 go aggregatedManager.Run(testCtx.Done(), nil)
986
987
988
989 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
990
991
992
993
994
995 for _, s := range apiServices {
996 aggregatedManager.AddAPIService(s, service)
997 }
998
999
1000 require.True(t, waitForQueueComplete(testCtx.Done(), aggregatedManager))
1001 }
1002
1003
1004 func fuzzAPIGroups(atLeastNumGroups, maxNumGroups int, seed int64) apidiscoveryv2.APIGroupDiscoveryList {
1005 fuzzer := fuzz.NewWithSeed(seed)
1006 fuzzer.NumElements(atLeastNumGroups, maxNumGroups)
1007 fuzzer.NilChance(0)
1008 fuzzer.Funcs(func(o *apidiscoveryv2.APIGroupDiscovery, c fuzz.Continue) {
1009 c.FuzzNoCustom(o)
1010
1011
1012
1013 atLeastOne := apidiscoveryv2.APIVersionDiscovery{}
1014 c.Fuzz(&atLeastOne)
1015 o.Versions = append(o.Versions, atLeastOne)
1016
1017
1018 o.TypeMeta = metav1.TypeMeta{}
1019
1020 o.ObjectMeta = metav1.ObjectMeta{Name: o.ObjectMeta.Name}
1021
1022 for i := range o.Versions {
1023 o.Versions[i].Freshness = "Current"
1024 o.Versions[i].Version = fmt.Sprintf("v%d", i+1)
1025 }
1026 })
1027
1028 var apis []apidiscoveryv2.APIGroupDiscovery
1029 fuzzer.Fuzz(&apis)
1030
1031 return apidiscoveryv2.APIGroupDiscoveryList{
1032 TypeMeta: metav1.TypeMeta{
1033 Kind: "APIGroupDiscoveryList",
1034 APIVersion: "v1",
1035 },
1036 Items: apis,
1037 }
1038
1039 }
1040
1041
1042 func fetchPath(handler http.Handler, etag string) (*http.Response, []byte, *apidiscoveryv2.APIGroupDiscoveryList) {
1043
1044 w := httptest.NewRecorder()
1045 req := httptest.NewRequest("GET", "/apis", nil)
1046
1047
1048 req.Header.Set("Accept", runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2;as=APIGroupDiscoveryList,"+runtime.ContentTypeJSON+";g=apidiscovery.k8s.io;v=v2beta1;as=APIGroupDiscoveryList")
1049
1050 if etag != "" {
1051
1052 quoted := etag
1053 if !strings.HasPrefix(etag, "\"") {
1054 quoted = strconv.Quote(etag)
1055 }
1056 req.Header.Set("If-None-Match", quoted)
1057 }
1058
1059 handler.ServeHTTP(w, req)
1060
1061 bytes := w.Body.Bytes()
1062 var decoded *apidiscoveryv2.APIGroupDiscoveryList
1063 if len(bytes) > 0 {
1064 decoded = &apidiscoveryv2.APIGroupDiscoveryList{}
1065 runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), bytes, decoded)
1066 }
1067
1068 return w.Result(), bytes, decoded
1069 }
1070
1071
1072
1073 type completerWorkqueue struct {
1074 lock sync.Mutex
1075 workqueue.RateLimitingInterface
1076 processing map[interface{}]struct{}
1077 }
1078
1079 var _ = workqueue.RateLimitingInterface(&completerWorkqueue{})
1080
1081 func newCompleterWorkqueue(wq workqueue.RateLimitingInterface) *completerWorkqueue {
1082 return &completerWorkqueue{
1083 RateLimitingInterface: wq,
1084 processing: make(map[interface{}]struct{}),
1085 }
1086 }
1087
1088 func (q *completerWorkqueue) Add(item interface{}) {
1089 q.lock.Lock()
1090 defer q.lock.Unlock()
1091 q.processing[item] = struct{}{}
1092 q.RateLimitingInterface.Add(item)
1093 }
1094
1095 func (q *completerWorkqueue) AddAfter(item interface{}, duration time.Duration) {
1096 q.Add(item)
1097 }
1098
1099 func (q *completerWorkqueue) AddRateLimited(item interface{}) {
1100 q.Add(item)
1101 }
1102
1103 func (q *completerWorkqueue) Done(item interface{}) {
1104 q.lock.Lock()
1105 defer q.lock.Unlock()
1106 delete(q.processing, item)
1107 q.RateLimitingInterface.Done(item)
1108 }
1109
1110 func (q *completerWorkqueue) isComplete() bool {
1111 q.lock.Lock()
1112 defer q.lock.Unlock()
1113 return q.Len() == 0 && len(q.processing) == 0
1114 }
1115
View as plain text