1
16
17 package namespace
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "golang.org/x/time/rate"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 coreinformers "k8s.io/client-go/informers/core/v1"
32 clientset "k8s.io/client-go/kubernetes"
33 corelisters "k8s.io/client-go/listers/core/v1"
34 "k8s.io/client-go/metadata"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/client-go/util/workqueue"
37 "k8s.io/kubernetes/pkg/controller"
38 "k8s.io/kubernetes/pkg/controller/namespace/deletion"
39
40 "k8s.io/klog/v2"
41 )
42
// namespaceDeletionGracePeriod is the delay used when a namespace carrying a
// deletion timestamp is added to the controller's work queue: the key only
// becomes available to workers after this period has elapsed.
const namespaceDeletionGracePeriod = 5 * time.Second
52
53
// NamespaceController queues namespaces that have a deletion timestamp and, for
// each queued key, asks its resources deleter to delete the namespace.
type NamespaceController struct {
	// lister is used to look a namespace up by its queue key during a sync.
	lister corelisters.NamespaceLister
	// listerSynced reports whether the namespace informer has synced; Run waits
	// on it before starting any worker.
	listerSynced cache.InformerSynced
	// queue holds the keys of namespaces that are waiting to be processed.
	queue workqueue.RateLimitingInterface
	// namespacedResourcesDeleter is called with the namespace name on every sync.
	namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}
64
65
66 func NewNamespaceController(
67 ctx context.Context,
68 kubeClient clientset.Interface,
69 metadataClient metadata.Interface,
70 discoverResourcesFn func() ([]*metav1.APIResourceList, error),
71 namespaceInformer coreinformers.NamespaceInformer,
72 resyncPeriod time.Duration,
73 finalizerToken v1.FinalizerName) *NamespaceController {
74
75
76 namespaceController := &NamespaceController{
77 queue: workqueue.NewNamedRateLimitingQueue(nsControllerRateLimiter(), "namespace"),
78 namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(ctx, kubeClient.CoreV1().Namespaces(), metadataClient, kubeClient.CoreV1(), discoverResourcesFn, finalizerToken),
79 }
80
81
82 namespaceInformer.Informer().AddEventHandlerWithResyncPeriod(
83 cache.ResourceEventHandlerFuncs{
84 AddFunc: func(obj interface{}) {
85 namespace := obj.(*v1.Namespace)
86 namespaceController.enqueueNamespace(namespace)
87 },
88 UpdateFunc: func(oldObj, newObj interface{}) {
89 namespace := newObj.(*v1.Namespace)
90 namespaceController.enqueueNamespace(namespace)
91 },
92 },
93 resyncPeriod,
94 )
95 namespaceController.lister = namespaceInformer.Lister()
96 namespaceController.listerSynced = namespaceInformer.Informer().HasSynced
97
98 return namespaceController
99 }
100
101
102
103
104 func nsControllerRateLimiter() workqueue.RateLimiter {
105 return workqueue.NewMaxOfRateLimiter(
106
107 workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 60*time.Second),
108
109 &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
110 )
111 }
112
113
114
115 func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
116 key, err := controller.KeyFunc(obj)
117 if err != nil {
118 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
119 return
120 }
121
122 namespace := obj.(*v1.Namespace)
123
124 if namespace.DeletionTimestamp == nil || namespace.DeletionTimestamp.IsZero() {
125 return
126 }
127
128
129
130 nm.queue.AddAfter(key, namespaceDeletionGracePeriod)
131 }
132
133
134
135
136
137 func (nm *NamespaceController) worker(ctx context.Context) {
138 workFunc := func(ctx context.Context) bool {
139 key, quit := nm.queue.Get()
140 if quit {
141 return true
142 }
143 defer nm.queue.Done(key)
144
145 err := nm.syncNamespaceFromKey(ctx, key.(string))
146 if err == nil {
147
148 nm.queue.Forget(key)
149 return false
150 }
151
152 if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
153 t := estimate.Estimate/2 + 1
154 klog.FromContext(ctx).V(4).Info("Content remaining in namespace", "namespace", key, "waitSeconds", t)
155 nm.queue.AddAfter(key, time.Duration(t)*time.Second)
156 } else {
157
158 nm.queue.AddRateLimited(key)
159 utilruntime.HandleError(fmt.Errorf("deletion of namespace %v failed: %v", key, err))
160 }
161 return false
162 }
163 for {
164 quit := workFunc(ctx)
165
166 if quit {
167 return
168 }
169 }
170 }
171
172
173 func (nm *NamespaceController) syncNamespaceFromKey(ctx context.Context, key string) (err error) {
174 startTime := time.Now()
175 logger := klog.FromContext(ctx)
176 defer func() {
177 logger.V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
178 }()
179
180 namespace, err := nm.lister.Get(key)
181 if errors.IsNotFound(err) {
182 logger.Info("Namespace has been deleted", "namespace", key)
183 return nil
184 }
185 if err != nil {
186 utilruntime.HandleError(fmt.Errorf("Unable to retrieve namespace %v from store: %v", key, err))
187 return err
188 }
189 return nm.namespacedResourcesDeleter.Delete(ctx, namespace.Name)
190 }
191
192
193 func (nm *NamespaceController) Run(ctx context.Context, workers int) {
194 defer utilruntime.HandleCrash()
195 defer nm.queue.ShutDown()
196 logger := klog.FromContext(ctx)
197 logger.Info("Starting namespace controller")
198 defer logger.Info("Shutting down namespace controller")
199
200 if !cache.WaitForNamedCacheSync("namespace", ctx.Done(), nm.listerSynced) {
201 return
202 }
203
204 logger.V(5).Info("Starting workers of namespace controller")
205 for i := 0; i < workers; i++ {
206 go wait.UntilWithContext(ctx, nm.worker, time.Second)
207 }
208 <-ctx.Done()
209 }
210
View as plain text