...
1
16
17 package validatingadmissionpolicystatus
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "k8s.io/api/admissionregistration/v1"
25 kerrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/apimachinery/pkg/util/wait"
29 validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
30 admissionregistrationv1apply "k8s.io/client-go/applyconfigurations/admissionregistration/v1"
31 informerv1 "k8s.io/client-go/informers/admissionregistration/v1"
32 admissionregistrationv1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/client-go/util/workqueue"
35 )
36
37
38 const ControllerName = "validatingadmissionpolicy-status"
39
40
41
42 type Controller struct {
43 policyInformer informerv1.ValidatingAdmissionPolicyInformer
44 policyQueue workqueue.RateLimitingInterface
45 policySynced cache.InformerSynced
46 policyClient admissionregistrationv1.ValidatingAdmissionPolicyInterface
47
48
49
50
51 typeChecker *validatingadmissionpolicy.TypeChecker
52 }
53
54 func (c *Controller) Run(ctx context.Context, workers int) {
55 defer utilruntime.HandleCrash()
56
57 if !cache.WaitForNamedCacheSync(ControllerName, ctx.Done(), c.policySynced) {
58 return
59 }
60
61 defer c.policyQueue.ShutDown()
62 for i := 0; i < workers; i++ {
63 go wait.UntilWithContext(ctx, c.runWorker, time.Second)
64 }
65
66 <-ctx.Done()
67 }
68
69 func NewController(policyInformer informerv1.ValidatingAdmissionPolicyInformer, policyClient admissionregistrationv1.ValidatingAdmissionPolicyInterface, typeChecker *validatingadmissionpolicy.TypeChecker) (*Controller, error) {
70 c := &Controller{
71 policyInformer: policyInformer,
72 policyQueue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: ControllerName}),
73 policyClient: policyClient,
74 typeChecker: typeChecker,
75 }
76 reg, err := policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
77 AddFunc: func(obj interface{}) {
78 c.enqueuePolicy(obj)
79 },
80 UpdateFunc: func(oldObj, newObj interface{}) {
81 c.enqueuePolicy(newObj)
82 },
83 })
84 if err != nil {
85 return nil, err
86 }
87 c.policySynced = reg.HasSynced
88 return c, nil
89 }
90
91 func (c *Controller) enqueuePolicy(policy any) {
92 if policy, ok := policy.(*v1.ValidatingAdmissionPolicy); ok {
93
94 key := policy.ObjectMeta.Name
95 if key == "" {
96 utilruntime.HandleError(fmt.Errorf("cannot get name of object %v", policy))
97 }
98 c.policyQueue.Add(key)
99 }
100 }
101
102 func (c *Controller) runWorker(ctx context.Context) {
103 for c.processNextWorkItem(ctx) {
104 }
105 }
106
107 func (c *Controller) processNextWorkItem(ctx context.Context) bool {
108 key, shutdown := c.policyQueue.Get()
109 if shutdown {
110 return false
111 }
112 defer c.policyQueue.Done(key)
113
114 err := func() error {
115 key, ok := key.(string)
116 if !ok {
117 return fmt.Errorf("expect a string but got %v", key)
118 }
119 policy, err := c.policyInformer.Lister().Get(key)
120 if err != nil {
121 if kerrors.IsNotFound(err) {
122
123 return nil
124 }
125 return err
126 }
127 return c.reconcile(ctx, policy)
128 }()
129
130 if err == nil {
131 c.policyQueue.Forget(key)
132 return true
133 }
134
135 utilruntime.HandleError(err)
136 c.policyQueue.AddRateLimited(key)
137
138 return true
139 }
140
141 func (c *Controller) reconcile(ctx context.Context, policy *v1.ValidatingAdmissionPolicy) error {
142 if policy == nil {
143 return nil
144 }
145 if policy.Generation <= policy.Status.ObservedGeneration {
146 return nil
147 }
148 warnings := c.typeChecker.Check(policy)
149 warningsConfig := make([]*admissionregistrationv1apply.ExpressionWarningApplyConfiguration, 0, len(warnings))
150 for _, warning := range warnings {
151 warningsConfig = append(warningsConfig, admissionregistrationv1apply.ExpressionWarning().
152 WithFieldRef(warning.FieldRef).
153 WithWarning(warning.Warning))
154 }
155 applyConfig := admissionregistrationv1apply.ValidatingAdmissionPolicy(policy.Name).
156 WithStatus(admissionregistrationv1apply.ValidatingAdmissionPolicyStatus().
157 WithObservedGeneration(policy.Generation).
158 WithTypeChecking(admissionregistrationv1apply.TypeChecking().
159 WithExpressionWarnings(warningsConfig...)))
160 _, err := c.policyClient.ApplyStatus(ctx, applyConfig, metav1.ApplyOptions{FieldManager: ControllerName, Force: true})
161 return err
162 }
163
View as plain text