    17  package label
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/admission"
	cloudprovider "k8s.io/cloud-provider"
	cloudvolume "k8s.io/cloud-provider/volume"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	persistentvolume "k8s.io/component-helpers/storage/volume"
	"k8s.io/klog/v2"
	api "k8s.io/kubernetes/pkg/apis/core"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
)

    40  const (
    41  	// PluginName is the name of persistent volume label admission plugin
    42  	PluginName = "PersistentVolumeLabel"
    43  )
    45  // Register registers a plugin
    46  func Register(plugins *admission.Plugins) {
    47  	plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
    48  		persistentVolumeLabelAdmission := newPersistentVolumeLabel()
    49  		return persistentVolumeLabelAdmission, nil
    50  	})
    51  }
    53  var _ = admission.Interface(&persistentVolumeLabel{})
    55  type persistentVolumeLabel struct {
    56  	*admission.Handler
    58  	mutex        sync.Mutex
    59  	cloudConfig  []byte
    60  	gcePVLabeler cloudprovider.PVLabeler
    61  }
    63  var _ admission.MutationInterface = &persistentVolumeLabel{}
    64  var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
    66  // newPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
    67  // based on the labels provided by the underlying cloud provider.
    68  //
    69  // As a side effect, the cloud provider may block invalid or non-existent volumes.
    70  func newPersistentVolumeLabel() *persistentVolumeLabel {
    71  	// DEPRECATED: in a future release, we will use mutating admission webhooks to apply PV labels.
    72  	// Once the mutating admission webhook is used for GCE,
    73  	// this admission controller will be removed.
    74  	klog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
    75  		"Please remove this controller from your configuration files and scripts.")
    76  	return &persistentVolumeLabel{
    77  		Handler: admission.NewHandler(admission.Create),
    78  	}
    79  }
