...

Source file src/k8s.io/kubernetes/pkg/controller/volume/ephemeral/controller.go

Documentation: k8s.io/kubernetes/pkg/controller/volume/ephemeral

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package ephemeral
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"k8s.io/klog/v2"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	coreinformers "k8s.io/client-go/informers/core/v1"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/client-go/kubernetes/scheme"
    34  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    35  	corelisters "k8s.io/client-go/listers/core/v1"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/tools/record"
    38  	"k8s.io/client-go/util/workqueue"
    39  	"k8s.io/component-helpers/storage/ephemeral"
    40  	"k8s.io/kubernetes/pkg/controller/volume/common"
    41  	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
    42  	"k8s.io/kubernetes/pkg/controller/volume/events"
    43  )
    44  
    45  // Controller creates PVCs for ephemeral inline volumes in a pod spec.
    46  type Controller interface {
    47  	Run(ctx context.Context, workers int)
    48  }
    49  
    50  type ephemeralController struct {
    51  	// kubeClient is the kube API client used by volumehost to communicate with
    52  	// the API server.
    53  	kubeClient clientset.Interface
    54  
    55  	// pvcLister is the shared PVC lister used to fetch and store PVC
    56  	// objects from the API server. It is shared with other controllers and
    57  	// therefore the PVC objects in its store should be treated as immutable.
    58  	pvcLister  corelisters.PersistentVolumeClaimLister
    59  	pvcsSynced cache.InformerSynced
    60  
    61  	// podLister is the shared Pod lister used to fetch Pod
    62  	// objects from the API server. It is shared with other controllers and
    63  	// therefore the Pod objects in its store should be treated as immutable.
    64  	podLister corelisters.PodLister
    65  	podSynced cache.InformerSynced
    66  
    67  	// podIndexer has the common PodPVC indexer indexer installed To
    68  	// limit iteration over pods to those of interest.
    69  	podIndexer cache.Indexer
    70  
    71  	// recorder is used to record events in the API server
    72  	recorder record.EventRecorder
    73  
    74  	queue workqueue.RateLimitingInterface
    75  }
    76  
    77  // NewController creates an ephemeral volume controller.
    78  func NewController(
    79  	ctx context.Context,
    80  	kubeClient clientset.Interface,
    81  	podInformer coreinformers.PodInformer,
    82  	pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
    83  
    84  	ec := &ephemeralController{
    85  		kubeClient: kubeClient,
    86  		podLister:  podInformer.Lister(),
    87  		podIndexer: podInformer.Informer().GetIndexer(),
    88  		podSynced:  podInformer.Informer().HasSynced,
    89  		pvcLister:  pvcInformer.Lister(),
    90  		pvcsSynced: pvcInformer.Informer().HasSynced,
    91  		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
    92  	}
    93  
    94  	ephemeralvolumemetrics.RegisterMetrics()
    95  
    96  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    97  	eventBroadcaster.StartLogging(klog.Infof)
    98  	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    99  	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
   100  
   101  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   102  		AddFunc: ec.enqueuePod,
   103  		// The pod spec is immutable. Therefore the controller can ignore pod updates
   104  		// because there cannot be any changes that have to be copied into the generated
   105  		// PVC.
   106  		// Deletion of the PVC is handled through the owner reference and garbage collection.
   107  		// Therefore pod deletions also can be ignored.
   108  	})
   109  	pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   110  		DeleteFunc: ec.onPVCDelete,
   111  	})
   112  	if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
   113  		return nil, fmt.Errorf("could not initialize ephemeral volume controller: %w", err)
   114  	}
   115  
   116  	return ec, nil
   117  }
   118  
   119  func (ec *ephemeralController) enqueuePod(obj interface{}) {
   120  	pod, ok := obj.(*v1.Pod)
   121  	if !ok {
   122  		return
   123  	}
   124  
   125  	// Ignore pods which are already getting deleted.
   126  	if pod.DeletionTimestamp != nil {
   127  		return
   128  	}
   129  
   130  	for _, vol := range pod.Spec.Volumes {
   131  		if vol.Ephemeral != nil {
   132  			// It has at least one ephemeral inline volume, work on it.
   133  			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
   134  			if err != nil {
   135  				runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
   136  				return
   137  			}
   138  			ec.queue.Add(key)
   139  			break
   140  		}
   141  	}
   142  }
   143  
   144  func (ec *ephemeralController) onPVCDelete(obj interface{}) {
   145  	pvc, ok := obj.(*v1.PersistentVolumeClaim)
   146  	if !ok {
   147  		return
   148  	}
   149  
   150  	// Someone deleted a PVC, either intentionally or
   151  	// accidentally. If there is a pod referencing it because of
   152  	// an ephemeral volume, then we should re-create the PVC.
   153  	// The common indexer does some prefiltering for us by
   154  	// limiting the list to those pods which reference
   155  	// the PVC.
   156  	objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
   157  	if err != nil {
   158  		runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
   159  		return
   160  	}
   161  	for _, obj := range objs {
   162  		ec.enqueuePod(obj)
   163  	}
   164  }
   165  
   166  func (ec *ephemeralController) Run(ctx context.Context, workers int) {
   167  	defer runtime.HandleCrash()
   168  	defer ec.queue.ShutDown()
   169  	logger := klog.FromContext(ctx)
   170  	logger.Info("Starting ephemeral volume controller")
   171  	defer logger.Info("Shutting down ephemeral volume controller")
   172  
   173  	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
   174  		return
   175  	}
   176  
   177  	for i := 0; i < workers; i++ {
   178  		go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
   179  	}
   180  
   181  	<-ctx.Done()
   182  }
   183  
   184  func (ec *ephemeralController) runWorker(ctx context.Context) {
   185  	for ec.processNextWorkItem(ctx) {
   186  	}
   187  }
   188  
   189  func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
   190  	key, shutdown := ec.queue.Get()
   191  	if shutdown {
   192  		return false
   193  	}
   194  	defer ec.queue.Done(key)
   195  
   196  	err := ec.syncHandler(ctx, key.(string))
   197  	if err == nil {
   198  		ec.queue.Forget(key)
   199  		return true
   200  	}
   201  
   202  	runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   203  	ec.queue.AddRateLimited(key)
   204  
   205  	return true
   206  }
   207  
   208  // syncHandler is invoked for each pod which might need to be processed.
   209  // If an error is returned from this function, the pod will be requeued.
   210  func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
   211  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   212  	if err != nil {
   213  		return err
   214  	}
   215  	pod, err := ec.podLister.Pods(namespace).Get(name)
   216  	logger := klog.FromContext(ctx)
   217  	if err != nil {
   218  		if errors.IsNotFound(err) {
   219  			logger.V(5).Info("Ephemeral: nothing to do for pod, it is gone", "podKey", key)
   220  			return nil
   221  		}
   222  		logger.V(5).Info("Error getting pod from informer", "pod", klog.KObj(pod), "podUID", pod.UID, "err", err)
   223  		return err
   224  	}
   225  
   226  	// Ignore pods which are already getting deleted.
   227  	if pod.DeletionTimestamp != nil {
   228  		logger.V(5).Info("Ephemeral: nothing to do for pod, it is marked for deletion", "podKey", key)
   229  		return nil
   230  	}
   231  
   232  	for _, vol := range pod.Spec.Volumes {
   233  		if err := ec.handleVolume(ctx, pod, vol); err != nil {
   234  			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
   235  			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
   236  		}
   237  	}
   238  
   239  	return nil
   240  }
   241  
   242  // handleEphemeralVolume is invoked for each volume of a pod.
   243  func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
   244  	logger := klog.FromContext(ctx)
   245  	logger.V(5).Info("Ephemeral: checking volume", "volumeName", vol.Name)
   246  	if vol.Ephemeral == nil {
   247  		return nil
   248  	}
   249  
   250  	pvcName := ephemeral.VolumeClaimName(pod, &vol)
   251  	pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
   252  	if err != nil && !errors.IsNotFound(err) {
   253  		return err
   254  	}
   255  	if pvc != nil {
   256  		if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
   257  			return err
   258  		}
   259  		// Already created, nothing more to do.
   260  		logger.V(5).Info("Ephemeral: PVC already created", "volumeName", vol.Name, "PVC", klog.KObj(pvc))
   261  		return nil
   262  	}
   263  
   264  	// Create the PVC with pod as owner.
   265  	isTrue := true
   266  	pvc = &v1.PersistentVolumeClaim{
   267  		ObjectMeta: metav1.ObjectMeta{
   268  			Name: pvcName,
   269  			OwnerReferences: []metav1.OwnerReference{
   270  				{
   271  					APIVersion:         "v1",
   272  					Kind:               "Pod",
   273  					Name:               pod.Name,
   274  					UID:                pod.UID,
   275  					Controller:         &isTrue,
   276  					BlockOwnerDeletion: &isTrue,
   277  				},
   278  			},
   279  			Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations,
   280  			Labels:      vol.Ephemeral.VolumeClaimTemplate.Labels,
   281  		},
   282  		Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
   283  	}
   284  	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
   285  	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   286  	if err != nil {
   287  		ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
   288  		return fmt.Errorf("create PVC %s: %v", pvcName, err)
   289  	}
   290  	return nil
   291  }
   292  

View as plain text