1
16
17 package openapiv3
18
19 import (
20 "fmt"
21 "reflect"
22 "sync"
23 "time"
24
25 apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
26 "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/labels"
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/util/workqueue"
34 "k8s.io/klog/v2"
35 "k8s.io/kube-openapi/pkg/handler3"
36 "k8s.io/kube-openapi/pkg/spec3"
37
38 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
39 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
40 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
41 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
42 "k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
43 )
44
45
46 type Controller struct {
47 crdLister listers.CustomResourceDefinitionLister
48 crdsSynced cache.InformerSynced
49
50
51 syncFn func(string) error
52
53 queue workqueue.RateLimitingInterface
54
55 openAPIV3Service *handler3.OpenAPIService
56
57
58 lock sync.Mutex
59 specsByGVandName map[schema.GroupVersion]map[string]*spec3.OpenAPI
60 }
61
62
63 func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
64 c := &Controller{
65 crdLister: crdInformer.Lister(),
66 crdsSynced: crdInformer.Informer().HasSynced,
67 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_v3_controller"),
68 specsByGVandName: map[schema.GroupVersion]map[string]*spec3.OpenAPI{},
69 }
70
71 crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
72 AddFunc: c.addCustomResourceDefinition,
73 UpdateFunc: c.updateCustomResourceDefinition,
74 DeleteFunc: c.deleteCustomResourceDefinition,
75 })
76
77 c.syncFn = c.sync
78 return c
79 }
80
81
82 func (c *Controller) Run(openAPIV3Service *handler3.OpenAPIService, stopCh <-chan struct{}) {
83 defer utilruntime.HandleCrash()
84 defer c.queue.ShutDown()
85 defer klog.Infof("Shutting down OpenAPI V3 controller")
86
87 klog.Infof("Starting OpenAPI V3 controller")
88
89 c.openAPIV3Service = openAPIV3Service
90
91 if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
92 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
93 return
94 }
95
96 crds, err := c.crdLister.List(labels.Everything())
97 if err != nil {
98 utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
99 return
100 }
101 for _, crd := range crds {
102 if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
103 continue
104 }
105 for _, v := range crd.Spec.Versions {
106 if !v.Served {
107 continue
108 }
109 c.buildV3Spec(crd, crd.Name, v.Name)
110 }
111 }
112
113
114 go wait.Until(c.runWorker, time.Second, stopCh)
115
116 <-stopCh
117 }
118
119 func (c *Controller) runWorker() {
120 for c.processNextWorkItem() {
121 }
122 }
123
124 func (c *Controller) processNextWorkItem() bool {
125 key, quit := c.queue.Get()
126 if quit {
127 return false
128 }
129 defer c.queue.Done(key)
130
131
132 start := time.Now()
133 defer func() {
134 elapsed := time.Since(start)
135 if elapsed > time.Second {
136 klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed)
137 }
138 }()
139
140 err := c.syncFn(key.(string))
141 if err == nil {
142 c.queue.Forget(key)
143 return true
144 }
145
146 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
147 c.queue.AddRateLimited(key)
148 return true
149 }
150
151 func (c *Controller) sync(name string) error {
152 c.lock.Lock()
153 defer c.lock.Unlock()
154
155 crd, err := c.crdLister.Get(name)
156 if err != nil && !errors.IsNotFound(err) {
157 return err
158 }
159
160 if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
161 c.deleteCRD(name)
162 return nil
163 }
164
165 for _, v := range crd.Spec.Versions {
166 if !v.Served {
167 continue
168 }
169 c.buildV3Spec(crd, name, v.Name)
170 }
171
172 return nil
173 }
174
175 func (c *Controller) deleteCRD(name string) {
176 for gv, crdListForGV := range c.specsByGVandName {
177 _, needOpenAPIUpdate := crdListForGV[name]
178 if needOpenAPIUpdate {
179 delete(crdListForGV, name)
180 if len(crdListForGV) == 0 {
181 delete(c.specsByGVandName, gv)
182 }
183 regenerationCounter.With(map[string]string{"group": gv.Group, "version": gv.Version, "crd": name, "reason": "remove"})
184 c.updateGroupVersion(gv)
185 }
186 }
187 }
188
189 func (c *Controller) updateGroupVersion(gv schema.GroupVersion) error {
190 if _, ok := c.specsByGVandName[gv]; !ok {
191 c.openAPIV3Service.DeleteGroupVersion(groupVersionToOpenAPIV3Path(gv))
192 return nil
193 }
194
195 var specs []*spec3.OpenAPI
196 for _, spec := range c.specsByGVandName[gv] {
197 specs = append(specs, spec)
198 }
199
200 mergedSpec, err := builder.MergeSpecsV3(specs...)
201 if err != nil {
202 return fmt.Errorf("failed to merge specs: %v", err)
203 }
204
205 c.openAPIV3Service.UpdateGroupVersion(groupVersionToOpenAPIV3Path(gv), mergedSpec)
206 return nil
207 }
208
209 func (c *Controller) updateCRDSpec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string, v3 *spec3.OpenAPI) error {
210 gv := schema.GroupVersion{
211 Group: crd.Spec.Group,
212 Version: versionName,
213 }
214
215 _, ok := c.specsByGVandName[gv]
216 reason := "update"
217 if !ok {
218 reason = "add"
219 c.specsByGVandName[gv] = map[string]*spec3.OpenAPI{}
220 }
221
222 oldSpec, ok := c.specsByGVandName[gv][name]
223 if ok {
224 if reflect.DeepEqual(oldSpec, v3) {
225
226 return nil
227 }
228 }
229 c.specsByGVandName[gv][name] = v3
230 regenerationCounter.With(map[string]string{"crd": name, "group": gv.Group, "version": gv.Version, "reason": reason})
231 return c.updateGroupVersion(gv)
232 }
233
234 func (c *Controller) buildV3Spec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string) error {
235 v3, err := builder.BuildOpenAPIV3(crd, versionName, builder.Options{
236 V2: false,
237 IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
238 })
239
240 if err != nil {
241 return err
242 }
243
244 c.updateCRDSpec(crd, name, versionName, v3)
245 return nil
246 }
247
248 func (c *Controller) addCustomResourceDefinition(obj interface{}) {
249 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
250 klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
251 c.enqueue(castObj)
252 }
253
254 func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
255 castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
256 klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name)
257 c.enqueue(castNewObj)
258 }
259
260 func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
261 castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
262 if !ok {
263 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
264 if !ok {
265 klog.Errorf("Couldn't get object from tombstone %#v", obj)
266 return
267 }
268 castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
269 if !ok {
270 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
271 return
272 }
273 }
274 klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
275 c.enqueue(castObj)
276 }
277
278 func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
279 c.queue.Add(obj.Name)
280 }
281
View as plain text