...

Source file src/k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/label/admission.go

Documentation: k8s.io/kubernetes/plugin/pkg/admission/storage/persistentvolume/label

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package label
    18  
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apiserver/pkg/admission"
	cloudprovider "k8s.io/cloud-provider"
	cloudvolume "k8s.io/cloud-provider/volume"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
	persistentvolume "k8s.io/component-helpers/storage/volume"
	"k8s.io/klog/v2"
	api "k8s.io/kubernetes/pkg/apis/core"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
)
    39  
    40  const (
    41  	// PluginName is the name of persistent volume label admission plugin
    42  	PluginName = "PersistentVolumeLabel"
    43  )
    44  
    45  // Register registers a plugin
    46  func Register(plugins *admission.Plugins) {
    47  	plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
    48  		persistentVolumeLabelAdmission := newPersistentVolumeLabel()
    49  		return persistentVolumeLabelAdmission, nil
    50  	})
    51  }
    52  
    53  var _ = admission.Interface(&persistentVolumeLabel{})
    54  
    55  type persistentVolumeLabel struct {
    56  	*admission.Handler
    57  
    58  	mutex        sync.Mutex
    59  	cloudConfig  []byte
    60  	gcePVLabeler cloudprovider.PVLabeler
    61  }
    62  
    63  var _ admission.MutationInterface = &persistentVolumeLabel{}
    64  var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
    65  
    66  // newPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
    67  // based on the labels provided by the underlying cloud provider.
    68  //
    69  // As a side effect, the cloud provider may block invalid or non-existent volumes.
    70  func newPersistentVolumeLabel() *persistentVolumeLabel {
    71  	// DEPRECATED: in a future release, we will use mutating admission webhooks to apply PV labels.
    72  	// Once the mutating admission webhook is used for GCE,
    73  	// this admission controller will be removed.
    74  	klog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
    75  		"Please remove this controller from your configuration files and scripts.")
    76  	return &persistentVolumeLabel{
    77  		Handler: admission.NewHandler(admission.Create),
    78  	}
    79  }
    80  
    81  func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
    82  	l.cloudConfig = cloudConfig
    83  }
    84  
    85  func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
    86  	for _, req := range reqs {
    87  		for _, term := range terms {
    88  			for _, r := range term.MatchExpressions {
    89  				if r.Key == req.Key {
    90  					return true
    91  				}
    92  			}
    93  		}
    94  	}
    95  	return false
    96  }
    97  
    98  func (l *persistentVolumeLabel) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
    99  	if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
   100  		return nil
   101  	}
   102  	obj := a.GetObject()
   103  	if obj == nil {
   104  		return nil
   105  	}
   106  	volume, ok := obj.(*api.PersistentVolume)
   107  	if !ok {
   108  		return nil
   109  	}
   110  
   111  	volumeLabels, err := l.findVolumeLabels(volume)
   112  	if err != nil {
   113  		return admission.NewForbidden(a, err)
   114  	}
   115  
   116  	requirements := make([]api.NodeSelectorRequirement, 0)
   117  	if len(volumeLabels) != 0 {
   118  		if volume.Labels == nil {
   119  			volume.Labels = make(map[string]string)
   120  		}
   121  		for k, v := range volumeLabels {
   122  			// We (silently) replace labels if they are provided.
   123  			// This should be OK because they are in the kubernetes.io namespace
   124  			// i.e. we own them
   125  			volume.Labels[k] = v
   126  
   127  			// Set NodeSelectorRequirements based on the labels
   128  			var values []string
   129  			if k == v1.LabelTopologyZone || k == v1.LabelFailureDomainBetaZone {
   130  				zones, err := volumehelpers.LabelZonesToSet(v)
   131  				if err != nil {
   132  					return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
   133  				}
   134  				// zone values here are sorted for better testability.
   135  				values = zones.List()
   136  			} else {
   137  				values = []string{v}
   138  			}
   139  			requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
   140  		}
   141  
   142  		if volume.Spec.NodeAffinity == nil {
   143  			volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
   144  		}
   145  		if volume.Spec.NodeAffinity.Required == nil {
   146  			volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
   147  		}
   148  		if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
   149  			// Need at least one term pre-allocated whose MatchExpressions can be appended to
   150  			volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
   151  		}
   152  		if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
   153  			klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
   154  				requirements, volume.Spec.NodeAffinity)
   155  		} else {
   156  			for _, req := range requirements {
   157  				for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
   158  					volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
   159  				}
   160  			}
   161  		}
   162  	}
   163  
   164  	return nil
   165  }
   166  
   167  func (l *persistentVolumeLabel) findVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
   168  	existingLabels := volume.Labels
   169  
   170  	// All cloud providers set only these two labels.
   171  	topologyLabelGA := true
   172  	domain, domainOK := existingLabels[v1.LabelTopologyZone]
   173  	region, regionOK := existingLabels[v1.LabelTopologyRegion]
   174  	// If they don't have GA labels we should check for failuredomain beta labels
   175  	// TODO: remove this once all the cloud provider change to GA topology labels
   176  	if !domainOK || !regionOK {
   177  		topologyLabelGA = false
   178  		domain, domainOK = existingLabels[v1.LabelFailureDomainBetaZone]
   179  		region, regionOK = existingLabels[v1.LabelFailureDomainBetaRegion]
   180  	}
   181  
   182  	isDynamicallyProvisioned := metav1.HasAnnotation(volume.ObjectMeta, persistentvolume.AnnDynamicallyProvisioned)
   183  	if isDynamicallyProvisioned && domainOK && regionOK {
   184  		// PV already has all the labels and we can trust the dynamic provisioning that it provided correct values.
   185  		if topologyLabelGA {
   186  			return map[string]string{
   187  				v1.LabelTopologyZone:   domain,
   188  				v1.LabelTopologyRegion: region,
   189  			}, nil
   190  		}
   191  		return map[string]string{
   192  			v1.LabelFailureDomainBetaZone:   domain,
   193  			v1.LabelFailureDomainBetaRegion: region,
   194  		}, nil
   195  
   196  	}
   197  
   198  	// Either missing labels or we don't trust the user provided correct values.
   199  	switch {
   200  	case volume.Spec.GCEPersistentDisk != nil:
   201  		labels, err := l.findGCEPDLabels(volume)
   202  		if err != nil {
   203  			return nil, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err)
   204  		}
   205  		return labels, nil
   206  	}
   207  	// Unrecognized volume, do not add any labels
   208  	return nil, nil
   209  }
   210  
   211  func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
   212  	// Ignore any volumes that are being provisioned
   213  	if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
   214  		return nil, nil
   215  	}
   216  
   217  	pvlabler, err := l.getGCEPVLabeler()
   218  	if err != nil {
   219  		return nil, err
   220  	}
   221  	if pvlabler == nil {
   222  		return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
   223  	}
   224  
   225  	pv := &v1.PersistentVolume{}
   226  	err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
   227  	if err != nil {
   228  		return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
   229  	}
   230  	return pvlabler.GetLabelsForVolume(context.TODO(), pv)
   231  }
   232  
   233  // getGCEPVLabeler returns the GCE implementation of PVLabeler
   234  func (l *persistentVolumeLabel) getGCEPVLabeler() (cloudprovider.PVLabeler, error) {
   235  	l.mutex.Lock()
   236  	defer l.mutex.Unlock()
   237  
   238  	if l.gcePVLabeler == nil {
   239  		var cloudConfigReader io.Reader
   240  		if len(l.cloudConfig) > 0 {
   241  			cloudConfigReader = bytes.NewReader(l.cloudConfig)
   242  		}
   243  
   244  		cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
   245  		if err != nil || cloudProvider == nil {
   246  			return nil, err
   247  		}
   248  
   249  		gcePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
   250  		if !ok {
   251  			return nil, errors.New("GCE cloud provider does not implement PV labeling")
   252  		}
   253  
   254  		l.gcePVLabeler = gcePVLabeler
   255  
   256  	}
   257  	return l.gcePVLabeler, nil
   258  }
   259  

View as plain text