1
16
17 package podnodeselector
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "reflect"
24
25 "k8s.io/klog/v2"
26
27 corev1 "k8s.io/api/core/v1"
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/util/yaml"
32 "k8s.io/apiserver/pkg/admission"
33 genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
34 "k8s.io/client-go/informers"
35 "k8s.io/client-go/kubernetes"
36 corev1listers "k8s.io/client-go/listers/core/v1"
37 api "k8s.io/kubernetes/pkg/apis/core"
38 )
39
40
41
42
// NamespaceNodeSelectors lists the namespace annotation keys whose values are
// parsed as node label selectors for the pods created in that namespace.
var NamespaceNodeSelectors = []string{
	"scheduler.alpha.kubernetes.io/node-selector",
}

// PluginName is the name under which this admission plugin is registered.
const PluginName = "PodNodeSelector"
47
48
49 func Register(plugins *admission.Plugins) {
50 plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
51
52 pluginConfig := readConfig(config)
53 plugin := NewPodNodeSelector(pluginConfig.PodNodeSelectorPluginConfig)
54 return plugin, nil
55 })
56 }
57
58
// Plugin is the PodNodeSelector admission plugin. It is returned by
// NewPodNodeSelector and registered as an admission.Interface by Register.
type Plugin struct {
	// Handler is embedded to supply the operation filter and the
	// WaitForReady/SetReadyFunc helpers used by this plugin.
	*admission.Handler
	// client performs a direct namespace lookup when the lister does not
	// know the namespace (see defaultGetNamespace).
	client kubernetes.Interface
	// namespaceLister is the informer-backed namespace lookup consulted first.
	namespaceLister corev1listers.NamespaceLister
	// clusterNodeSelectors maps a namespace name to its whitelist selector;
	// the key "clusterDefaultNodeSelector" holds the selector applied to
	// namespaces without a node-selector annotation.
	clusterNodeSelectors map[string]string
}
66
67 var _ = genericadmissioninitializer.WantsExternalKubeClientSet(&Plugin{})
68 var _ = genericadmissioninitializer.WantsExternalKubeInformerFactory(&Plugin{})
69
// pluginConfig is the structure the plugin configuration is decoded into by
// readConfig.
type pluginConfig struct {
	// PodNodeSelectorPluginConfig maps a key to a node-selector string.
	// Within this file the key "clusterDefaultNodeSelector" is read as the
	// selector for namespaces that carry no node-selector annotation, and a
	// key equal to a namespace name is read as that namespace's whitelist.
	PodNodeSelectorPluginConfig map[string]string
}
73
74
75
76
77
78
79
80
81
82
83 func readConfig(config io.Reader) *pluginConfig {
84 defaultConfig := &pluginConfig{}
85 if config == nil || reflect.ValueOf(config).IsNil() {
86 return defaultConfig
87 }
88 d := yaml.NewYAMLOrJSONDecoder(config, 4096)
89 for {
90 if err := d.Decode(defaultConfig); err != nil {
91 if err != io.EOF {
92 continue
93 }
94 }
95 break
96 }
97 return defaultConfig
98 }
99
100
101 func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
102 if shouldIgnore(a) {
103 return nil
104 }
105 if !p.WaitForReady() {
106 return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
107 }
108
109 resource := a.GetResource().GroupResource()
110 pod := a.GetObject().(*api.Pod)
111 namespaceNodeSelector, err := p.getNamespaceNodeSelectorMap(a.GetNamespace())
112 if err != nil {
113 return err
114 }
115
116 if labels.Conflicts(namespaceNodeSelector, labels.Set(pod.Spec.NodeSelector)) {
117 return errors.NewForbidden(resource, pod.Name, fmt.Errorf("pod node label selector conflicts with its namespace node label selector"))
118 }
119
120
121
122 podNodeSelectorLabels := labels.Merge(namespaceNodeSelector, pod.Spec.NodeSelector)
123 pod.Spec.NodeSelector = map[string]string(podNodeSelectorLabels)
124 return p.Validate(ctx, a, o)
125 }
126
127
128 func (p *Plugin) Validate(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
129 if shouldIgnore(a) {
130 return nil
131 }
132 if !p.WaitForReady() {
133 return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
134 }
135
136 resource := a.GetResource().GroupResource()
137 pod := a.GetObject().(*api.Pod)
138
139 namespaceNodeSelector, err := p.getNamespaceNodeSelectorMap(a.GetNamespace())
140 if err != nil {
141 return err
142 }
143 if labels.Conflicts(namespaceNodeSelector, labels.Set(pod.Spec.NodeSelector)) {
144 return errors.NewForbidden(resource, pod.Name, fmt.Errorf("pod node label selector conflicts with its namespace node label selector"))
145 }
146
147
148 whitelist, err := labels.ConvertSelectorToLabelsMap(p.clusterNodeSelectors[a.GetNamespace()])
149 if err != nil {
150 return err
151 }
152 if !isSubset(pod.Spec.NodeSelector, whitelist) {
153 return errors.NewForbidden(resource, pod.Name, fmt.Errorf("pod node label selector labels conflict with its namespace whitelist"))
154 }
155
156 return nil
157 }
158
159 func (p *Plugin) getNamespaceNodeSelectorMap(namespaceName string) (labels.Set, error) {
160 namespace, err := p.namespaceLister.Get(namespaceName)
161 if errors.IsNotFound(err) {
162 namespace, err = p.defaultGetNamespace(namespaceName)
163 if err != nil {
164 if errors.IsNotFound(err) {
165 return nil, err
166 }
167 return nil, errors.NewInternalError(err)
168 }
169 } else if err != nil {
170 return nil, errors.NewInternalError(err)
171 }
172
173 return p.getNodeSelectorMap(namespace)
174 }
175
176 func shouldIgnore(a admission.Attributes) bool {
177 resource := a.GetResource().GroupResource()
178 if resource != api.Resource("pods") {
179 return true
180 }
181 if a.GetSubresource() != "" {
182
183 return true
184 }
185
186 _, ok := a.GetObject().(*api.Pod)
187 if !ok {
188 klog.Errorf("expected pod but got %s", a.GetKind().Kind)
189 return true
190 }
191
192 return false
193 }
194
195
196 func NewPodNodeSelector(clusterNodeSelectors map[string]string) *Plugin {
197 return &Plugin{
198 Handler: admission.NewHandler(admission.Create),
199 clusterNodeSelectors: clusterNodeSelectors,
200 }
201 }
202
203
// SetExternalKubeClientSet stores the client the plugin uses for direct
// namespace lookups (see defaultGetNamespace). ValidateInitialization fails
// if it was never set.
func (p *Plugin) SetExternalKubeClientSet(client kubernetes.Interface) {
	p.client = client
}
207
208
209 func (p *Plugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
210 namespaceInformer := f.Core().V1().Namespaces()
211 p.namespaceLister = namespaceInformer.Lister()
212 p.SetReadyFunc(namespaceInformer.Informer().HasSynced)
213 }
214
215
216 func (p *Plugin) ValidateInitialization() error {
217 if p.namespaceLister == nil {
218 return fmt.Errorf("missing namespaceLister")
219 }
220 if p.client == nil {
221 return fmt.Errorf("missing client")
222 }
223 return nil
224 }
225
226 func (p *Plugin) defaultGetNamespace(name string) (*corev1.Namespace, error) {
227 namespace, err := p.client.CoreV1().Namespaces().Get(context.TODO(), name, metav1.GetOptions{})
228 if err != nil {
229 return nil, fmt.Errorf("namespace %s does not exist", name)
230 }
231 return namespace, nil
232 }
233
234 func (p *Plugin) getNodeSelectorMap(namespace *corev1.Namespace) (labels.Set, error) {
235 selector := labels.Set{}
236 var err error
237 found := false
238 if len(namespace.ObjectMeta.Annotations) > 0 {
239 for _, annotation := range NamespaceNodeSelectors {
240 if ns, ok := namespace.ObjectMeta.Annotations[annotation]; ok {
241 labelsMap, err := labels.ConvertSelectorToLabelsMap(ns)
242 if err != nil {
243 return labels.Set{}, err
244 }
245
246 if labels.Conflicts(selector, labelsMap) {
247 nsName := namespace.ObjectMeta.Name
248 return labels.Set{}, fmt.Errorf("%s annotations' node label selectors conflict", nsName)
249 }
250 selector = labels.Merge(selector, labelsMap)
251 found = true
252 }
253 }
254 }
255 if !found {
256 selector, err = labels.ConvertSelectorToLabelsMap(p.clusterNodeSelectors["clusterDefaultNodeSelector"])
257 if err != nil {
258 return labels.Set{}, err
259 }
260 }
261 return selector, nil
262 }
263
264 func isSubset(subSet, superSet labels.Set) bool {
265 if len(superSet) == 0 {
266 return true
267 }
268
269 for k, v := range subSet {
270 value, ok := superSet[k]
271 if !ok {
272 return false
273 }
274 if value != v {
275 return false
276 }
277 }
278 return true
279 }
280
View as plain text