...

Source file src/k8s.io/kubernetes/pkg/controller/storageversiongc/gc_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/storageversiongc

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storageversiongc
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
    25  	coordinationv1 "k8s.io/api/coordination/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	"k8s.io/apiserver/pkg/storageversion"
    32  	apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
    33  	coordinformers "k8s.io/client-go/informers/coordination/v1"
    34  	"k8s.io/client-go/kubernetes"
    35  	coordlisters "k8s.io/client-go/listers/coordination/v1"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/util/workqueue"
    38  	"k8s.io/kubernetes/pkg/controlplane"
    39  
    40  	"k8s.io/klog/v2"
    41  )
    42  
// Controller watches kube-apiserver leases and storageversions, and deletes
// stale storage version entries and objects.
type Controller struct {
	// kubeclientset performs live reads of leases and lists, updates and
	// deletes StorageVersion objects.
	kubeclientset kubernetes.Interface

	// leaseLister reads leases from the lease informer's cache;
	// leasesSynced reports whether that cache has synced.
	leaseLister  coordlisters.LeaseLister
	leasesSynced cache.InformerSynced

	// storageVersionSynced reports whether the storage version informer's
	// cache has synced.
	storageVersionSynced cache.InformerSynced

	// leaseQueue holds the names of deleted kube-apiserver identity leases.
	// storageVersionQueue holds the names of storage versions observed with
	// an entry that has no matching identity lease in the lease cache.
	leaseQueue          workqueue.RateLimitingInterface
	storageVersionQueue workqueue.RateLimitingInterface
}
    56  
    57  // NewStorageVersionGC creates a new Controller.
    58  func NewStorageVersionGC(ctx context.Context, clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
    59  	c := &Controller{
    60  		kubeclientset:        clientset,
    61  		leaseLister:          leaseInformer.Lister(),
    62  		leasesSynced:         leaseInformer.Informer().HasSynced,
    63  		storageVersionSynced: storageVersionInformer.Informer().HasSynced,
    64  		leaseQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
    65  		storageVersionQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
    66  	}
    67  	logger := klog.FromContext(ctx)
    68  	leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    69  		DeleteFunc: func(obj interface{}) {
    70  			c.onDeleteLease(logger, obj)
    71  		},
    72  	})
    73  	// use the default resync period from the informer
    74  	storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    75  		AddFunc: func(obj interface{}) {
    76  			c.onAddStorageVersion(logger, obj)
    77  		},
    78  		UpdateFunc: func(old, newObj interface{}) {
    79  			c.onUpdateStorageVersion(logger, old, newObj)
    80  		},
    81  	})
    82  
    83  	return c
    84  }
    85  
    86  // Run starts one worker.
    87  func (c *Controller) Run(ctx context.Context) {
    88  	logger := klog.FromContext(ctx)
    89  	defer utilruntime.HandleCrash()
    90  	defer c.leaseQueue.ShutDown()
    91  	defer c.storageVersionQueue.ShutDown()
    92  	defer logger.Info("Shutting down storage version garbage collector")
    93  
    94  	logger.Info("Starting storage version garbage collector")
    95  
    96  	if !cache.WaitForCacheSync(ctx.Done(), c.leasesSynced, c.storageVersionSynced) {
    97  		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
    98  		return
    99  	}
   100  
   101  	// Identity lease deletion and storageversion update don't happen too often. Start one
   102  	// worker for each of them.
   103  	// runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker
   104  	// handles storageversion creation/update with non-existing id. The latter should rarely
   105  	// happen. It's okay for the two workers to conflict on update.
   106  	go wait.UntilWithContext(ctx, c.runLeaseWorker, time.Second)
   107  	go wait.UntilWithContext(ctx, c.runStorageVersionWorker, time.Second)
   108  
   109  	<-ctx.Done()
   110  }
   111  
   112  func (c *Controller) runLeaseWorker(ctx context.Context) {
   113  	for c.processNextLease(ctx) {
   114  	}
   115  }
   116  
   117  func (c *Controller) processNextLease(ctx context.Context) bool {
   118  	key, quit := c.leaseQueue.Get()
   119  	if quit {
   120  		return false
   121  	}
   122  	defer c.leaseQueue.Done(key)
   123  
   124  	err := c.processDeletedLease(ctx, key.(string))
   125  	if err == nil {
   126  		c.leaseQueue.Forget(key)
   127  		return true
   128  	}
   129  
   130  	utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err))
   131  	c.leaseQueue.AddRateLimited(key)
   132  	return true
   133  }
   134  
   135  func (c *Controller) runStorageVersionWorker(ctx context.Context) {
   136  	for c.processNextStorageVersion(ctx) {
   137  	}
   138  }
   139  
   140  func (c *Controller) processNextStorageVersion(ctx context.Context) bool {
   141  	key, quit := c.storageVersionQueue.Get()
   142  	if quit {
   143  		return false
   144  	}
   145  	defer c.storageVersionQueue.Done(key)
   146  
   147  	err := c.syncStorageVersion(ctx, key.(string))
   148  	if err == nil {
   149  		c.storageVersionQueue.Forget(key)
   150  		return true
   151  	}
   152  
   153  	utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err))
   154  	c.storageVersionQueue.AddRateLimited(key)
   155  	return true
   156  }
   157  
   158  func (c *Controller) processDeletedLease(ctx context.Context, name string) error {
   159  	_, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, name, metav1.GetOptions{})
   160  	// the lease isn't deleted, nothing we need to do here
   161  	if err == nil {
   162  		return nil
   163  	}
   164  	if !apierrors.IsNotFound(err) {
   165  		return err
   166  	}
   167  	// the frequency of this call won't be too high because we only trigger on identity lease deletions
   168  	storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(ctx, metav1.ListOptions{})
   169  	if err != nil {
   170  		return err
   171  	}
   172  
   173  	var errors []error
   174  	for _, sv := range storageVersionList.Items {
   175  		var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
   176  		hasStaleRecord := false
   177  		for _, ssv := range sv.Status.StorageVersions {
   178  			if ssv.APIServerID == name {
   179  				hasStaleRecord = true
   180  				continue
   181  			}
   182  			serverStorageVersions = append(serverStorageVersions, ssv)
   183  		}
   184  		if !hasStaleRecord {
   185  			continue
   186  		}
   187  		if err := c.updateOrDeleteStorageVersion(ctx, &sv, serverStorageVersions); err != nil {
   188  			errors = append(errors, err)
   189  		}
   190  	}
   191  
   192  	return utilerrors.NewAggregate(errors)
   193  }
   194  
   195  func (c *Controller) syncStorageVersion(ctx context.Context, name string) error {
   196  	sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(ctx, name, metav1.GetOptions{})
   197  	if apierrors.IsNotFound(err) {
   198  		// The problematic storage version that was added/updated recently is gone.
   199  		// Nothing we need to do here.
   200  		return nil
   201  	}
   202  	if err != nil {
   203  		return err
   204  	}
   205  
   206  	hasInvalidID := false
   207  	var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
   208  	for _, v := range sv.Status.StorageVersions {
   209  		lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{})
   210  		if err != nil || lease == nil || lease.Labels == nil ||
   211  			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
   212  			// We cannot find a corresponding identity lease from apiserver as well.
   213  			// We need to clean up this storage version.
   214  			hasInvalidID = true
   215  			continue
   216  		}
   217  		serverStorageVersions = append(serverStorageVersions, v)
   218  	}
   219  	if !hasInvalidID {
   220  		return nil
   221  	}
   222  	return c.updateOrDeleteStorageVersion(ctx, sv, serverStorageVersions)
   223  }
   224  
   225  func (c *Controller) onAddStorageVersion(logger klog.Logger, obj interface{}) {
   226  	castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
   227  	c.enqueueStorageVersion(logger, castObj)
   228  }
   229  
   230  func (c *Controller) onUpdateStorageVersion(logger klog.Logger, oldObj, newObj interface{}) {
   231  	castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
   232  	c.enqueueStorageVersion(logger, castNewObj)
   233  }
   234  
   235  // enqueueStorageVersion enqueues the storage version if it has entry for invalid apiserver
   236  func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverinternalv1alpha1.StorageVersion) {
   237  	for _, sv := range obj.Status.StorageVersions {
   238  		lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
   239  		if err != nil || lease == nil || lease.Labels == nil ||
   240  			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
   241  			// we cannot find a corresponding identity lease in cache, enqueue the storageversion
   242  			logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
   243  			c.storageVersionQueue.Add(obj.Name)
   244  			return
   245  		}
   246  	}
   247  }
   248  
   249  func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
   250  	castObj, ok := obj.(*coordinationv1.Lease)
   251  	if !ok {
   252  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   253  		if !ok {
   254  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   255  			return
   256  		}
   257  		castObj, ok = tombstone.Obj.(*coordinationv1.Lease)
   258  		if !ok {
   259  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj))
   260  			return
   261  		}
   262  	}
   263  
   264  	if castObj.Namespace == metav1.NamespaceSystem &&
   265  		castObj.Labels != nil &&
   266  		castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
   267  		logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
   268  		c.enqueueLease(castObj)
   269  	}
   270  }
   271  
   272  func (c *Controller) enqueueLease(obj *coordinationv1.Lease) {
   273  	c.leaseQueue.Add(obj.Name)
   274  }
   275  
   276  func (c *Controller) updateOrDeleteStorageVersion(ctx context.Context, sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
   277  	if len(serverStorageVersions) == 0 {
   278  		return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete(
   279  			ctx, sv.Name, metav1.DeleteOptions{})
   280  	}
   281  	sv.Status.StorageVersions = serverStorageVersions
   282  	storageversion.SetCommonEncodingVersion(sv)
   283  	_, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus(
   284  		ctx, sv, metav1.UpdateOptions{})
   285  	return err
   286  }
   287  

View as plain text