...

Source file src/k8s.io/kubernetes/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/clusterroleaggregation

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package clusterroleaggregation
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"time"
    24  
    25  	rbacv1ac "k8s.io/client-go/applyconfigurations/rbac/v1"
    26  	"k8s.io/klog/v2"
    27  
    28  	rbacv1 "k8s.io/api/rbac/v1"
    29  	"k8s.io/apimachinery/pkg/api/equality"
    30  	"k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	rbacinformers "k8s.io/client-go/informers/rbac/v1"
    36  	rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
    37  	rbaclisters "k8s.io/client-go/listers/rbac/v1"
    38  	"k8s.io/client-go/tools/cache"
    39  	"k8s.io/client-go/util/workqueue"
    40  
    41  	"k8s.io/kubernetes/pkg/controller"
    42  )
    43  
    44  // ClusterRoleAggregationController is a controller to combine cluster roles
    45  type ClusterRoleAggregationController struct {
    46  	clusterRoleClient  rbacclient.ClusterRolesGetter
    47  	clusterRoleLister  rbaclisters.ClusterRoleLister
    48  	clusterRolesSynced cache.InformerSynced
    49  
    50  	syncHandler func(ctx context.Context, key string) error
    51  	queue       workqueue.RateLimitingInterface
    52  }
    53  
    54  // NewClusterRoleAggregation creates a new controller
    55  func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInformer, clusterRoleClient rbacclient.ClusterRolesGetter) *ClusterRoleAggregationController {
    56  	c := &ClusterRoleAggregationController{
    57  		clusterRoleClient:  clusterRoleClient,
    58  		clusterRoleLister:  clusterRoleInformer.Lister(),
    59  		clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,
    60  
    61  		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
    62  	}
    63  	c.syncHandler = c.syncClusterRole
    64  
    65  	clusterRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    66  		AddFunc: func(obj interface{}) {
    67  			c.enqueue()
    68  		},
    69  		UpdateFunc: func(old, cur interface{}) {
    70  			c.enqueue()
    71  		},
    72  		DeleteFunc: func(uncast interface{}) {
    73  			c.enqueue()
    74  		},
    75  	})
    76  	return c
    77  }
    78  
    79  func (c *ClusterRoleAggregationController) syncClusterRole(ctx context.Context, key string) error {
    80  	_, name, err := cache.SplitMetaNamespaceKey(key)
    81  	if err != nil {
    82  		return err
    83  	}
    84  	sharedClusterRole, err := c.clusterRoleLister.Get(name)
    85  	if errors.IsNotFound(err) {
    86  		return nil
    87  	}
    88  	if err != nil {
    89  		return err
    90  	}
    91  	if sharedClusterRole.AggregationRule == nil {
    92  		return nil
    93  	}
    94  
    95  	newPolicyRules := []rbacv1.PolicyRule{}
    96  	for i := range sharedClusterRole.AggregationRule.ClusterRoleSelectors {
    97  		selector := sharedClusterRole.AggregationRule.ClusterRoleSelectors[i]
    98  		runtimeLabelSelector, err := metav1.LabelSelectorAsSelector(&selector)
    99  		if err != nil {
   100  			return err
   101  		}
   102  		clusterRoles, err := c.clusterRoleLister.List(runtimeLabelSelector)
   103  		if err != nil {
   104  			return err
   105  		}
   106  		sort.Sort(byName(clusterRoles))
   107  
   108  		for i := range clusterRoles {
   109  			if clusterRoles[i].Name == sharedClusterRole.Name {
   110  				continue
   111  			}
   112  
   113  			for j := range clusterRoles[i].Rules {
   114  				currRule := clusterRoles[i].Rules[j]
   115  				if !ruleExists(newPolicyRules, currRule) {
   116  					newPolicyRules = append(newPolicyRules, currRule)
   117  				}
   118  			}
   119  		}
   120  	}
   121  
   122  	if equality.Semantic.DeepEqual(newPolicyRules, sharedClusterRole.Rules) {
   123  		return nil
   124  	}
   125  
   126  	err = c.applyClusterRoles(ctx, sharedClusterRole.Name, newPolicyRules)
   127  	if errors.IsUnsupportedMediaType(err) { // TODO: Remove this fallback at least one release after ServerSideApply GA
   128  		// When Server Side Apply is not enabled, fallback to Update. This is required when running
   129  		// 1.21 since api-server can be 1.20 during the upgrade/downgrade.
   130  		// Since Server Side Apply is enabled by default in Beta, this fallback only kicks in
   131  		// if the feature has been disabled using its feature flag.
   132  		err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
   133  	}
   134  	return err
   135  }
   136  
   137  func (c *ClusterRoleAggregationController) applyClusterRoles(ctx context.Context, name string, newPolicyRules []rbacv1.PolicyRule) error {
   138  	clusterRoleApply := rbacv1ac.ClusterRole(name).
   139  		WithRules(toApplyPolicyRules(newPolicyRules)...)
   140  
   141  	opts := metav1.ApplyOptions{FieldManager: "clusterrole-aggregation-controller", Force: true}
   142  	_, err := c.clusterRoleClient.ClusterRoles().Apply(ctx, clusterRoleApply, opts)
   143  	return err
   144  }
   145  
   146  func (c *ClusterRoleAggregationController) updateClusterRoles(ctx context.Context, sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
   147  	clusterRole := sharedClusterRole.DeepCopy()
   148  	clusterRole.Rules = nil
   149  	for _, rule := range newPolicyRules {
   150  		clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
   151  	}
   152  	_, err := c.clusterRoleClient.ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{})
   153  	return err
   154  }
   155  
   156  func toApplyPolicyRules(rules []rbacv1.PolicyRule) []*rbacv1ac.PolicyRuleApplyConfiguration {
   157  	var result []*rbacv1ac.PolicyRuleApplyConfiguration
   158  	for _, rule := range rules {
   159  		result = append(result, toApplyPolicyRule(rule))
   160  	}
   161  	return result
   162  }
   163  
   164  func toApplyPolicyRule(rule rbacv1.PolicyRule) *rbacv1ac.PolicyRuleApplyConfiguration {
   165  	result := rbacv1ac.PolicyRule()
   166  	result.Resources = rule.Resources
   167  	result.ResourceNames = rule.ResourceNames
   168  	result.APIGroups = rule.APIGroups
   169  	result.NonResourceURLs = rule.NonResourceURLs
   170  	result.Verbs = rule.Verbs
   171  	return result
   172  }
   173  
   174  func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
   175  	for _, curr := range haystack {
   176  		if equality.Semantic.DeepEqual(curr, needle) {
   177  			return true
   178  		}
   179  	}
   180  	return false
   181  }
   182  
   183  // Run starts the controller and blocks until stopCh is closed.
   184  func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) {
   185  	defer utilruntime.HandleCrash()
   186  	defer c.queue.ShutDown()
   187  
   188  	logger := klog.FromContext(ctx)
   189  	logger.Info("Starting ClusterRoleAggregator controller")
   190  	defer logger.Info("Shutting down ClusterRoleAggregator controller")
   191  
   192  	if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", ctx.Done(), c.clusterRolesSynced) {
   193  		return
   194  	}
   195  
   196  	for i := 0; i < workers; i++ {
   197  		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
   198  	}
   199  
   200  	<-ctx.Done()
   201  }
   202  
   203  func (c *ClusterRoleAggregationController) runWorker(ctx context.Context) {
   204  	for c.processNextWorkItem(ctx) {
   205  	}
   206  }
   207  
   208  func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Context) bool {
   209  	dsKey, quit := c.queue.Get()
   210  	if quit {
   211  		return false
   212  	}
   213  	defer c.queue.Done(dsKey)
   214  
   215  	err := c.syncHandler(ctx, dsKey.(string))
   216  	if err == nil {
   217  		c.queue.Forget(dsKey)
   218  		return true
   219  	}
   220  
   221  	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
   222  	c.queue.AddRateLimited(dsKey)
   223  
   224  	return true
   225  }
   226  
   227  func (c *ClusterRoleAggregationController) enqueue() {
   228  	// this is unusual, but since the set of all clusterroles is small and we don't know the dependency
   229  	// graph, just queue up every thing each time.  This allows errors to be selectively retried if there
   230  	// is a problem updating a single role
   231  	allClusterRoles, err := c.clusterRoleLister.List(labels.Everything())
   232  	if err != nil {
   233  		utilruntime.HandleError(fmt.Errorf("Couldn't list all objects %v", err))
   234  		return
   235  	}
   236  	for _, clusterRole := range allClusterRoles {
   237  		// only queue ones that we may need to aggregate
   238  		if clusterRole.AggregationRule == nil {
   239  			continue
   240  		}
   241  		key, err := controller.KeyFunc(clusterRole)
   242  		if err != nil {
   243  			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", clusterRole, err))
   244  			return
   245  		}
   246  		c.queue.Add(key)
   247  	}
   248  }
   249  
   250  type byName []*rbacv1.ClusterRole
   251  
   252  func (a byName) Len() int           { return len(a) }
   253  func (a byName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
   254  func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
   255  

View as plain text