1
16
17 package clusterroleaggregation
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23 "time"
24
25 rbacv1ac "k8s.io/client-go/applyconfigurations/rbac/v1"
26 "k8s.io/klog/v2"
27
28 rbacv1 "k8s.io/api/rbac/v1"
29 "k8s.io/apimachinery/pkg/api/equality"
30 "k8s.io/apimachinery/pkg/api/errors"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/labels"
33 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34 "k8s.io/apimachinery/pkg/util/wait"
35 rbacinformers "k8s.io/client-go/informers/rbac/v1"
36 rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1"
37 rbaclisters "k8s.io/client-go/listers/rbac/v1"
38 "k8s.io/client-go/tools/cache"
39 "k8s.io/client-go/util/workqueue"
40
41 "k8s.io/kubernetes/pkg/controller"
42 )
43
44
45 type ClusterRoleAggregationController struct {
46 clusterRoleClient rbacclient.ClusterRolesGetter
47 clusterRoleLister rbaclisters.ClusterRoleLister
48 clusterRolesSynced cache.InformerSynced
49
50 syncHandler func(ctx context.Context, key string) error
51 queue workqueue.RateLimitingInterface
52 }
53
54
55 func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInformer, clusterRoleClient rbacclient.ClusterRolesGetter) *ClusterRoleAggregationController {
56 c := &ClusterRoleAggregationController{
57 clusterRoleClient: clusterRoleClient,
58 clusterRoleLister: clusterRoleInformer.Lister(),
59 clusterRolesSynced: clusterRoleInformer.Informer().HasSynced,
60
61 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"),
62 }
63 c.syncHandler = c.syncClusterRole
64
65 clusterRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
66 AddFunc: func(obj interface{}) {
67 c.enqueue()
68 },
69 UpdateFunc: func(old, cur interface{}) {
70 c.enqueue()
71 },
72 DeleteFunc: func(uncast interface{}) {
73 c.enqueue()
74 },
75 })
76 return c
77 }
78
79 func (c *ClusterRoleAggregationController) syncClusterRole(ctx context.Context, key string) error {
80 _, name, err := cache.SplitMetaNamespaceKey(key)
81 if err != nil {
82 return err
83 }
84 sharedClusterRole, err := c.clusterRoleLister.Get(name)
85 if errors.IsNotFound(err) {
86 return nil
87 }
88 if err != nil {
89 return err
90 }
91 if sharedClusterRole.AggregationRule == nil {
92 return nil
93 }
94
95 newPolicyRules := []rbacv1.PolicyRule{}
96 for i := range sharedClusterRole.AggregationRule.ClusterRoleSelectors {
97 selector := sharedClusterRole.AggregationRule.ClusterRoleSelectors[i]
98 runtimeLabelSelector, err := metav1.LabelSelectorAsSelector(&selector)
99 if err != nil {
100 return err
101 }
102 clusterRoles, err := c.clusterRoleLister.List(runtimeLabelSelector)
103 if err != nil {
104 return err
105 }
106 sort.Sort(byName(clusterRoles))
107
108 for i := range clusterRoles {
109 if clusterRoles[i].Name == sharedClusterRole.Name {
110 continue
111 }
112
113 for j := range clusterRoles[i].Rules {
114 currRule := clusterRoles[i].Rules[j]
115 if !ruleExists(newPolicyRules, currRule) {
116 newPolicyRules = append(newPolicyRules, currRule)
117 }
118 }
119 }
120 }
121
122 if equality.Semantic.DeepEqual(newPolicyRules, sharedClusterRole.Rules) {
123 return nil
124 }
125
126 err = c.applyClusterRoles(ctx, sharedClusterRole.Name, newPolicyRules)
127 if errors.IsUnsupportedMediaType(err) {
128
129
130
131
132 err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
133 }
134 return err
135 }
136
137 func (c *ClusterRoleAggregationController) applyClusterRoles(ctx context.Context, name string, newPolicyRules []rbacv1.PolicyRule) error {
138 clusterRoleApply := rbacv1ac.ClusterRole(name).
139 WithRules(toApplyPolicyRules(newPolicyRules)...)
140
141 opts := metav1.ApplyOptions{FieldManager: "clusterrole-aggregation-controller", Force: true}
142 _, err := c.clusterRoleClient.ClusterRoles().Apply(ctx, clusterRoleApply, opts)
143 return err
144 }
145
146 func (c *ClusterRoleAggregationController) updateClusterRoles(ctx context.Context, sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
147 clusterRole := sharedClusterRole.DeepCopy()
148 clusterRole.Rules = nil
149 for _, rule := range newPolicyRules {
150 clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
151 }
152 _, err := c.clusterRoleClient.ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{})
153 return err
154 }
155
156 func toApplyPolicyRules(rules []rbacv1.PolicyRule) []*rbacv1ac.PolicyRuleApplyConfiguration {
157 var result []*rbacv1ac.PolicyRuleApplyConfiguration
158 for _, rule := range rules {
159 result = append(result, toApplyPolicyRule(rule))
160 }
161 return result
162 }
163
164 func toApplyPolicyRule(rule rbacv1.PolicyRule) *rbacv1ac.PolicyRuleApplyConfiguration {
165 result := rbacv1ac.PolicyRule()
166 result.Resources = rule.Resources
167 result.ResourceNames = rule.ResourceNames
168 result.APIGroups = rule.APIGroups
169 result.NonResourceURLs = rule.NonResourceURLs
170 result.Verbs = rule.Verbs
171 return result
172 }
173
174 func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
175 for _, curr := range haystack {
176 if equality.Semantic.DeepEqual(curr, needle) {
177 return true
178 }
179 }
180 return false
181 }
182
183
184 func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) {
185 defer utilruntime.HandleCrash()
186 defer c.queue.ShutDown()
187
188 logger := klog.FromContext(ctx)
189 logger.Info("Starting ClusterRoleAggregator controller")
190 defer logger.Info("Shutting down ClusterRoleAggregator controller")
191
192 if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", ctx.Done(), c.clusterRolesSynced) {
193 return
194 }
195
196 for i := 0; i < workers; i++ {
197 go wait.UntilWithContext(ctx, c.runWorker, time.Second)
198 }
199
200 <-ctx.Done()
201 }
202
203 func (c *ClusterRoleAggregationController) runWorker(ctx context.Context) {
204 for c.processNextWorkItem(ctx) {
205 }
206 }
207
208 func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Context) bool {
209 dsKey, quit := c.queue.Get()
210 if quit {
211 return false
212 }
213 defer c.queue.Done(dsKey)
214
215 err := c.syncHandler(ctx, dsKey.(string))
216 if err == nil {
217 c.queue.Forget(dsKey)
218 return true
219 }
220
221 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
222 c.queue.AddRateLimited(dsKey)
223
224 return true
225 }
226
227 func (c *ClusterRoleAggregationController) enqueue() {
228
229
230
231 allClusterRoles, err := c.clusterRoleLister.List(labels.Everything())
232 if err != nil {
233 utilruntime.HandleError(fmt.Errorf("Couldn't list all objects %v", err))
234 return
235 }
236 for _, clusterRole := range allClusterRoles {
237
238 if clusterRole.AggregationRule == nil {
239 continue
240 }
241 key, err := controller.KeyFunc(clusterRole)
242 if err != nil {
243 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", clusterRole, err))
244 return
245 }
246 c.queue.Add(key)
247 }
248 }
249
250 type byName []*rbacv1.ClusterRole
251
252 func (a byName) Len() int { return len(a) }
253 func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
254 func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
255
View as plain text