1
16
17 package storageversiongc
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
25 coordinationv1 "k8s.io/api/coordination/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 utilerrors "k8s.io/apimachinery/pkg/util/errors"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/apiserver/pkg/storageversion"
32 apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
33 coordinformers "k8s.io/client-go/informers/coordination/v1"
34 "k8s.io/client-go/kubernetes"
35 coordlisters "k8s.io/client-go/listers/coordination/v1"
36 "k8s.io/client-go/tools/cache"
37 "k8s.io/client-go/util/workqueue"
38 "k8s.io/kubernetes/pkg/controlplane"
39
40 "k8s.io/klog/v2"
41 )
42
43
44
45 type Controller struct {
46 kubeclientset kubernetes.Interface
47
48 leaseLister coordlisters.LeaseLister
49 leasesSynced cache.InformerSynced
50
51 storageVersionSynced cache.InformerSynced
52
53 leaseQueue workqueue.RateLimitingInterface
54 storageVersionQueue workqueue.RateLimitingInterface
55 }
56
57
58 func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
59 c := &Controller{
60 kubeclientset: clientset,
61 leaseLister: leaseInformer.Lister(),
62 leasesSynced: leaseInformer.Informer().HasSynced,
63 storageVersionSynced: storageVersionInformer.Informer().HasSynced,
64 leaseQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
65 storageVersionQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
66 }
67 logger := klog.FromContext(ctx)
68 leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
69 DeleteFunc: func(obj interface{}) {
70 c.onDeleteLease(logger, obj)
71 },
72 })
73
74 storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
75 AddFunc: func(obj interface{}) {
76 c.onAddStorageVersion(logger, obj)
77 },
78 UpdateFunc: func(old, newObj interface{}) {
79 c.onUpdateStorageVersion(logger, old, newObj)
80 },
81 })
82
83 return c
84 }
85
86
87 func (c *Controller) Run(ctx context.Context) {
88 logger := klog.FromContext(ctx)
89 defer utilruntime.HandleCrash()
90 defer c.leaseQueue.ShutDown()
91 defer c.storageVersionQueue.ShutDown()
92 defer logger.Info("Shutting down storage version garbage collector")
93
94 logger.Info("Starting storage version garbage collector")
95
96 if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) {
97 utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
98 return
99 }
100
101
102
103
104
105
106 go wait.UntilWithContext(ctx, c.runLeaseWorker, time.Second)
107 go wait.UntilWithContext(ctx, c.runStorageVersionWorker, time.Second)
108
109 <-ctx.Done()
110 }
111
112 func (c *Controller) runLeaseWorker(ctx context.Context) {
113 for c.processNextLease(ctx) {
114 }
115 }
116
117 func (c *Controller) processNextLease(ctx context.Context) bool {
118 key, quit := c.leaseQueue.Get()
119 if quit {
120 return false
121 }
122 defer c.leaseQueue.Done(key)
123
124 err := c.processDeletedLease(ctx, key.(string))
125 if err == nil {
126 c.leaseQueue.Forget(key)
127 return true
128 }
129
130 utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err))
131 c.leaseQueue.AddRateLimited(key)
132 return true
133 }
134
135 func (c *Controller) runStorageVersionWorker(ctx context.Context) {
136 for c.processNextStorageVersion(ctx) {
137 }
138 }
139
140 func (c *Controller) processNextStorageVersion(ctx context.Context) bool {
141 key, quit := c.storageVersionQueue.Get()
142 if quit {
143 return false
144 }
145 defer c.storageVersionQueue.Done(key)
146
147 err := c.syncStorageVersion(ctx, key.(string))
148 if err == nil {
149 c.storageVersionQueue.Forget(key)
150 return true
151 }
152
153 utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err))
154 c.storageVersionQueue.AddRateLimited(key)
155 return true
156 }
157
158 func (c *Controller) processDeletedLease(ctx context.Context, name string) error {
159 _, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
160
161 if err == nil {
162 return nil
163 }
164 if !apierrors.IsNotFound(err) {
165 return err
166 }
167
168 storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(ctx, metav1.ListOptions{})
169 if err != nil {
170 return err
171 }
172
173 var errors []error
174 for _, sv := range storageVersionList.Items {
175 var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
176 hasStaleRecord := false
177 for _, ssv := range sv.Status.StorageVersions {
178 if ssv.APIServerID == name {
179 hasStaleRecord = true
180 continue
181 }
182 serverStorageVersions = append(serverStorageVersions, ssv)
183 }
184 if !hasStaleRecord {
185 continue
186 }
187 if err := c.updateOrDeleteStorageVersion(ctx, &sv, serverStorageVersions); err != nil {
188 errors = append(errors, err)
189 }
190 }
191
192 return utilerrors.NewAggregate(errors)
193 }
194
195 func (c *Controller) syncStorageVersion(ctx context.Context, name string) error {
196 sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(ctx, name, metav1.GetOptions{})
197 if apierrors.IsNotFound(err) {
198
199
200 return nil
201 }
202 if err != nil {
203 return err
204 }
205
206 hasInvalidID := false
207 var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
208 for _, v := range sv.Status.StorageVersions {
209 lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{})
210 if err != nil || lease == nil || lease.Labels == nil ||
211 lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
212
213
214 hasInvalidID = true
215 continue
216 }
217 serverStorageVersions = append(serverStorageVersions, v)
218 }
219 if !hasInvalidID {
220 return nil
221 }
222 return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
223 }
224
225 func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) {
226 castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
227 c.enqueueStorageVersion(logger, castObj)
228 }
229
230 func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) {
231 castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
232 c.enqueueStorageVersion(logger, castNewObj)
233 }
234
235
236 func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) {
237 for _, sv := range obj.Status.StorageVersions {
238 lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
239 if err != nil || lease == nil || lease.Labels == nil ||
240 lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
241
242 logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
243 c.storageVersionQueue.Add(obj.Name)
244 return
245 }
246 }
247 }
248
249 func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
250 castObj, ok := obj.(*coordinationv1.Lease)
251 if !ok {
252 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
253 if !ok {
254 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
255 return
256 }
257 castObj, ok = tombstone.Obj.(*coordinationv1.Lease)
258 if !ok {
259 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj))
260 return
261 }
262 }
263
264 if castObj.Namespace == metav1.NamespaceSystem &&
265 castObj.Labels != nil &&
266 castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
267 logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
268 c.enqueueLease(castObj)
269 }
270 }
271
272 func (c *Controller) enqueueLease(obj *coordinationv1.Lease) {
273 c.leaseQueue.Add(obj.Name)
274 }
275
276 func (c *Controller) updateOrDeleteStorageVersion(ctx context.Context, sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
277 if len(serverStorageVersions) == 0 {
278 return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete(
279 ctx, sv.Name, metav1.DeleteOptions{})
280 }
281 sv.Status.StorageVersions = serverStorageVersions
282 storageversion.SetCommonEncodingVersion(sv)
283 _, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus(
284 ctx, sv, metav1.UpdateOptions{})
285 return err
286 }
287
View as plain text