    17  package ephemeral
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    24  	"k8s.io/klog/v2"
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	coreinformers "k8s.io/client-go/informers/core/v1"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/client-go/kubernetes/scheme"
    34  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    35  	corelisters "k8s.io/client-go/listers/core/v1"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/tools/record"
    38  	"k8s.io/client-go/util/workqueue"
    39  	"k8s.io/component-helpers/storage/ephemeral"
    40  	"k8s.io/kubernetes/pkg/controller/volume/common"
    41  	ephemeralvolumemetrics "k8s.io/kubernetes/pkg/controller/volume/ephemeral/metrics"
    42  	"k8s.io/kubernetes/pkg/controller/volume/events"
    43  )
// Controller creates PVCs for ephemeral inline volumes in a pod spec.
type Controller interface {
	// Run starts the controller with the given number of workers. The
	// implementation in this file returns early if the informer caches
	// fail to sync and otherwise blocks until ctx is done.
	Run(ctx context.Context, workers int)
}
// ephemeralController is the Controller implementation in this file. It
// watches pods and PVCs through shared informers and creates the PVCs that
// back ephemeral inline volumes.
type ephemeralController struct {
	// kubeClient is the kube API client used to create PVCs and to send
	// events to the API server.
	kubeClient clientset.Interface

	// pvcLister is the shared PVC lister used to fetch and store PVC
	// objects from the API server. It is shared with other controllers and
	// therefore the PVC objects in its store should be treated as immutable.
	pvcLister  corelisters.PersistentVolumeClaimLister
	pvcsSynced cache.InformerSynced

	// podLister is the shared Pod lister used to fetch Pod
	// objects from the API server. It is shared with other controllers and
	// therefore the Pod objects in its store should be treated as immutable.
	podLister corelisters.PodLister
	podSynced cache.InformerSynced

	// podIndexer has the common PodPVC indexer installed to
	// limit iteration over pods to those of interest.
	podIndexer cache.Indexer

	// recorder is used to record events in the API server
	recorder record.EventRecorder

	// queue holds the keys of pods that still need to be processed. Keys
	// whose sync failed are re-added with rate limiting.
	queue workqueue.RateLimitingInterface
}
    77  // NewController creates an ephemeral volume controller.
    78  func NewController(
    79  	ctx context.Context,
    80  	kubeClient clientset.Interface,
    81  	podInformer coreinformers.PodInformer,
    82  	pvcInformer coreinformers.PersistentVolumeClaimInformer) (Controller, error) {
    84  	ec := &ephemeralController{
    85  		kubeClient: kubeClient,
    86  		podLister:  podInformer.Lister(),
    87  		podIndexer: podInformer.Informer().GetIndexer(),
    88  		podSynced:  podInformer.Informer().HasSynced,
    89  		pvcLister:  pvcInformer.Lister(),
    90  		pvcsSynced: pvcInformer.Informer().HasSynced,
    91  		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ephemeral_volume"),
    92  	}
    94  	ephemeralvolumemetrics.RegisterMetrics()
    96  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    97  	eventBroadcaster.StartLogging(klog.Infof)
    98  	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    99  	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ephemeral_volume"})
   101  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   102  		AddFunc: ec.enqueuePod,
   103  		// The pod spec is immutable. Therefore the controller can ignore pod updates
   104  		// because there cannot be any changes that have to be copied into the generated
   105  		// PVC.
   106  		// Deletion of the PVC is handled through the owner reference and garbage collection.
   107  		// Therefore pod deletions also can be ignored.
   108  	})
   109  	pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   110  		DeleteFunc: ec.onPVCDelete,
   111  	})
   112  	if err := common.AddPodPVCIndexerIfNotPresent(ec.podIndexer); err != nil {
   113  		return nil, fmt.Errorf("could not initialize ephemeral volume controller: %w", err)
   114  	}
   116  	return ec, nil
   117  }
   119  func (ec *ephemeralController) enqueuePod(obj interface{}) {
   120  	pod, ok := obj.(*v1.Pod)
   121  	if !ok {
   122  		return
   123  	}
   125  	// Ignore pods which are already getting deleted.
   126  	if pod.DeletionTimestamp != nil {
   127  		return
   128  	}
   130  	for _, vol := range pod.Spec.Volumes {
   131  		if vol.Ephemeral != nil {
   132  			// It has at least one ephemeral inline volume, work on it.
   133  			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod)
   134  			if err != nil {
   135  				runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", pod, err))
   136  				return
   137  			}
   138  			ec.queue.Add(key)
   139  			break
   140  		}
   141  	}
   142  }
   144  func (ec *ephemeralController) onPVCDelete(obj interface{}) {
   145  	pvc, ok := obj.(*v1.PersistentVolumeClaim)
   146  	if !ok {
   147  		return
   148  	}
   150  	// Someone deleted a PVC, either intentionally or
   151  	// accidentally. If there is a pod referencing it because of
   152  	// an ephemeral volume, then we should re-create the PVC.
   153  	// The common indexer does some prefiltering for us by
   154  	// limiting the list to those pods which reference
   155  	// the PVC.
   156  	objs, err := ec.podIndexer.ByIndex(common.PodPVCIndex, fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name))
   157  	if err != nil {
   158  		runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
   159  		return
   160  	}
   161  	for _, obj := range objs {
   162  		ec.enqueuePod(obj)
   163  	}
   164  }
   166  func (ec *ephemeralController) Run(ctx context.Context, workers int) {
   167  	defer runtime.HandleCrash()
   168  	defer ec.queue.ShutDown()
   169  	logger := klog.FromContext(ctx)
   170  	logger.Info("Starting ephemeral volume controller")
   171  	defer logger.Info("Shutting down ephemeral volume controller")
   173  	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.pvcsSynced) {
   174  		return
   175  	}
   177  	for i := 0; i < workers; i++ {
   178  		go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
   179  	}
   181  	<-ctx.Done()
   182  }
   184  func (ec *ephemeralController) runWorker(ctx context.Context) {
   185  	for ec.processNextWorkItem(ctx) {
   186  	}
   187  }
   189  func (ec *ephemeralController) processNextWorkItem(ctx context.Context) bool {
   190  	key, shutdown := ec.queue.Get()
   191  	if shutdown {
   192  		return false
   193  	}
   194  	defer ec.queue.Done(key)
   196  	err := ec.syncHandler(ctx, key.(string))
   197  	if err == nil {
   198  		ec.queue.Forget(key)
   199  		return true
   200  	}
   202  	runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   203  	ec.queue.AddRateLimited(key)
   205  	return true
   206  }
   208  // syncHandler is invoked for each pod which might need to be processed.
   209  // If an error is returned from this function, the pod will be requeued.
   210  func (ec *ephemeralController) syncHandler(ctx context.Context, key string) error {
   211  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   212  	if err != nil {
   213  		return err
   214  	}
   215  	pod, err := ec.podLister.Pods(namespace).Get(name)
   216  	logger := klog.FromContext(ctx)
   217  	if err != nil {
   218  		if errors.IsNotFound(err) {
   219  			logger.V(5).Info("Ephemeral: nothing to do for pod, it is gone", "podKey", key)
   220  			return nil
   221  		}
   222  		logger.V(5).Info("Error getting pod from informer", "pod", klog.KObj(pod), "podUID", pod.UID, "err", err)
   223  		return err
   224  	}
   226  	// Ignore pods which are already getting deleted.
   227  	if pod.DeletionTimestamp != nil {
   228  		logger.V(5).Info("Ephemeral: nothing to do for pod, it is marked for deletion", "podKey", key)
   229  		return nil
   230  	}
   232  	for _, vol := range pod.Spec.Volumes {
   233  		if err := ec.handleVolume(ctx, pod, vol); err != nil {
   234  			ec.recorder.Event(pod, v1.EventTypeWarning, events.FailedBinding, fmt.Sprintf("ephemeral volume %s: %v", vol.Name, err))
   235  			return fmt.Errorf("pod %s, ephemeral volume %s: %v", key, vol.Name, err)
   236  		}
   237  	}
   239  	return nil
   240  }
   242  // handleEphemeralVolume is invoked for each volume of a pod.
   243  func (ec *ephemeralController) handleVolume(ctx context.Context, pod *v1.Pod, vol v1.Volume) error {
   244  	logger := klog.FromContext(ctx)
   245  	logger.V(5).Info("Ephemeral: checking volume", "volumeName", vol.Name)
   246  	if vol.Ephemeral == nil {
   247  		return nil
   248  	}
   250  	pvcName := ephemeral.VolumeClaimName(pod, &vol)
   251  	pvc, err := ec.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
   252  	if err != nil && !errors.IsNotFound(err) {
   253  		return err
   254  	}
   255  	if pvc != nil {
   256  		if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
   257  			return err
   258  		}
   259  		// Already created, nothing more to do.
   260  		logger.V(5).Info("Ephemeral: PVC already created", "volumeName", vol.Name, "PVC", klog.KObj(pvc))
   261  		return nil
   262  	}
   264  	// Create the PVC with pod as owner.
   265  	isTrue := true
   266  	pvc = &v1.PersistentVolumeClaim{
   267  		ObjectMeta: metav1.ObjectMeta{
   268  			Name: pvcName,
   269  			OwnerReferences: []metav1.OwnerReference{
   270  				{
   271  					APIVersion:         "v1",
   272  					Kind:               "Pod",
   273  					Name:               pod.Name,
   274  					UID:                pod.UID,
   275  					Controller:         &isTrue,
   276  					BlockOwnerDeletion: &isTrue,
   277  				},
   278  			},
   279  			Annotations: vol.Ephemeral.VolumeClaimTemplate.Annotations,
   280  			Labels:      vol.Ephemeral.VolumeClaimTemplate.Labels,
   281  		},
   282  		Spec: vol.Ephemeral.VolumeClaimTemplate.Spec,
   283  	}
   284  	ephemeralvolumemetrics.EphemeralVolumeCreateAttempts.Inc()
   285  	_, err = ec.kubeClient.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   286  	if err != nil {
   287  		ephemeralvolumemetrics.EphemeralVolumeCreateFailures.Inc()
   288  		return fmt.Errorf("create PVC %s: %v", pvcName, err)
   289  	}
   290  	return nil
   291  }

