...

Source file src/k8s.io/kubernetes/pkg/controller/volume/pvprotection/pv_protection_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/volume/pvprotection

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pvprotection
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	coreinformers "k8s.io/client-go/informers/core/v1"
    30  	clientset "k8s.io/client-go/kubernetes"
    31  	corelisters "k8s.io/client-go/listers/core/v1"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/client-go/util/workqueue"
    34  	"k8s.io/klog/v2"
    35  	"k8s.io/kubernetes/pkg/controller/volume/protectionutil"
    36  	"k8s.io/kubernetes/pkg/util/slice"
    37  	volumeutil "k8s.io/kubernetes/pkg/volume/util"
    38  )
    39  
    40  // Controller is controller that removes PVProtectionFinalizer
    41  // from PVs that are not bound to PVCs.
    42  type Controller struct {
    43  	client clientset.Interface
    44  
    45  	pvLister       corelisters.PersistentVolumeLister
    46  	pvListerSynced cache.InformerSynced
    47  
    48  	queue workqueue.RateLimitingInterface
    49  }
    50  
    51  // NewPVProtectionController returns a new *Controller.
    52  func NewPVProtectionController(logger klog.Logger, pvInformer coreinformers.PersistentVolumeInformer, cl clientset.Interface) *Controller {
    53  	e := &Controller{
    54  		client: cl,
    55  		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvprotection"),
    56  	}
    57  
    58  	e.pvLister = pvInformer.Lister()
    59  	e.pvListerSynced = pvInformer.Informer().HasSynced
    60  	pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    61  		AddFunc: func(obj interface{}) {
    62  			e.pvAddedUpdated(logger, obj)
    63  		},
    64  		UpdateFunc: func(old, new interface{}) {
    65  			e.pvAddedUpdated(logger, new)
    66  		},
    67  	})
    68  
    69  	return e
    70  }
    71  
    72  // Run runs the controller goroutines.
    73  func (c *Controller) Run(ctx context.Context, workers int) {
    74  	defer utilruntime.HandleCrash()
    75  	defer c.queue.ShutDown()
    76  
    77  	logger := klog.FromContext(ctx)
    78  	logger.Info("Starting PV protection controller")
    79  	defer logger.Info("Shutting down PV protection controller")
    80  
    81  	if !cache.WaitForNamedCacheSync("PV protection", ctx.Done(), c.pvListerSynced) {
    82  		return
    83  	}
    84  
    85  	for i := 0; i < workers; i++ {
    86  		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    87  	}
    88  
    89  	<-ctx.Done()
    90  }
    91  
    92  func (c *Controller) runWorker(ctx context.Context) {
    93  	for c.processNextWorkItem(ctx) {
    94  	}
    95  }
    96  
    97  // processNextWorkItem deals with one pvcKey off the queue.  It returns false when it's time to quit.
    98  func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    99  	pvKey, quit := c.queue.Get()
   100  	if quit {
   101  		return false
   102  	}
   103  	defer c.queue.Done(pvKey)
   104  
   105  	pvName := pvKey.(string)
   106  
   107  	err := c.processPV(ctx, pvName)
   108  	if err == nil {
   109  		c.queue.Forget(pvKey)
   110  		return true
   111  	}
   112  
   113  	utilruntime.HandleError(fmt.Errorf("PV %v failed with : %v", pvKey, err))
   114  	c.queue.AddRateLimited(pvKey)
   115  
   116  	return true
   117  }
   118  
   119  func (c *Controller) processPV(ctx context.Context, pvName string) error {
   120  	logger := klog.FromContext(ctx)
   121  	logger.V(4).Info("Processing PV", "PV", klog.KRef("", pvName))
   122  	startTime := time.Now()
   123  	defer func() {
   124  		logger.V(4).Info("Finished processing PV", "PV", klog.KRef("", pvName), "cost", time.Since(startTime))
   125  	}()
   126  
   127  	pv, err := c.pvLister.Get(pvName)
   128  	if apierrors.IsNotFound(err) {
   129  		logger.V(4).Info("PV not found, ignoring", "PV", klog.KRef("", pvName))
   130  		return nil
   131  	}
   132  	if err != nil {
   133  		return err
   134  	}
   135  
   136  	if protectionutil.IsDeletionCandidate(pv, volumeutil.PVProtectionFinalizer) {
   137  		// PV should be deleted. Check if it's used and remove finalizer if
   138  		// it's not.
   139  		isUsed := c.isBeingUsed(pv)
   140  		if !isUsed {
   141  			return c.removeFinalizer(ctx, pv)
   142  		}
   143  		logger.V(4).Info("Keeping PV because it is being used", "PV", klog.KRef("", pvName))
   144  	}
   145  
   146  	if protectionutil.NeedToAddFinalizer(pv, volumeutil.PVProtectionFinalizer) {
   147  		// PV is not being deleted -> it should have the finalizer. The
   148  		// finalizer should be added by admission plugin, this is just to add
   149  		// the finalizer to old PVs that were created before the admission
   150  		// plugin was enabled.
   151  		return c.addFinalizer(ctx, pv)
   152  	}
   153  	return nil
   154  }
   155  
   156  func (c *Controller) addFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
   157  	pvClone := pv.DeepCopy()
   158  	pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer)
   159  	_, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
   160  	logger := klog.FromContext(ctx)
   161  	if err != nil {
   162  		logger.V(3).Info("Error adding protection finalizer to PV", "PV", klog.KObj(pv), "err", err)
   163  		return err
   164  	}
   165  	logger.V(3).Info("Added protection finalizer to PV", "PV", klog.KObj(pv))
   166  	return nil
   167  }
   168  
   169  func (c *Controller) removeFinalizer(ctx context.Context, pv *v1.PersistentVolume) error {
   170  	pvClone := pv.DeepCopy()
   171  	pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil)
   172  	_, err := c.client.CoreV1().PersistentVolumes().Update(ctx, pvClone, metav1.UpdateOptions{})
   173  	logger := klog.FromContext(ctx)
   174  	if err != nil {
   175  		logger.V(3).Info("Error removing protection finalizer from PV", "PV", klog.KObj(pv), "err", err)
   176  		return err
   177  	}
   178  	logger.V(3).Info("Removed protection finalizer from PV", "PV", klog.KObj(pv))
   179  	return nil
   180  }
   181  
   182  func (c *Controller) isBeingUsed(pv *v1.PersistentVolume) bool {
   183  	// check if PV is being bound to a PVC by its status
   184  	// the status will be updated by PV controller
   185  	if pv.Status.Phase == v1.VolumeBound {
   186  		// the PV is being used now
   187  		return true
   188  	}
   189  
   190  	return false
   191  }
   192  
   193  // pvAddedUpdated reacts to pv added/updated events
   194  func (c *Controller) pvAddedUpdated(logger klog.Logger, obj interface{}) {
   195  	pv, ok := obj.(*v1.PersistentVolume)
   196  	if !ok {
   197  		utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj))
   198  		return
   199  	}
   200  	logger.V(4).Info("Got event on PV", "PV", klog.KObj(pv))
   201  
   202  	if protectionutil.NeedToAddFinalizer(pv, volumeutil.PVProtectionFinalizer) || protectionutil.IsDeletionCandidate(pv, volumeutil.PVProtectionFinalizer) {
   203  		c.queue.Add(pv.Name)
   204  	}
   205  }
   206  

View as plain text