1
16
17 package nonstructuralschema
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "time"
24
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/apimachinery/pkg/util/validation/field"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/client-go/tools/cache"
31 "k8s.io/client-go/util/workqueue"
32 "k8s.io/klog/v2"
33
34 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
35 apiextensionsinternal "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
36 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
37 "k8s.io/apiextensions-apiserver/pkg/apiserver/schema"
38 client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
39 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
40 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
41 )
42
43
// ConditionController maintains the NonStructuralSchema condition on
// CustomResourceDefinitions: it recomputes the condition for queued CRDs and
// writes it back through the status subresource when it changed.
type ConditionController struct {
	// crdClient is used by sync to call UpdateStatus on a CRD.
	crdClient client.CustomResourceDefinitionsGetter

	// crdLister is used by sync to look up a CRD by its queue key.
	crdLister listers.CustomResourceDefinitionLister
	// crdSynced is the informer's HasSynced function; Run waits on it before
	// starting any workers.
	crdSynced cache.InformerSynced

	// syncFn handles a single queue key. NewConditionController sets it to
	// c.sync.
	// NOTE(review): presumably a field (rather than a direct call) so tests can
	// substitute it — confirm.
	syncFn func(key string) error

	// queue holds the keys of CRDs waiting to be processed.
	queue workqueue.RateLimitingInterface

	// lastSeenGenerationLock guards lastSeenGeneration.
	lastSeenGenerationLock sync.Mutex
	// lastSeenGeneration maps a CRD name to the generation recorded after the
	// last successful status update; sync skips CRDs whose generation is not
	// newer than the recorded one. Entries are removed by the delete handler.
	lastSeenGeneration map[string]int64
}
60
61
62 func NewConditionController(
63 crdInformer informers.CustomResourceDefinitionInformer,
64 crdClient client.CustomResourceDefinitionsGetter,
65 ) *ConditionController {
66 c := &ConditionController{
67 crdClient: crdClient,
68 crdLister: crdInformer.Lister(),
69 crdSynced: crdInformer.Informer().HasSynced,
70 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "non_structural_schema_condition_controller"),
71 lastSeenGeneration: map[string]int64{},
72 }
73
74 crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
75 AddFunc: c.addCustomResourceDefinition,
76 UpdateFunc: c.updateCustomResourceDefinition,
77 DeleteFunc: c.deleteCustomResourceDefinition,
78 })
79
80 c.syncFn = c.sync
81
82 return c
83 }
84
85 func calculateCondition(in *apiextensionsv1.CustomResourceDefinition) *apiextensionsv1.CustomResourceDefinitionCondition {
86 cond := &apiextensionsv1.CustomResourceDefinitionCondition{
87 Type: apiextensionsv1.NonStructuralSchema,
88 Status: apiextensionsv1.ConditionUnknown,
89 }
90
91 allErrs := field.ErrorList{}
92
93 if in.Spec.PreserveUnknownFields {
94 allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "preserveUnknownFields"),
95 in.Spec.PreserveUnknownFields,
96 fmt.Sprint("must be false")))
97 }
98
99 for i, v := range in.Spec.Versions {
100 if v.Schema == nil || v.Schema.OpenAPIV3Schema == nil {
101 continue
102 }
103
104 internalSchema := &apiextensionsinternal.CustomResourceValidation{}
105 if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(v.Schema, internalSchema, nil); err != nil {
106 klog.Errorf("failed to convert CRD validation to internal version: %v", err)
107 continue
108 }
109 s, err := schema.NewStructural(internalSchema.OpenAPIV3Schema)
110 if err != nil {
111 cond.Reason = "StructuralError"
112 cond.Message = fmt.Sprintf("failed to check validation schema for version %s: %v", v.Name, err)
113 return cond
114 }
115
116 pth := field.NewPath("spec", "versions").Index(i).Child("schema", "openAPIV3Schema")
117
118 allErrs = append(allErrs, schema.ValidateStructural(pth, s)...)
119 }
120
121 if len(allErrs) == 0 {
122 return nil
123 }
124
125 cond.Status = apiextensionsv1.ConditionTrue
126 cond.Reason = "Violations"
127 cond.Message = allErrs.ToAggregate().Error()
128
129 return cond
130 }
131
132 func (c *ConditionController) sync(key string) error {
133 inCustomResourceDefinition, err := c.crdLister.Get(key)
134 if apierrors.IsNotFound(err) {
135 return nil
136 }
137 if err != nil {
138 return err
139 }
140
141
142 c.lastSeenGenerationLock.Lock()
143 lastSeen, seenBefore := c.lastSeenGeneration[inCustomResourceDefinition.Name]
144 c.lastSeenGenerationLock.Unlock()
145 if seenBefore && inCustomResourceDefinition.Generation <= lastSeen {
146 return nil
147 }
148
149
150 cond := calculateCondition(inCustomResourceDefinition)
151 old := apiextensionshelpers.FindCRDCondition(inCustomResourceDefinition, apiextensionsv1.NonStructuralSchema)
152
153 if cond == nil && old == nil {
154 return nil
155 }
156 if cond != nil && old != nil && old.Status == cond.Status && old.Reason == cond.Reason && old.Message == cond.Message {
157 return nil
158 }
159
160
161 crd := inCustomResourceDefinition.DeepCopy()
162 if cond == nil {
163 apiextensionshelpers.RemoveCRDCondition(crd, apiextensionsv1.NonStructuralSchema)
164 } else {
165 cond.LastTransitionTime = metav1.NewTime(time.Now())
166 apiextensionshelpers.SetCRDCondition(crd, *cond)
167 }
168
169 _, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
170 if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
171
172 return nil
173 }
174 if err != nil {
175 return err
176 }
177
178
179
180 c.lastSeenGenerationLock.Lock()
181 defer c.lastSeenGenerationLock.Unlock()
182 c.lastSeenGeneration[crd.Name] = crd.Generation
183
184 return nil
185 }
186
187
188 func (c *ConditionController) Run(workers int, stopCh <-chan struct{}) {
189 defer utilruntime.HandleCrash()
190 defer c.queue.ShutDown()
191
192 klog.Infof("Starting NonStructuralSchemaConditionController")
193 defer klog.Infof("Shutting down NonStructuralSchemaConditionController")
194
195 if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
196 return
197 }
198
199 for i := 0; i < workers; i++ {
200 go wait.Until(c.runWorker, time.Second, stopCh)
201 }
202
203 <-stopCh
204 }
205
206 func (c *ConditionController) runWorker() {
207 for c.processNextWorkItem() {
208 }
209 }
210
211
212 func (c *ConditionController) processNextWorkItem() bool {
213 key, quit := c.queue.Get()
214 if quit {
215 return false
216 }
217 defer c.queue.Done(key)
218
219 err := c.syncFn(key.(string))
220 if err == nil {
221 c.queue.Forget(key)
222 return true
223 }
224
225 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
226 c.queue.AddRateLimited(key)
227
228 return true
229 }
230
231 func (c *ConditionController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
232 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
233 if err != nil {
234 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
235 return
236 }
237
238 c.queue.Add(key)
239 }
240
241 func (c *ConditionController) addCustomResourceDefinition(obj interface{}) {
242 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
243 klog.V(4).Infof("Adding %s", castObj.Name)
244 c.enqueue(castObj)
245 }
246
247 func (c *ConditionController) updateCustomResourceDefinition(obj, _ interface{}) {
248 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
249 klog.V(4).Infof("Updating %s", castObj.Name)
250 c.enqueue(castObj)
251 }
252
253 func (c *ConditionController) deleteCustomResourceDefinition(obj interface{}) {
254 castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
255 if !ok {
256 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
257 if !ok {
258 klog.Errorf("Couldn't get object from tombstone %#v", obj)
259 return
260 }
261 castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
262 if !ok {
263 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
264 return
265 }
266 }
267
268 c.lastSeenGenerationLock.Lock()
269 defer c.lastSeenGenerationLock.Unlock()
270 delete(c.lastSeenGeneration, castObj.Name)
271 }
272
View as plain text