1
16
17 package openapi
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 "github.com/google/uuid"
25
26 apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
27 "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/labels"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/util/workqueue"
34 "k8s.io/klog/v2"
35 "k8s.io/kube-openapi/pkg/cached"
36 "k8s.io/kube-openapi/pkg/handler"
37 "k8s.io/kube-openapi/pkg/validation/spec"
38
39 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
40 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
41 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
42 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
43 "k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
44 )
45
46
47 type Controller struct {
48 crdLister listers.CustomResourceDefinitionLister
49 crdsSynced cache.InformerSynced
50
51
52 syncFn func(string) error
53
54 queue workqueue.RateLimitingInterface
55
56 staticSpec *spec.Swagger
57
58 openAPIService *handler.OpenAPIService
59
60
61
62 lock sync.Mutex
63 specsByName map[string]*specCache
64 }
65
66
67
68
69
70
71 type specCache struct {
72 crdCache cached.LastSuccess[*apiextensionsv1.CustomResourceDefinition]
73 mergedVersionSpec cached.Value[*spec.Swagger]
74 }
75
76 func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) {
77 s.crdCache.Store(cached.Static(crd, generateCRDHash(crd)))
78 }
79
80 func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
81 s := specCache{}
82 s.update(crd)
83
84 s.mergedVersionSpec = cached.Transform[*apiextensionsv1.CustomResourceDefinition](func(crd *apiextensionsv1.CustomResourceDefinition, etag string, err error) (*spec.Swagger, string, error) {
85 if err != nil {
86
87 return nil, "", err
88 }
89 mergeSpec := &spec.Swagger{}
90 for _, v := range crd.Spec.Versions {
91 if !v.Served {
92 continue
93 }
94 s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{
95 V2: true,
96 IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
97 })
98
99
100 if err != nil {
101 return nil, "", err
102 }
103 s.Definitions = handler.PruneDefaults(s.Definitions)
104 mergeSpec, err = builder.MergeSpecs(mergeSpec, s)
105 if err != nil {
106 return nil, "", err
107 }
108 }
109 return mergeSpec, generateCRDHash(crd), nil
110 }, &s.crdCache)
111 return &s
112 }
113
114
115 func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
116 c := &Controller{
117 crdLister: crdInformer.Lister(),
118 crdsSynced: crdInformer.Informer().HasSynced,
119 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
120 specsByName: map[string]*specCache{},
121 }
122
123 crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
124 AddFunc: c.addCustomResourceDefinition,
125 UpdateFunc: c.updateCustomResourceDefinition,
126 DeleteFunc: c.deleteCustomResourceDefinition,
127 })
128
129 c.syncFn = c.sync
130 return c
131 }
132
133
134 func (c *Controller) Run(staticSpec *spec.Swagger, openAPIService *handler.OpenAPIService, stopCh <-chan struct{}) {
135 defer utilruntime.HandleCrash()
136 defer c.queue.ShutDown()
137 defer klog.Infof("Shutting down OpenAPI controller")
138
139 klog.Infof("Starting OpenAPI controller")
140
141 c.staticSpec = staticSpec
142 c.openAPIService = openAPIService
143
144 if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
145 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
146 return
147 }
148
149
150 crds, err := c.crdLister.List(labels.Everything())
151 if err != nil {
152 utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
153 return
154 }
155 for _, crd := range crds {
156 if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
157 continue
158 }
159 c.specsByName[crd.Name] = createSpecCache(crd)
160 }
161 c.updateSpecLocked()
162
163
164 go wait.Until(c.runWorker, time.Second, stopCh)
165
166 <-stopCh
167 }
168
169 func (c *Controller) runWorker() {
170 for c.processNextWorkItem() {
171 }
172 }
173
174 func (c *Controller) processNextWorkItem() bool {
175 key, quit := c.queue.Get()
176 if quit {
177 return false
178 }
179 defer c.queue.Done(key)
180
181
182 start := time.Now()
183 defer func() {
184 elapsed := time.Since(start)
185 if elapsed > time.Second {
186 klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed)
187 }
188 }()
189
190 err := c.syncFn(key.(string))
191 if err == nil {
192 c.queue.Forget(key)
193 return true
194 }
195
196 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
197 c.queue.AddRateLimited(key)
198 return true
199 }
200
201 func (c *Controller) sync(name string) error {
202 c.lock.Lock()
203 defer c.lock.Unlock()
204
205 crd, err := c.crdLister.Get(name)
206 if err != nil && !errors.IsNotFound(err) {
207 return err
208 }
209
210
211 if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
212 if _, found := c.specsByName[name]; !found {
213 return nil
214 }
215 delete(c.specsByName, name)
216 klog.V(2).Infof("Updating CRD OpenAPI spec because %s was removed", name)
217 regenerationCounter.With(map[string]string{"crd": name, "reason": "remove"})
218 c.updateSpecLocked()
219 return nil
220 }
221
222
223
224
225 s, exists := c.specsByName[crd.Name]
226 if exists {
227 s.update(crd)
228 klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
229 regenerationCounter.With(map[string]string{"crd": name, "reason": "update"})
230 return nil
231 }
232
233 c.specsByName[crd.Name] = createSpecCache(crd)
234 klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
235 regenerationCounter.With(map[string]string{"crd": name, "reason": "add"})
236 c.updateSpecLocked()
237 return nil
238 }
239
240
241 func (c *Controller) updateSpecLocked() {
242 specList := make([]cached.Value[*spec.Swagger], 0, len(c.specsByName))
243 for crd := range c.specsByName {
244 specList = append(specList, c.specsByName[crd].mergedVersionSpec)
245 }
246
247 cache := cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
248 localCRDSpec := make([]*spec.Swagger, 0, len(results))
249 for k := range results {
250 if results[k].Err == nil {
251 localCRDSpec = append(localCRDSpec, results[k].Value)
252 }
253 }
254 mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...)
255 if err != nil {
256 return nil, "", fmt.Errorf("failed to merge specs: %v", err)
257 }
258
259
260
261
262 return mergedSpec, uuid.New().String(), nil
263 }, specList)
264 c.openAPIService.UpdateSpecLazy(cache)
265 }
266
267 func (c *Controller) addCustomResourceDefinition(obj interface{}) {
268 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
269 klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
270 c.enqueue(castObj)
271 }
272
273 func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
274 castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
275 klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name)
276 c.enqueue(castNewObj)
277 }
278
279 func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
280 castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
281 if !ok {
282 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
283 if !ok {
284 klog.Errorf("Couldn't get object from tombstone %#v", obj)
285 return
286 }
287 castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
288 if !ok {
289 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
290 return
291 }
292 }
293 klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
294 c.enqueue(castObj)
295 }
296
297 func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
298 c.queue.Add(obj.Name)
299 }
300
301 func generateCRDHash(crd *apiextensionsv1.CustomResourceDefinition) string {
302 return fmt.Sprintf("%s,%d", crd.UID, crd.Generation)
303 }
304
View as plain text