1
16
17 package finalizer
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "time"
24
25 "k8s.io/klog/v2"
26
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/api/meta"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31 "k8s.io/apimachinery/pkg/runtime/schema"
32 utilerrors "k8s.io/apimachinery/pkg/util/errors"
33 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34 "k8s.io/apimachinery/pkg/util/sets"
35 "k8s.io/apimachinery/pkg/util/wait"
36 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
37 "k8s.io/apiserver/pkg/registry/rest"
38 "k8s.io/client-go/tools/cache"
39 "k8s.io/client-go/util/workqueue"
40
41 apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
42 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
43 client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
44 informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
45 listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
46 )
47
48
49
50
51 func OverlappingBuiltInResources() map[schema.GroupResource]bool {
52 return map[schema.GroupResource]bool{
53 {Group: "apiregistration.k8s.io", Resource: "apiservices"}: true,
54 {Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"}: true,
55 }
56 }
57
58
// CRDFinalizer is a controller that finalizes CustomResourceDefinitions: for a CRD that
// has a deletion timestamp and still carries the custom-resource cleanup finalizer, it
// deletes the CRD's instances and then removes that finalizer (see sync).
type CRDFinalizer struct {
	// crdClient is used by sync to call UpdateStatus on CRDs.
	crdClient client.CustomResourceDefinitionsGetter
	// crClientGetter supplies the lister/collection-deleter used to remove a CRD's instances.
	crClientGetter CRClientGetter

	// crdLister looks CRDs up by queue key.
	crdLister listers.CustomResourceDefinitionLister
	// crdSynced reports whether the CRD informer has synced; Run waits on it before
	// starting any worker.
	crdSynced cache.InformerSynced

	// syncFn handles a single queue key. NewCRDFinalizer sets it to sync; keeping it as a
	// field presumably allows a substitute to be injected in tests.
	syncFn func(key string) error

	// queue holds the keys of CRDs that still need finalization work.
	queue workqueue.RateLimitingInterface
}
71
72
// ListerCollectionDeleter combines rest.Lister and rest.CollectionDeleter: the two
// capabilities deleteInstances needs to enumerate and bulk-delete custom resources.
type ListerCollectionDeleter interface {
	rest.Lister
	rest.CollectionDeleter
}
77
78
// CRClientGetter knows how to obtain a ListerCollectionDeleter for a given CRD.
type CRClientGetter interface {
	// GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter for the
	// given CRD, or an error if one cannot be provided.
	GetCustomResourceListerCollectionDeleter(crd *apiextensionsv1.CustomResourceDefinition) (ListerCollectionDeleter, error)
}
84
85
86 func NewCRDFinalizer(
87 crdInformer informers.CustomResourceDefinitionInformer,
88 crdClient client.CustomResourceDefinitionsGetter,
89 crClientGetter CRClientGetter,
90 ) *CRDFinalizer {
91 c := &CRDFinalizer{
92 crdClient: crdClient,
93 crdLister: crdInformer.Lister(),
94 crdSynced: crdInformer.Informer().HasSynced,
95 crClientGetter: crClientGetter,
96 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_finalizer"),
97 }
98
99 crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
100 AddFunc: c.addCustomResourceDefinition,
101 UpdateFunc: c.updateCustomResourceDefinition,
102 })
103
104 c.syncFn = c.sync
105
106 return c
107 }
108
109 func (c *CRDFinalizer) sync(key string) error {
110 cachedCRD, err := c.crdLister.Get(key)
111 if apierrors.IsNotFound(err) {
112 return nil
113 }
114 if err != nil {
115 return err
116 }
117
118
119 if cachedCRD.DeletionTimestamp.IsZero() || !apiextensionshelpers.CRDHasFinalizer(cachedCRD, apiextensionsv1.CustomResourceCleanupFinalizer) {
120 return nil
121 }
122
123 crd := cachedCRD.DeepCopy()
124
125
126 apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
127 Type: apiextensionsv1.Terminating,
128 Status: apiextensionsv1.ConditionTrue,
129 Reason: "InstanceDeletionInProgress",
130 Message: "CustomResource deletion is in progress",
131 })
132 crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
133 if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
134
135 return nil
136 }
137 if err != nil {
138 return err
139 }
140
141
142
143 if OverlappingBuiltInResources()[schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}] {
144
145 apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
146 Type: apiextensionsv1.Terminating,
147 Status: apiextensionsv1.ConditionFalse,
148 Reason: "OverlappingBuiltInResource",
149 Message: "instances overlap with built-in resources in storage",
150 })
151 } else if apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
152 cond, deleteErr := c.deleteInstances(crd)
153 apiextensionshelpers.SetCRDCondition(crd, cond)
154 if deleteErr != nil {
155 if _, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{}); err != nil {
156 utilruntime.HandleError(err)
157 }
158 return deleteErr
159 }
160 } else {
161 apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
162 Type: apiextensionsv1.Terminating,
163 Status: apiextensionsv1.ConditionFalse,
164 Reason: "NeverEstablished",
165 Message: "resource was never established",
166 })
167 }
168
169 apiextensionshelpers.CRDRemoveFinalizer(crd, apiextensionsv1.CustomResourceCleanupFinalizer)
170 _, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
171 if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
172
173 return nil
174 }
175 return err
176 }
177
178 func (c *CRDFinalizer) deleteInstances(crd *apiextensionsv1.CustomResourceDefinition) (apiextensionsv1.CustomResourceDefinitionCondition, error) {
179
180
181
182 crClient, err := c.crClientGetter.GetCustomResourceListerCollectionDeleter(crd)
183 if err != nil {
184 err = fmt.Errorf("unable to find a custom resource client for %s.%s: %v", crd.Status.AcceptedNames.Plural, crd.Spec.Group, err)
185 return apiextensionsv1.CustomResourceDefinitionCondition{
186 Type: apiextensionsv1.Terminating,
187 Status: apiextensionsv1.ConditionTrue,
188 Reason: "InstanceDeletionFailed",
189 Message: fmt.Sprintf("could not list instances: %v", err),
190 }, err
191 }
192
193 ctx := genericapirequest.NewContext()
194 allResources, err := crClient.List(ctx, nil)
195 if err != nil {
196 return apiextensionsv1.CustomResourceDefinitionCondition{
197 Type: apiextensionsv1.Terminating,
198 Status: apiextensionsv1.ConditionTrue,
199 Reason: "InstanceDeletionFailed",
200 Message: fmt.Sprintf("could not list instances: %v", err),
201 }, err
202 }
203
204 deletedNamespaces := sets.String{}
205 deleteErrors := []error{}
206 for _, item := range allResources.(*unstructured.UnstructuredList).Items {
207 metadata, err := meta.Accessor(&item)
208 if err != nil {
209 utilruntime.HandleError(err)
210 continue
211 }
212 if deletedNamespaces.Has(metadata.GetNamespace()) {
213 continue
214 }
215
216 deletedNamespaces.Insert(metadata.GetNamespace())
217 nsCtx := genericapirequest.WithNamespace(ctx, metadata.GetNamespace())
218 if _, err := crClient.DeleteCollection(nsCtx, rest.ValidateAllObjectFunc, nil, nil); err != nil {
219 deleteErrors = append(deleteErrors, err)
220 continue
221 }
222 }
223 if deleteError := utilerrors.NewAggregate(deleteErrors); deleteError != nil {
224 return apiextensionsv1.CustomResourceDefinitionCondition{
225 Type: apiextensionsv1.Terminating,
226 Status: apiextensionsv1.ConditionTrue,
227 Reason: "InstanceDeletionFailed",
228 Message: fmt.Sprintf("could not issue all deletes: %v", deleteError),
229 }, deleteError
230 }
231
232
233
234
235 err = wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
236 listObj, err := crClient.List(ctx, nil)
237 if err != nil {
238 return false, err
239 }
240 if len(listObj.(*unstructured.UnstructuredList).Items) == 0 {
241 return true, nil
242 }
243 klog.V(2).Infof("%s.%s waiting for %d items to be removed", crd.Status.AcceptedNames.Plural, crd.Spec.Group, len(listObj.(*unstructured.UnstructuredList).Items))
244 return false, nil
245 })
246 if err != nil {
247 return apiextensionsv1.CustomResourceDefinitionCondition{
248 Type: apiextensionsv1.Terminating,
249 Status: apiextensionsv1.ConditionTrue,
250 Reason: "InstanceDeletionCheck",
251 Message: fmt.Sprintf("could not confirm zero CustomResources remaining: %v", err),
252 }, err
253 }
254 return apiextensionsv1.CustomResourceDefinitionCondition{
255 Type: apiextensionsv1.Terminating,
256 Status: apiextensionsv1.ConditionFalse,
257 Reason: "InstanceDeletionCompleted",
258 Message: "removed all instances",
259 }, nil
260 }
261
262 func (c *CRDFinalizer) Run(workers int, stopCh <-chan struct{}) {
263 defer utilruntime.HandleCrash()
264 defer c.queue.ShutDown()
265
266 klog.Info("Starting CRDFinalizer")
267 defer klog.Info("Shutting down CRDFinalizer")
268
269 if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
270 return
271 }
272
273 for i := 0; i < workers; i++ {
274 go wait.Until(c.runWorker, time.Second, stopCh)
275 }
276
277 <-stopCh
278 }
279
280 func (c *CRDFinalizer) runWorker() {
281 for c.processNextWorkItem() {
282 }
283 }
284
285
286 func (c *CRDFinalizer) processNextWorkItem() bool {
287 key, quit := c.queue.Get()
288 if quit {
289 return false
290 }
291 defer c.queue.Done(key)
292
293 err := c.syncFn(key.(string))
294 if err == nil {
295 c.queue.Forget(key)
296 return true
297 }
298
299 utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
300 c.queue.AddRateLimited(key)
301
302 return true
303 }
304
305 func (c *CRDFinalizer) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
306 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
307 if err != nil {
308 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
309 return
310 }
311
312 c.queue.Add(key)
313 }
314
315 func (c *CRDFinalizer) addCustomResourceDefinition(obj interface{}) {
316 castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
317
318 if !castObj.DeletionTimestamp.IsZero() && apiextensionshelpers.CRDHasFinalizer(castObj, apiextensionsv1.CustomResourceCleanupFinalizer) {
319 c.enqueue(castObj)
320 }
321 }
322
323 func (c *CRDFinalizer) updateCustomResourceDefinition(oldObj, newObj interface{}) {
324 oldCRD := oldObj.(*apiextensionsv1.CustomResourceDefinition)
325 newCRD := newObj.(*apiextensionsv1.CustomResourceDefinition)
326
327 if newCRD.DeletionTimestamp.IsZero() || !apiextensionshelpers.CRDHasFinalizer(newCRD, apiextensionsv1.CustomResourceCleanupFinalizer) {
328 return
329 }
330
331
332 if oldCRD.ResourceVersion == newCRD.ResourceVersion {
333 c.enqueue(newCRD)
334 return
335 }
336
337
338
339
340
341 oldCopy := oldCRD.DeepCopy()
342 newCopy := newCRD.DeepCopy()
343 oldCopy.ResourceVersion = ""
344 newCopy.ResourceVersion = ""
345 apiextensionshelpers.RemoveCRDCondition(oldCopy, apiextensionsv1.Terminating)
346 apiextensionshelpers.RemoveCRDCondition(newCopy, apiextensionsv1.Terminating)
347
348 if !reflect.DeepEqual(oldCopy, newCopy) {
349 c.enqueue(newCRD)
350 }
351 }
352
View as plain text