1
16
17 package clusterauthenticationtrust
18
19 import (
20 "bytes"
21 "context"
22 "crypto/x509"
23 "encoding/json"
24 "encoding/pem"
25 "fmt"
26 "reflect"
27 "strings"
28 "time"
29
30 corev1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/equality"
32 apierrors "k8s.io/apimachinery/pkg/api/errors"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/apiserver/pkg/authentication/request/headerrequest"
38 "k8s.io/apiserver/pkg/server/dynamiccertificates"
39 corev1informers "k8s.io/client-go/informers/core/v1"
40 "k8s.io/client-go/kubernetes"
41 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
42 corev1listers "k8s.io/client-go/listers/core/v1"
43 "k8s.io/client-go/tools/cache"
44 "k8s.io/client-go/util/cert"
45 "k8s.io/client-go/util/workqueue"
46 "k8s.io/klog/v2"
47 )
48
49 const (
50 configMapNamespace = "kube-system"
51 configMapName = "extension-apiserver-authentication"
52 )
53
54
55 type Controller struct {
56 requiredAuthenticationData ClusterAuthenticationInfo
57
58 configMapLister corev1listers.ConfigMapLister
59 configMapClient corev1client.ConfigMapsGetter
60 namespaceClient corev1client.NamespacesGetter
61
62
63
64 queue workqueue.RateLimitingInterface
65
66
67 kubeSystemConfigMapInformer cache.SharedIndexInformer
68
69
70 preRunCaches []cache.InformerSynced
71 }
72
73
74 type ClusterAuthenticationInfo struct {
75
76 ClientCA dynamiccertificates.CAContentProvider
77
78
79 RequestHeaderUsernameHeaders headerrequest.StringSliceProvider
80
81 RequestHeaderGroupHeaders headerrequest.StringSliceProvider
82
83 RequestHeaderExtraHeaderPrefixes headerrequest.StringSliceProvider
84
85 RequestHeaderAllowedNames headerrequest.StringSliceProvider
86
87 RequestHeaderCA dynamiccertificates.CAContentProvider
88 }
89
90
91
92 func NewClusterAuthenticationTrustController(requiredAuthenticationData ClusterAuthenticationInfo, kubeClient kubernetes.Interface) *Controller {
93
94 kubeSystemConfigMapInformer := corev1informers.NewConfigMapInformer(kubeClient, configMapNamespace, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
95
96 c := &Controller{
97 requiredAuthenticationData: requiredAuthenticationData,
98 configMapLister: corev1listers.NewConfigMapLister(kubeSystemConfigMapInformer.GetIndexer()),
99 configMapClient: kubeClient.CoreV1(),
100 namespaceClient: kubeClient.CoreV1(),
101 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cluster_authentication_trust_controller"),
102 preRunCaches: []cache.InformerSynced{kubeSystemConfigMapInformer.HasSynced},
103 kubeSystemConfigMapInformer: kubeSystemConfigMapInformer,
104 }
105
106 kubeSystemConfigMapInformer.AddEventHandler(cache.FilteringResourceEventHandler{
107 FilterFunc: func(obj interface{}) bool {
108 if cast, ok := obj.(*corev1.ConfigMap); ok {
109 return cast.Namespace == configMapNamespace && cast.Name == configMapName
110 }
111 if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
112 if cast, ok := tombstone.Obj.(*corev1.ConfigMap); ok {
113 return cast.Namespace == configMapNamespace && cast.Name == configMapName
114 }
115 }
116 return true
117 },
118 Handler: cache.ResourceEventHandlerFuncs{
119
120
121 AddFunc: func(obj interface{}) {
122 c.queue.Add(keyFn())
123 },
124 UpdateFunc: func(oldObj, newObj interface{}) {
125 c.queue.Add(keyFn())
126 },
127 DeleteFunc: func(obj interface{}) {
128 c.queue.Add(keyFn())
129 },
130 },
131 })
132
133 return c
134 }
135
136 func (c *Controller) syncConfigMap() error {
137 originalAuthConfigMap, err := c.configMapLister.ConfigMaps(configMapNamespace).Get(configMapName)
138 if apierrors.IsNotFound(err) {
139 originalAuthConfigMap = &corev1.ConfigMap{
140 ObjectMeta: metav1.ObjectMeta{Namespace: configMapNamespace, Name: configMapName},
141 }
142 } else if err != nil {
143 return err
144 }
145
146 authConfigMap := originalAuthConfigMap.DeepCopy()
147
148 existingAuthenticationInfo, err := getClusterAuthenticationInfoFor(originalAuthConfigMap.Data)
149 if err != nil {
150 return err
151 }
152 combinedInfo, err := combinedClusterAuthenticationInfo(existingAuthenticationInfo, c.requiredAuthenticationData)
153 if err != nil {
154 return err
155 }
156 authConfigMap.Data, err = getConfigMapDataFor(combinedInfo)
157 if err != nil {
158 return err
159 }
160
161 if equality.Semantic.DeepEqual(authConfigMap, originalAuthConfigMap) {
162 klog.V(5).Info("no changes to configmap")
163 return nil
164 }
165 klog.V(2).Infof("writing updated authentication info to %s configmaps/%s", configMapNamespace, configMapName)
166
167 if err := createNamespaceIfNeeded(c.namespaceClient, authConfigMap.Namespace); err != nil {
168 return err
169 }
170 if err := writeConfigMap(c.configMapClient, authConfigMap); err != nil {
171 return err
172 }
173
174 return nil
175 }
176
177 func createNamespaceIfNeeded(nsClient corev1client.NamespacesGetter, ns string) error {
178 if _, err := nsClient.Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}); err == nil {
179
180 return nil
181 }
182 newNs := &corev1.Namespace{
183 ObjectMeta: metav1.ObjectMeta{
184 Name: ns,
185 Namespace: "",
186 },
187 }
188 _, err := nsClient.Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
189 if err != nil && apierrors.IsAlreadyExists(err) {
190 err = nil
191 }
192 return err
193 }
194
195 func writeConfigMap(configMapClient corev1client.ConfigMapsGetter, required *corev1.ConfigMap) error {
196 _, err := configMapClient.ConfigMaps(required.Namespace).Update(context.TODO(), required, metav1.UpdateOptions{})
197 if apierrors.IsNotFound(err) {
198 _, err := configMapClient.ConfigMaps(required.Namespace).Create(context.TODO(), required, metav1.CreateOptions{})
199 return err
200 }
201
202
203
204
205
206
207 if apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) {
208 if deleteErr := configMapClient.ConfigMaps(required.Namespace).Delete(context.TODO(), required.Name, metav1.DeleteOptions{}); deleteErr != nil {
209 return deleteErr
210 }
211 return err
212 }
213
214 return err
215 }
216
217
218 func combinedClusterAuthenticationInfo(lhs, rhs ClusterAuthenticationInfo) (ClusterAuthenticationInfo, error) {
219 ret := ClusterAuthenticationInfo{
220 RequestHeaderAllowedNames: combineUniqueStringSlices(lhs.RequestHeaderAllowedNames, rhs.RequestHeaderAllowedNames),
221 RequestHeaderExtraHeaderPrefixes: combineUniqueStringSlices(lhs.RequestHeaderExtraHeaderPrefixes, rhs.RequestHeaderExtraHeaderPrefixes),
222 RequestHeaderGroupHeaders: combineUniqueStringSlices(lhs.RequestHeaderGroupHeaders, rhs.RequestHeaderGroupHeaders),
223 RequestHeaderUsernameHeaders: combineUniqueStringSlices(lhs.RequestHeaderUsernameHeaders, rhs.RequestHeaderUsernameHeaders),
224 }
225
226 var err error
227 ret.ClientCA, err = combineCertLists(lhs.ClientCA, rhs.ClientCA)
228 if err != nil {
229 return ClusterAuthenticationInfo{}, err
230 }
231 ret.RequestHeaderCA, err = combineCertLists(lhs.RequestHeaderCA, rhs.RequestHeaderCA)
232 if err != nil {
233 return ClusterAuthenticationInfo{}, err
234 }
235
236 return ret, nil
237 }
238
239 func getConfigMapDataFor(authenticationInfo ClusterAuthenticationInfo) (map[string]string, error) {
240 data := map[string]string{}
241 if authenticationInfo.ClientCA != nil {
242 if caBytes := authenticationInfo.ClientCA.CurrentCABundleContent(); len(caBytes) > 0 {
243 data["client-ca-file"] = string(caBytes)
244 }
245 }
246
247 if authenticationInfo.RequestHeaderCA == nil {
248 return data, nil
249 }
250
251 if caBytes := authenticationInfo.RequestHeaderCA.CurrentCABundleContent(); len(caBytes) > 0 {
252 var err error
253
254
255 data["requestheader-username-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderUsernameHeaders.Value())
256 if err != nil {
257 return nil, err
258 }
259 data["requestheader-group-headers"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderGroupHeaders.Value())
260 if err != nil {
261 return nil, err
262 }
263 data["requestheader-extra-headers-prefix"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderExtraHeaderPrefixes.Value())
264 if err != nil {
265 return nil, err
266 }
267
268 data["requestheader-client-ca-file"] = string(caBytes)
269 data["requestheader-allowed-names"], err = jsonSerializeStringSlice(authenticationInfo.RequestHeaderAllowedNames.Value())
270 if err != nil {
271 return nil, err
272 }
273 }
274
275 return data, nil
276 }
277
278 func getClusterAuthenticationInfoFor(data map[string]string) (ClusterAuthenticationInfo, error) {
279 ret := ClusterAuthenticationInfo{}
280
281 var err error
282 ret.RequestHeaderGroupHeaders, err = jsonDeserializeStringSlice(data["requestheader-group-headers"])
283 if err != nil {
284 return ClusterAuthenticationInfo{}, err
285 }
286 ret.RequestHeaderExtraHeaderPrefixes, err = jsonDeserializeStringSlice(data["requestheader-extra-headers-prefix"])
287 if err != nil {
288 return ClusterAuthenticationInfo{}, err
289 }
290 ret.RequestHeaderAllowedNames, err = jsonDeserializeStringSlice(data["requestheader-allowed-names"])
291 if err != nil {
292 return ClusterAuthenticationInfo{}, err
293 }
294 ret.RequestHeaderUsernameHeaders, err = jsonDeserializeStringSlice(data["requestheader-username-headers"])
295 if err != nil {
296 return ClusterAuthenticationInfo{}, err
297 }
298
299 if caBundle := data["requestheader-client-ca-file"]; len(caBundle) > 0 {
300 ret.RequestHeaderCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
301 if err != nil {
302 return ClusterAuthenticationInfo{}, err
303 }
304 }
305
306 if caBundle := data["client-ca-file"]; len(caBundle) > 0 {
307 ret.ClientCA, err = dynamiccertificates.NewStaticCAContent("existing", []byte(caBundle))
308 if err != nil {
309 return ClusterAuthenticationInfo{}, err
310 }
311 }
312
313 return ret, nil
314 }
315
316 func jsonSerializeStringSlice(in []string) (string, error) {
317 out, err := json.Marshal(in)
318 if err != nil {
319 return "", err
320 }
321 return string(out), err
322 }
323
324 func jsonDeserializeStringSlice(in string) (headerrequest.StringSliceProvider, error) {
325 if len(in) == 0 {
326 return nil, nil
327 }
328
329 out := []string{}
330 if err := json.Unmarshal([]byte(in), &out); err != nil {
331 return nil, err
332 }
333 return headerrequest.StaticStringSlice(out), nil
334 }
335
336 func combineUniqueStringSlices(lhs, rhs headerrequest.StringSliceProvider) headerrequest.StringSliceProvider {
337 ret := []string{}
338 present := sets.String{}
339
340 if lhs != nil {
341 for _, curr := range lhs.Value() {
342 if present.Has(curr) {
343 continue
344 }
345 ret = append(ret, curr)
346 present.Insert(curr)
347 }
348 }
349
350 if rhs != nil {
351 for _, curr := range rhs.Value() {
352 if present.Has(curr) {
353 continue
354 }
355 ret = append(ret, curr)
356 present.Insert(curr)
357 }
358 }
359
360 return headerrequest.StaticStringSlice(ret)
361 }
362
363 func combineCertLists(lhs, rhs dynamiccertificates.CAContentProvider) (dynamiccertificates.CAContentProvider, error) {
364 certificates := []*x509.Certificate{}
365
366 if lhs != nil {
367 lhsCABytes := lhs.CurrentCABundleContent()
368 lhsCAs, err := cert.ParseCertsPEM(lhsCABytes)
369 if err != nil {
370 return nil, err
371 }
372 certificates = append(certificates, lhsCAs...)
373 }
374 if rhs != nil {
375 rhsCABytes := rhs.CurrentCABundleContent()
376 rhsCAs, err := cert.ParseCertsPEM(rhsCABytes)
377 if err != nil {
378 return nil, err
379 }
380 certificates = append(certificates, rhsCAs...)
381 }
382
383 certificates = filterExpiredCerts(certificates...)
384
385 finalCertificates := []*x509.Certificate{}
386
387 for i := range certificates {
388 found := false
389 for j := range finalCertificates {
390 if reflect.DeepEqual(certificates[i].Raw, finalCertificates[j].Raw) {
391 found = true
392 break
393 }
394 }
395 if !found {
396 finalCertificates = append(finalCertificates, certificates[i])
397 }
398 }
399
400 finalCABytes, err := encodeCertificates(finalCertificates...)
401 if err != nil {
402 return nil, err
403 }
404
405 if len(finalCABytes) == 0 {
406 return nil, nil
407 }
408
409
410 return dynamiccertificates.NewStaticCAContent("combined", finalCABytes)
411 }
412
413
414
415
416 func filterExpiredCerts(certs ...*x509.Certificate) []*x509.Certificate {
417 fiveMinutesAgo := time.Now().Add(-5 * time.Minute)
418
419 var validCerts []*x509.Certificate
420 for _, c := range certs {
421 if c.NotAfter.After(fiveMinutesAgo) {
422 validCerts = append(validCerts, c)
423 }
424 }
425
426 return validCerts
427 }
428
429
430 func (c *Controller) Enqueue() {
431 c.queue.Add(keyFn())
432 }
433
434
435 func (c *Controller) Run(ctx context.Context, workers int) {
436 defer utilruntime.HandleCrash()
437
438 defer c.queue.ShutDown()
439
440 klog.Infof("Starting cluster_authentication_trust_controller controller")
441 defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
442
443
444 go c.kubeSystemConfigMapInformer.Run(ctx.Done())
445
446
447 if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) {
448 return
449 }
450
451
452 go wait.Until(c.runWorker, time.Second, ctx.Done())
453
454
455
456 _ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
457 c.queue.Add(keyFn())
458 return false, nil
459 }, ctx.Done())
460
461
462 <-ctx.Done()
463 }
464
465 func (c *Controller) runWorker() {
466
467
468 for c.processNextWorkItem() {
469 }
470 }
471
472
473 func (c *Controller) processNextWorkItem() bool {
474
475 key, quit := c.queue.Get()
476 if quit {
477 return false
478 }
479
480 defer c.queue.Done(key)
481
482
483 err := c.syncConfigMap()
484 if err == nil {
485
486
487 c.queue.Forget(key)
488 return true
489 }
490
491
492
493 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
494
495
496
497
498 c.queue.AddRateLimited(key)
499
500 return true
501 }
502
503 func keyFn() string {
504
505 return configMapNamespace + "/" + configMapName
506 }
507
508 func encodeCertificates(certs ...*x509.Certificate) ([]byte, error) {
509 b := bytes.Buffer{}
510 for _, cert := range certs {
511 if err := pem.Encode(&b, &pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw}); err != nil {
512 return []byte{}, err
513 }
514 }
515 return b.Bytes(), nil
516 }
517
View as plain text