1
16
17 package apiserver
18
19 import (
20 "fmt"
21 "sort"
22 "time"
23
24 "k8s.io/klog/v2"
25
26 apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
27 autoscaling "k8s.io/api/autoscaling/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/version"
34 "k8s.io/apiserver/pkg/endpoints/discovery"
35 discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
36 "k8s.io/client-go/tools/cache"
37 "k8s.io/client-go/util/workqueue"
38
39 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
40 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
41 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
42 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
43 )
44
45 type DiscoveryController struct {
46 versionHandler *versionDiscoveryHandler
47 groupHandler *groupDiscoveryHandler
48 resourceManager discoveryendpoint.ResourceManager
49
50 crdLister listers.CustomResourceDefinitionLister
51 crdsSynced cache.InformerSynced
52
53
54 syncFn func(version schema.GroupVersion) error
55
56 queue workqueue.RateLimitingInterface
57 }
58
59 func NewDiscoveryController(
60 crdInformer informers.CustomResourceDefinitionInformer,
61 versionHandler *versionDiscoveryHandler,
62 groupHandler *groupDiscoveryHandler,
63 resourceManager discoveryendpoint.ResourceManager,
64 ) *DiscoveryController {
65 c := &DiscoveryController{
66 versionHandler: versionHandler,
67 groupHandler: groupHandler,
68 resourceManager: resourceManager,
69 crdLister: crdInformer.Lister(),
70 crdsSynced: crdInformer.Informer().HasSynced,
71
72 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DiscoveryController"),
73 }
74
75 crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
76 AddFunc: c.addCustomResourceDefinition,
77 UpdateFunc: c.updateCustomResourceDefinition,
78 DeleteFunc: c.deleteCustomResourceDefinition,
79 })
80
81 c.syncFn = c.sync
82
83 return c
84 }
85
86 func (c *DiscoveryController) sync(version schema.GroupVersion) error {
87
88 apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
89 apiResourcesForDiscovery := []metav1.APIResource{}
90 aggregatedAPIResourcesForDiscovery := []apidiscoveryv2.APIResourceDiscovery{}
91 versionsForDiscoveryMap := map[metav1.GroupVersion]bool{}
92
93 crds, err := c.crdLister.List(labels.Everything())
94 if err != nil {
95 return err
96 }
97 foundVersion := false
98 foundGroup := false
99 for _, crd := range crds {
100 if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
101 continue
102 }
103
104 if crd.Spec.Group != version.Group {
105 continue
106 }
107
108 foundThisVersion := false
109 var storageVersionHash string
110 for _, v := range crd.Spec.Versions {
111 if !v.Served {
112 continue
113 }
114
115 foundGroup = true
116
117 gv := metav1.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
118 if !versionsForDiscoveryMap[gv] {
119 versionsForDiscoveryMap[gv] = true
120 apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
121 GroupVersion: crd.Spec.Group + "/" + v.Name,
122 Version: v.Name,
123 })
124 }
125 if v.Name == version.Version {
126 foundThisVersion = true
127 }
128 if v.Storage {
129 storageVersionHash = discovery.StorageVersionHash(gv.Group, gv.Version, crd.Spec.Names.Kind)
130 }
131 }
132
133 if !foundThisVersion {
134 continue
135 }
136 foundVersion = true
137
138 verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"})
139
140 if apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating) {
141 verbs = metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "watch"})
142 }
143
144 apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
145 Name: crd.Status.AcceptedNames.Plural,
146 SingularName: crd.Status.AcceptedNames.Singular,
147 Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
148 Kind: crd.Status.AcceptedNames.Kind,
149 Verbs: verbs,
150 ShortNames: crd.Status.AcceptedNames.ShortNames,
151 Categories: crd.Status.AcceptedNames.Categories,
152 StorageVersionHash: storageVersionHash,
153 })
154
155 subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, version.Version)
156 if err != nil {
157 return err
158 }
159
160 if c.resourceManager != nil {
161 var scope apidiscoveryv2.ResourceScope
162 if crd.Spec.Scope == apiextensionsv1.NamespaceScoped {
163 scope = apidiscoveryv2.ScopeNamespace
164 } else {
165 scope = apidiscoveryv2.ScopeCluster
166 }
167 apiResourceDiscovery := apidiscoveryv2.APIResourceDiscovery{
168 Resource: crd.Status.AcceptedNames.Plural,
169 SingularResource: crd.Status.AcceptedNames.Singular,
170 Scope: scope,
171 ResponseKind: &metav1.GroupVersionKind{
172 Group: version.Group,
173 Version: version.Version,
174 Kind: crd.Status.AcceptedNames.Kind,
175 },
176 Verbs: verbs,
177 ShortNames: crd.Status.AcceptedNames.ShortNames,
178 Categories: crd.Status.AcceptedNames.Categories,
179 }
180 if subresources != nil && subresources.Status != nil {
181 apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2.APISubresourceDiscovery{
182 Subresource: "status",
183 ResponseKind: &metav1.GroupVersionKind{
184 Group: version.Group,
185 Version: version.Version,
186 Kind: crd.Status.AcceptedNames.Kind,
187 },
188 Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
189 })
190 }
191 if subresources != nil && subresources.Scale != nil {
192 apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2.APISubresourceDiscovery{
193 Subresource: "scale",
194 ResponseKind: &metav1.GroupVersionKind{
195 Group: autoscaling.GroupName,
196 Version: "v1",
197 Kind: "Scale",
198 },
199 Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
200 })
201
202 }
203 aggregatedAPIResourcesForDiscovery = append(aggregatedAPIResourcesForDiscovery, apiResourceDiscovery)
204 }
205
206 if subresources != nil && subresources.Status != nil {
207 apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
208 Name: crd.Status.AcceptedNames.Plural + "/status",
209 Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
210 Kind: crd.Status.AcceptedNames.Kind,
211 Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
212 })
213 }
214
215 if subresources != nil && subresources.Scale != nil {
216 apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
217 Group: autoscaling.GroupName,
218 Version: "v1",
219 Kind: "Scale",
220 Name: crd.Status.AcceptedNames.Plural + "/scale",
221 Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
222 Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
223 })
224 }
225 }
226
227 if !foundGroup {
228 c.groupHandler.unsetDiscovery(version.Group)
229 c.versionHandler.unsetDiscovery(version)
230
231 if c.resourceManager != nil {
232 c.resourceManager.RemoveGroup(version.Group)
233 }
234 return nil
235 }
236
237 sortGroupDiscoveryByKubeAwareVersion(apiVersionsForDiscovery)
238
239 apiGroup := metav1.APIGroup{
240 Name: version.Group,
241 Versions: apiVersionsForDiscovery,
242
243
244 PreferredVersion: apiVersionsForDiscovery[0],
245 }
246 c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup))
247
248 if !foundVersion {
249 c.versionHandler.unsetDiscovery(version)
250
251 if c.resourceManager != nil {
252 c.resourceManager.RemoveGroupVersion(metav1.GroupVersion{
253 Group: version.Group,
254 Version: version.Version,
255 })
256 }
257 return nil
258 }
259 c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
260 return apiResourcesForDiscovery
261 })))
262
263 sort.Slice(aggregatedAPIResourcesForDiscovery, func(i, j int) bool {
264 return aggregatedAPIResourcesForDiscovery[i].Resource < aggregatedAPIResourcesForDiscovery[j].Resource
265 })
266 if c.resourceManager != nil {
267 c.resourceManager.AddGroupVersion(version.Group, apidiscoveryv2.APIVersionDiscovery{
268 Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
269 Version: version.Version,
270 Resources: aggregatedAPIResourcesForDiscovery,
271 })
272
273 c.resourceManager.SetGroupVersionPriority(metav1.GroupVersion(version), 1000, 100)
274 }
275 return nil
276 }
277
278 func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery) {
279 sort.Slice(gd, func(i, j int) bool {
280 return version.CompareKubeAwareVersionStrings(gd[i].Version, gd[j].Version) > 0
281 })
282 }
283
284 func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
285 defer utilruntime.HandleCrash()
286 defer c.queue.ShutDown()
287 defer klog.Info("Shutting down DiscoveryController")
288
289 klog.Info("Starting DiscoveryController")
290
291 if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
292 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
293 return
294 }
295
296
297 if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
298 crds, err := c.crdLister.List(labels.Everything())
299 if err != nil {
300 utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
301 return false, nil
302 }
303 for _, crd := range crds {
304 for _, v := range crd.Spec.Versions {
305 gv := schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
306 if err := c.sync(gv); err != nil {
307 utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
308 return false, nil
309 }
310 }
311 }
312 return true, nil
313 }, stopCh); err == wait.ErrWaitTimeout {
314 utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
315 return
316 } else if err != nil {
317 panic(fmt.Errorf("unexpected error: %v", err))
318 }
319 close(synchedCh)
320
321
322 go wait.Until(c.runWorker, time.Second, stopCh)
323
324 <-stopCh
325 }
326
327 func (c *DiscoveryController) runWorker() {
328 for c.processNextWorkItem() {
329 }
330 }
331
332
333 func (c *DiscoveryController) processNextWorkItem() bool {
334 key, quit := c.queue.Get()
335 if quit {
336 return false
337 }
338 defer c.queue.Done(key)
339
340 err := c.syncFn(key.(schema.GroupVersion))
341 if err == nil {
342 c.queue.Forget(key)
343 return true
344 }
345
346 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
347 c.queue.AddRateLimited(key)
348
349 return true
350 }
351
352 func (c *DiscoveryController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
353 for _, v := range obj.Spec.Versions {
354 c.queue.Add(schema.GroupVersion{Group: obj.Spec.Group, Version: v.Name})
355 }
356 }
357
358 func (c *DiscoveryController) addCustomResourceDefinition(obj interface{}) {
359 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
360 klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
361 c.enqueue(castObj)
362 }
363
364 func (c *DiscoveryController) updateCustomResourceDefinition(oldObj, newObj interface{}) {
365 castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
366 castOldObj := oldObj.(*apiextensionsv1.CustomResourceDefinition)
367 klog.V(4).Infof("Updating customresourcedefinition %s", castOldObj.Name)
368
369
370 c.enqueue(castNewObj)
371 c.enqueue(castOldObj)
372 }
373
374 func (c *DiscoveryController) deleteCustomResourceDefinition(obj interface{}) {
375 castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
376 if !ok {
377 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
378 if !ok {
379 klog.Errorf("Couldn't get object from tombstone %#v", obj)
380 return
381 }
382 castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
383 if !ok {
384 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
385 return
386 }
387 }
388 klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
389 c.enqueue(castObj)
390 }
391
View as plain text