...
1
16
17 package crdregistration
18
19 import (
20 "fmt"
21 "time"
22
23 "k8s.io/klog/v2"
24
25 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
26 crdinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
27 crdlisters "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/client-go/util/workqueue"
35 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
36 )
37
38
39
40 type AutoAPIServiceRegistration interface {
41
42 AddAPIServiceToSync(in *v1.APIService)
43
44 RemoveAPIServiceToSync(name string)
45 }
46
47 type crdRegistrationController struct {
48 crdLister crdlisters.CustomResourceDefinitionLister
49 crdSynced cache.InformerSynced
50
51 apiServiceRegistration AutoAPIServiceRegistration
52
53 syncHandler func(groupVersion schema.GroupVersion) error
54
55 syncedInitialSet chan struct{}
56
57
58
59 queue workqueue.RateLimitingInterface
60 }
61
62
63
64 func NewCRDRegistrationController(crdinformer crdinformers.CustomResourceDefinitionInformer, apiServiceRegistration AutoAPIServiceRegistration) *crdRegistrationController {
65 c := &crdRegistrationController{
66 crdLister: crdinformer.Lister(),
67 crdSynced: crdinformer.Informer().HasSynced,
68 apiServiceRegistration: apiServiceRegistration,
69 syncedInitialSet: make(chan struct{}),
70 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"),
71 }
72 c.syncHandler = c.handleVersionUpdate
73
74 crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
75 AddFunc: func(obj interface{}) {
76 cast := obj.(*apiextensionsv1.CustomResourceDefinition)
77 c.enqueueCRD(cast)
78 },
79 UpdateFunc: func(oldObj, newObj interface{}) {
80
81
82 c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition))
83 c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition))
84 },
85 DeleteFunc: func(obj interface{}) {
86 cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
87 if !ok {
88 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
89 if !ok {
90 klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
91 return
92 }
93 cast, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
94 if !ok {
95 klog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
96 return
97 }
98 }
99 c.enqueueCRD(cast)
100 },
101 })
102
103 return c
104 }
105
106 func (c *crdRegistrationController) Run(workers int, stopCh <-chan struct{}) {
107 defer utilruntime.HandleCrash()
108
109 defer c.queue.ShutDown()
110
111 klog.Infof("Starting crd-autoregister controller")
112 defer klog.Infof("Shutting down crd-autoregister controller")
113
114
115 if !cache.WaitForNamedCacheSync("crd-autoregister", stopCh, c.crdSynced) {
116 return
117 }
118
119
120 if crds, err := c.crdLister.List(labels.Everything()); err != nil {
121 utilruntime.HandleError(err)
122 } else {
123 for _, crd := range crds {
124 for _, version := range crd.Spec.Versions {
125 if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}); err != nil {
126 utilruntime.HandleError(err)
127 }
128 }
129 }
130 }
131 close(c.syncedInitialSet)
132
133
134 for i := 0; i < workers; i++ {
135
136
137 go wait.Until(c.runWorker, time.Second, stopCh)
138 }
139
140
141 <-stopCh
142 }
143
144
145 func (c *crdRegistrationController) WaitForInitialSync() {
146 <-c.syncedInitialSet
147 }
148
149 func (c *crdRegistrationController) runWorker() {
150
151
152 for c.processNextWorkItem() {
153 }
154 }
155
156
157 func (c *crdRegistrationController) processNextWorkItem() bool {
158
159 key, quit := c.queue.Get()
160 if quit {
161 return false
162 }
163
164 defer c.queue.Done(key)
165
166
167 err := c.syncHandler(key.(schema.GroupVersion))
168 if err == nil {
169
170
171 c.queue.Forget(key)
172 return true
173 }
174
175
176
177 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
178
179
180
181
182 c.queue.AddRateLimited(key)
183
184 return true
185 }
186
187 func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition) {
188 for _, version := range crd.Spec.Versions {
189 c.queue.Add(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name})
190 }
191 }
192
193 func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
194 apiServiceName := groupVersion.Version + "." + groupVersion.Group
195
196
197 crds, err := c.crdLister.List(labels.Everything())
198 if err != nil {
199 return err
200 }
201 for _, crd := range crds {
202 if crd.Spec.Group != groupVersion.Group {
203 continue
204 }
205 for _, version := range crd.Spec.Versions {
206 if version.Name != groupVersion.Version || !version.Served {
207 continue
208 }
209
210 c.apiServiceRegistration.AddAPIServiceToSync(&v1.APIService{
211 ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
212 Spec: v1.APIServiceSpec{
213 Group: groupVersion.Group,
214 Version: groupVersion.Version,
215 GroupPriorityMinimum: 1000,
216 VersionPriority: 100,
217 },
218 })
219 return nil
220 }
221 }
222
223 c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
224 return nil
225 }
226
View as plain text