// SetCloudConfig stores the given cloud provider configuration bytes on the
// plugin; getGCEPVLabeler reads them later when it builds the GCE cloud
// provider. The slice is kept as-is, without copying.
func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
	l.cloudConfig = cloudConfig
}

    85  func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
    86  	for _, req := range reqs {
    87  		for _, term := range terms {
    88  			for _, r := range term.MatchExpressions {
    89  				if r.Key == req.Key {
    90  					return true
    91  				}
    92  			}
    93  		}
    94  	}
    95  	return false
    96  }
    98  func (l *persistentVolumeLabel) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
    99  	if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
   100  		return nil
   101  	}
   102  	obj := a.GetObject()
   103  	if obj == nil {
   104  		return nil
   105  	}
   106  	volume, ok := obj.(*api.PersistentVolume)
   107  	if !ok {
   108  		return nil
   109  	}
   111  	volumeLabels, err := l.findVolumeLabels(volume)
   112  	if err != nil {
   113  		return admission.NewForbidden(a, err)
   114  	}
   116  	requirements := make([]api.NodeSelectorRequirement, 0)
   117  	if len(volumeLabels) != 0 {
   118  		if volume.Labels == nil {
   119  			volume.Labels = make(map[string]string)
   120  		}
   121  		for k, v := range volumeLabels {
   122  			// We (silently) replace labels if they are provided.
   123  			// This should be OK because they are in the kubernetes.io namespace
   124  			// i.e. we own them
   125  			volume.Labels[k] = v
   127  			// Set NodeSelectorRequirements based on the labels
   128  			var values []string
   129  			if k == v1.LabelTopologyZone || k == v1.LabelFailureDomainBetaZone {
   130  				zones, err := volumehelpers.LabelZonesToSet(v)
   131  				if err != nil {
   132  					return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
   133  				}
   134  				// zone values here are sorted for better testability.
   135  				values = zones.List()
   136  			} else {
   137  				values = []string{v}
   138  			}
   139  			requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
   140  		}
   142  		if volume.Spec.NodeAffinity == nil {
   143  			volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
   144  		}
   145  		if volume.Spec.NodeAffinity.Required == nil {
   146  			volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
   147  		}
   148  		if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
   149  			// Need at least one term pre-allocated whose MatchExpressions can be appended to
   150  			volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
   151  		}
   152  		if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
   153  			klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
   154  				requirements, volume.Spec.NodeAffinity)
   155  		} else {
   156  			for _, req := range requirements {
   157  				for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
   158  					volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
   159  				}
   160  			}
   161  		}
   162  	}
   164  	return nil
   165  }
   167  func (l *persistentVolumeLabel) findVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
   168  	existingLabels := volume.Labels
   170  	// All cloud providers set only these two labels.
   171  	topologyLabelGA := true
   172  	domain, domainOK := existingLabels[v1.LabelTopologyZone]
   173  	region, regionOK := existingLabels[v1.LabelTopologyRegion]
   174  	// If they don't have GA labels we should check for failuredomain beta labels
   175  	// TODO: remove this once all the cloud provider change to GA topology labels
   176  	if !domainOK || !regionOK {
   177  		topologyLabelGA = false
   178  		domain, domainOK = existingLabels[v1.LabelFailureDomainBetaZone]
   179  		region, regionOK = existingLabels[v1.LabelFailureDomainBetaRegion]
   180  	}
   182  	isDynamicallyProvisioned := metav1.HasAnnotation(volume.ObjectMeta, persistentvolume.AnnDynamicallyProvisioned)
   183  	if isDynamicallyProvisioned && domainOK && regionOK {
   184  		// PV already has all the labels and we can trust the dynamic provisioning that it provided correct values.
   185  		if topologyLabelGA {
   186  			return map[string]string{
   187  				v1.LabelTopologyZone:   domain,
   188  				v1.LabelTopologyRegion: region,
   189  			}, nil
   190  		}
   191  		return map[string]string{
   192  			v1.LabelFailureDomainBetaZone:   domain,
   193  			v1.LabelFailureDomainBetaRegion: region,
   194  		}, nil
   196  	}
   198  	// Either missing labels or we don't trust the user provided correct values.
   199  	switch {
   200  	case volume.Spec.GCEPersistentDisk != nil:
   201  		labels, err := l.findGCEPDLabels(volume)
   202  		if err != nil {
   203  			return nil, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err)
   204  		}
   205  		return labels, nil
   206  	}
   207  	// Unrecognized volume, do not add any labels
   208  	return nil, nil
   209  }
   211  func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
   212  	// Ignore any volumes that are being provisioned
   213  	if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
   214  		return nil, nil
   215  	}
   217  	pvlabler, err := l.getGCEPVLabeler()
   218  	if err != nil {
   219  		return nil, err
   220  	}
   221  	if pvlabler == nil {
   222  		return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
   223  	}
   225  	pv := &v1.PersistentVolume{}
   226  	err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
   227  	if err != nil {
   228  		return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
   229  	}
   230  	return pvlabler.GetLabelsForVolume(context.TODO(), pv)
   231  }
   233  // getGCEPVLabeler returns the GCE implementation of PVLabeler
   234  func (l *persistentVolumeLabel) getGCEPVLabeler() (cloudprovider.PVLabeler, error) {
   235  	l.mutex.Lock()
   236  	defer l.mutex.Unlock()
   238  	if l.gcePVLabeler == nil {
   239  		var cloudConfigReader io.Reader
   240  		if len(l.cloudConfig) > 0 {
   241  			cloudConfigReader = bytes.NewReader(l.cloudConfig)
   242  		}
   244  		cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
   245  		if err != nil || cloudProvider == nil {
   246  			return nil, err
   247  		}
   249  		gcePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
   250  		if !ok {
   251  			return nil, errors.New("GCE cloud provider does not implement PV labeling")
   252  		}
   254  		l.gcePVLabeler = gcePVLabeler
   256  	}
   257  	return l.gcePVLabeler, nil
   258  }

