...

Source file src/k8s.io/kubernetes/pkg/controller/storageversionmigrator/resourceversion.go

Documentation: k8s.io/kubernetes/pkg/controller/storageversionmigrator

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storageversionmigrator
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"k8s.io/apimachinery/pkg/api/meta"
    25  	"k8s.io/apimachinery/pkg/runtime/schema"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	"k8s.io/client-go/discovery"
    28  	"k8s.io/client-go/metadata"
    29  	"k8s.io/client-go/tools/cache"
    30  	"k8s.io/client-go/util/workqueue"
    31  	"k8s.io/klog/v2"
    32  	"k8s.io/kubernetes/pkg/controller"
    33  
    34  	svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
    35  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    38  	svminformers "k8s.io/client-go/informers/storagemigration/v1alpha1"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	svmlisters "k8s.io/client-go/listers/storagemigration/v1alpha1"
    41  )
    42  
const (
	// fakeSVMNamespaceName is guaranteed not to be present in the cluster because it is
	// not a valid namespace name.
	fakeSVMNamespaceName          string = "@fake:svm_ns!"
	// ResourceVersionControllerName is the name used for this controller's work queue,
	// its cache-sync wait, and its start/stop log messages.
	ResourceVersionControllerName string = "resource-version-controller"
)
    48  
