1
16
17 package label
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "io"
25 "sync"
26
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apiserver/pkg/admission"
30 cloudprovider "k8s.io/cloud-provider"
31 cloudvolume "k8s.io/cloud-provider/volume"
32 volumehelpers "k8s.io/cloud-provider/volume/helpers"
33 persistentvolume "k8s.io/component-helpers/storage/volume"
34 "k8s.io/klog/v2"
35 api "k8s.io/kubernetes/pkg/apis/core"
36 k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
37 kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
38 )
39
40 const (
41
42 PluginName = "PersistentVolumeLabel"
43 )
44
45
46 func Register(plugins *admission.Plugins) {
47 plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
48 persistentVolumeLabelAdmission := newPersistentVolumeLabel()
49 return persistentVolumeLabelAdmission, nil
50 })
51 }
52
53 var _ = admission.Interface(&persistentVolumeLabel{})
54
55 type persistentVolumeLabel struct {
56 *admission.Handler
57
58 mutex sync.Mutex
59 cloudConfig []byte
60 gcePVLabeler cloudprovider.PVLabeler
61 }
62
63 var _ admission.MutationInterface = &persistentVolumeLabel{}
64 var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
65
66
67
68
69
70 func newPersistentVolumeLabel() *persistentVolumeLabel {
71
72
73
74 klog.Warning("PersistentVolumeLabel admission controller is deprecated. " +
75 "Please remove this controller from your configuration files and scripts.")
76 return &persistentVolumeLabel{
77 Handler: admission.NewHandler(admission.Create),
78 }
79 }
80
81 func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
82 l.cloudConfig = cloudConfig
83 }
84
85 func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
86 for _, req := range reqs {
87 for _, term := range terms {
88 for _, r := range term.MatchExpressions {
89 if r.Key == req.Key {
90 return true
91 }
92 }
93 }
94 }
95 return false
96 }
97
98 func (l *persistentVolumeLabel) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
99 if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
100 return nil
101 }
102 obj := a.GetObject()
103 if obj == nil {
104 return nil
105 }
106 volume, ok := obj.(*api.PersistentVolume)
107 if !ok {
108 return nil
109 }
110
111 volumeLabels, err := l.findVolumeLabels(volume)
112 if err != nil {
113 return admission.NewForbidden(a, err)
114 }
115
116 requirements := make([]api.NodeSelectorRequirement, 0)
117 if len(volumeLabels) != 0 {
118 if volume.Labels == nil {
119 volume.Labels = make(map[string]string)
120 }
121 for k, v := range volumeLabels {
122
123
124
125 volume.Labels[k] = v
126
127
128 var values []string
129 if k == v1.LabelTopologyZone || k == v1.LabelFailureDomainBetaZone {
130 zones, err := volumehelpers.LabelZonesToSet(v)
131 if err != nil {
132 return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
133 }
134
135 values = zones.List()
136 } else {
137 values = []string{v}
138 }
139 requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
140 }
141
142 if volume.Spec.NodeAffinity == nil {
143 volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
144 }
145 if volume.Spec.NodeAffinity.Required == nil {
146 volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
147 }
148 if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
149
150 volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
151 }
152 if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
153 klog.V(4).Infof("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
154 requirements, volume.Spec.NodeAffinity)
155 } else {
156 for _, req := range requirements {
157 for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
158 volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
159 }
160 }
161 }
162 }
163
164 return nil
165 }
166
167 func (l *persistentVolumeLabel) findVolumeLabels(volume *api.PersistentVolume) (map[string]string, error) {
168 existingLabels := volume.Labels
169
170
171 topologyLabelGA := true
172 domain, domainOK := existingLabels[v1.LabelTopologyZone]
173 region, regionOK := existingLabels[v1.LabelTopologyRegion]
174
175
176 if !domainOK || !regionOK {
177 topologyLabelGA = false
178 domain, domainOK = existingLabels[v1.LabelFailureDomainBetaZone]
179 region, regionOK = existingLabels[v1.LabelFailureDomainBetaRegion]
180 }
181
182 isDynamicallyProvisioned := metav1.HasAnnotation(volume.ObjectMeta, persistentvolume.AnnDynamicallyProvisioned)
183 if isDynamicallyProvisioned && domainOK && regionOK {
184
185 if topologyLabelGA {
186 return map[string]string{
187 v1.LabelTopologyZone: domain,
188 v1.LabelTopologyRegion: region,
189 }, nil
190 }
191 return map[string]string{
192 v1.LabelFailureDomainBetaZone: domain,
193 v1.LabelFailureDomainBetaRegion: region,
194 }, nil
195
196 }
197
198
199 switch {
200 case volume.Spec.GCEPersistentDisk != nil:
201 labels, err := l.findGCEPDLabels(volume)
202 if err != nil {
203 return nil, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err)
204 }
205 return labels, nil
206 }
207
208 return nil, nil
209 }
210
211 func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
212
213 if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
214 return nil, nil
215 }
216
217 pvlabler, err := l.getGCEPVLabeler()
218 if err != nil {
219 return nil, err
220 }
221 if pvlabler == nil {
222 return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
223 }
224
225 pv := &v1.PersistentVolume{}
226 err = k8s_api_v1.Convert_core_PersistentVolume_To_v1_PersistentVolume(volume, pv, nil)
227 if err != nil {
228 return nil, fmt.Errorf("failed to convert PersistentVolume to core/v1: %q", err)
229 }
230 return pvlabler.GetLabelsForVolume(context.TODO(), pv)
231 }
232
233
234 func (l *persistentVolumeLabel) getGCEPVLabeler() (cloudprovider.PVLabeler, error) {
235 l.mutex.Lock()
236 defer l.mutex.Unlock()
237
238 if l.gcePVLabeler == nil {
239 var cloudConfigReader io.Reader
240 if len(l.cloudConfig) > 0 {
241 cloudConfigReader = bytes.NewReader(l.cloudConfig)
242 }
243
244 cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
245 if err != nil || cloudProvider == nil {
246 return nil, err
247 }
248
249 gcePVLabeler, ok := cloudProvider.(cloudprovider.PVLabeler)
250 if !ok {
251 return nil, errors.New("GCE cloud provider does not implement PV labeling")
252 }
253
254 l.gcePVLabeler = gcePVLabeler
255
256 }
257 return l.gcePVLabeler, nil
258 }
259
View as plain text