// ResourceVersionController adds a resource version to the SVM (StorageVersionMigration) status
// before the migration is initiated. The resource version is read from a metadata list of the
// resource being migrated; for namespaced resources the list is issued against a namespace that
// cannot exist. This resource version is utilized for checking freshness of the GC cache before
// the migration is initiated.
type ResourceVersionController struct {
	// discoveryClient is used to find out whether a resource is namespace scoped.
	discoveryClient *discovery.DiscoveryClient
	// metadataClient issues the metadata list whose resourceVersion is recorded.
	metadataClient  metadata.Interface
	// svmListers reads StorageVersionMigration objects by name.
	svmListers      svmlisters.StorageVersionMigrationLister
	// svmSynced reports whether the SVM informer has synced; Run waits on it.
	svmSynced       cache.InformerSynced
	// queue holds the keys of the SVMs waiting to be processed by sync.
	queue           workqueue.RateLimitingInterface
	// kubeClient is used to write SVM status updates.
	kubeClient      clientset.Interface
	// mapper is consulted to check that the resource named by an SVM exists.
	mapper          meta.ResettableRESTMapper
}
    61  
    62  func NewResourceVersionController(
    63  	ctx context.Context,
    64  	kubeClient clientset.Interface,
    65  	discoveryClient *discovery.DiscoveryClient,
    66  	metadataClient metadata.Interface,
    67  	svmInformer svminformers.StorageVersionMigrationInformer,
    68  	mapper meta.ResettableRESTMapper,
    69  ) *ResourceVersionController {
    70  	logger := klog.FromContext(ctx)
    71  
    72  	rvController := &ResourceVersionController{
    73  		kubeClient:      kubeClient,
    74  		discoveryClient: discoveryClient,
    75  		metadataClient:  metadataClient,
    76  		svmListers:      svmInformer.Lister(),
    77  		svmSynced:       svmInformer.Informer().HasSynced,
    78  		mapper:          mapper,
    79  		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ResourceVersionControllerName),
    80  	}
    81  
    82  	_, _ = svmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    83  		AddFunc: func(obj interface{}) {
    84  			rvController.addSVM(logger, obj)
    85  		},
    86  		UpdateFunc: func(oldObj, newObj interface{}) {
    87  			rvController.updateSVM(logger, oldObj, newObj)
    88  		},
    89  	})
    90  
    91  	return rvController
    92  }
    93  
    94  func (rv *ResourceVersionController) addSVM(logger klog.Logger, obj interface{}) {
    95  	svm := obj.(*svmv1alpha1.StorageVersionMigration)
    96  	logger.V(4).Info("Adding", "svm", klog.KObj(svm))
    97  	rv.enqueue(svm)
    98  }
    99  
   100  func (rv *ResourceVersionController) updateSVM(logger klog.Logger, oldObj, newObj interface{}) {
   101  	oldSVM := oldObj.(*svmv1alpha1.StorageVersionMigration)
   102  	newSVM := newObj.(*svmv1alpha1.StorageVersionMigration)
   103  	logger.V(4).Info("Updating", "svm", klog.KObj(oldSVM))
   104  	rv.enqueue(newSVM)
   105  }
   106  
   107  func (rv *ResourceVersionController) enqueue(svm *svmv1alpha1.StorageVersionMigration) {
   108  	key, err := controller.KeyFunc(svm)
   109  	if err != nil {
   110  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %w", svm, err))
   111  		return
   112  	}
   113  
   114  	rv.queue.Add(key)
   115  }
   116  
   117  func (rv *ResourceVersionController) Run(ctx context.Context) {
   118  	defer utilruntime.HandleCrash()
   119  	defer rv.queue.ShutDown()
   120  
   121  	logger := klog.FromContext(ctx)
   122  	logger.Info("Starting", "controller", ResourceVersionControllerName)
   123  	defer logger.Info("Shutting down", "controller", ResourceVersionControllerName)
   124  
   125  	if !cache.WaitForNamedCacheSync(ResourceVersionControllerName, ctx.Done(), rv.svmSynced) {
   126  		return
   127  	}
   128  
   129  	go wait.UntilWithContext(ctx, rv.worker, time.Second)
   130  
   131  	<-ctx.Done()
   132  }
   133  
   134  func (rv *ResourceVersionController) worker(ctx context.Context) {
   135  	for rv.processNext(ctx) {
   136  	}
   137  }
   138  
   139  func (rv *ResourceVersionController) processNext(ctx context.Context) bool {
   140  	eKey, quit := rv.queue.Get()
   141  	if quit {
   142  		return false
   143  	}
   144  	defer rv.queue.Done(eKey)
   145  
   146  	key := eKey.(string)
   147  	err := rv.sync(ctx, key)
   148  	if err == nil {
   149  		rv.queue.Forget(key)
   150  		return true
   151  	}
   152  
   153  	klog.FromContext(ctx).V(2).Info("Error syncing SVM resource, retrying", "svm", key, "err", err)
   154  	rv.queue.AddRateLimited(key)
   155  
   156  	return true
   157  }
   158  
   159  func (rv *ResourceVersionController) sync(ctx context.Context, key string) error {
   160  	logger := klog.FromContext(ctx)
   161  	startTime := time.Now()
   162  
   163  	// SVM is a cluster scoped resource so we don't care about the namespace
   164  	_, name, err := cache.SplitMetaNamespaceKey(key)
   165  	if err != nil {
   166  		return err
   167  	}
   168  
   169  	svm, err := rv.svmListers.Get(name)
   170  	if apierrors.IsNotFound(err) {
   171  		// no work to do, don't fail and requeue
   172  		return nil
   173  	}
   174  	if err != nil {
   175  		return err
   176  	}
   177  	// working with copy to avoid race condition between this and migration controller
   178  	toBeProcessedSVM := svm.DeepCopy()
   179  	gvr := getGVRFromResource(toBeProcessedSVM)
   180  
   181  	if IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded) || IsConditionTrue(toBeProcessedSVM, svmv1alpha1.MigrationFailed) {
   182  		logger.V(4).Info("Migration has already succeeded or failed previously, skipping", "svm", name)
   183  		return nil
   184  	}
   185  
   186  	if len(toBeProcessedSVM.Status.ResourceVersion) != 0 {
   187  		logger.V(4).Info("Resource version is already set", "svm", name)
   188  		return nil
   189  	}
   190  
   191  	exists, err := rv.resourceExists(gvr)
   192  	if err != nil {
   193  		return err
   194  	}
   195  	if !exists {
   196  		_, err = rv.kubeClient.StoragemigrationV1alpha1().
   197  			StorageVersionMigrations().
   198  			UpdateStatus(
   199  				ctx,
   200  				setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
   201  				metav1.UpdateOptions{},
   202  			)
   203  		if err != nil {
   204  			return err
   205  		}
   206  
   207  		return nil
   208  	}
   209  
   210  	toBeProcessedSVM.Status.ResourceVersion, err = rv.getLatestResourceVersion(gvr, ctx)
   211  	if err != nil {
   212  		return err
   213  	}
   214  
   215  	_, err = rv.kubeClient.StoragemigrationV1alpha1().
   216  		StorageVersionMigrations().
   217  		UpdateStatus(ctx, toBeProcessedSVM, metav1.UpdateOptions{})
   218  	if err != nil {
   219  		return fmt.Errorf("error updating status for %s: %w", toBeProcessedSVM.Name, err)
   220  	}
   221  
   222  	logger.V(4).Info("Resource version has been successfully added", "svm", key, "elapsed", time.Since(startTime))
   223  	return nil
   224  }
   225  
   226  func (rv *ResourceVersionController) getLatestResourceVersion(gvr schema.GroupVersionResource, ctx context.Context) (string, error) {
   227  	isResourceNamespaceScoped, err := rv.isResourceNamespaceScoped(gvr)
   228  	if err != nil {
   229  		return "", err
   230  	}
   231  
   232  	var randomList *metav1.PartialObjectMetadataList
   233  	if isResourceNamespaceScoped {
   234  		// get list resourceVersion from random non-existent namesapce for the given GVR
   235  		randomList, err = rv.metadataClient.Resource(gvr).
   236  			Namespace(fakeSVMNamespaceName).
   237  			List(ctx, metav1.ListOptions{
   238  				Limit: 1,
   239  			})
   240  	} else {
   241  		randomList, err = rv.metadataClient.Resource(gvr).
   242  			List(ctx, metav1.ListOptions{
   243  				Limit: 1,
   244  			})
   245  	}
   246  	if err != nil {
   247  		// error here is very abstract. adding additional context for better debugging
   248  		return "", fmt.Errorf("error getting latest resourceVersion for %s: %w", gvr.String(), err)
   249  	}
   250  
   251  	return randomList.GetResourceVersion(), err
   252  }
   253  
   254  func (rv *ResourceVersionController) resourceExists(gvr schema.GroupVersionResource) (bool, error) {
   255  	mapperGVRs, err := rv.mapper.ResourcesFor(gvr)
   256  	if err != nil {
   257  		return false, err
   258  	}
   259  
   260  	for _, mapperGVR := range mapperGVRs {
   261  		if mapperGVR.Group == gvr.Group &&
   262  			mapperGVR.Version == gvr.Version &&
   263  			mapperGVR.Resource == gvr.Resource {
   264  			return true, nil
   265  		}
   266  	}
   267  
   268  	return false, nil
   269  }
   270  
   271  func (rv *ResourceVersionController) isResourceNamespaceScoped(gvr schema.GroupVersionResource) (bool, error) {
   272  	resourceList, err := rv.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
   273  	if err != nil {
   274  		return false, err
   275  	}
   276  
   277  	for _, resource := range resourceList.APIResources {
   278  		if resource.Name == gvr.Resource {
   279  			return resource.Namespaced, nil
   280  		}
   281  	}
   282  
   283  	return false, fmt.Errorf("resource %q not found", gvr.String())
   284  }
   285  

View as plain text