1
16
17
18
19 package certificates
20
21 import (
22 "context"
23 "fmt"
24 "time"
25
26 "golang.org/x/time/rate"
27
28 certificates "k8s.io/api/certificates/v1"
29 "k8s.io/apimachinery/pkg/api/errors"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 certificatesinformers "k8s.io/client-go/informers/certificates/v1"
33 clientset "k8s.io/client-go/kubernetes"
34 certificateslisters "k8s.io/client-go/listers/certificates/v1"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/client-go/util/workqueue"
37 "k8s.io/klog/v2"
38 "k8s.io/kubernetes/pkg/controller"
39 )
40
41 type CertificateController struct {
42
43 name string
44
45 kubeClient clientset.Interface
46
47 csrLister certificateslisters.CertificateSigningRequestLister
48 csrsSynced cache.InformerSynced
49
50 handler func(context.Context, *certificates.CertificateSigningRequest) error
51
52 queue workqueue.RateLimitingInterface
53 }
54
55 func NewCertificateController(
56 ctx context.Context,
57 name string,
58 kubeClient clientset.Interface,
59 csrInformer certificatesinformers.CertificateSigningRequestInformer,
60 handler func(context.Context, *certificates.CertificateSigningRequest) error,
61 ) *CertificateController {
62 logger := klog.FromContext(ctx)
63 cc := &CertificateController{
64 name: name,
65 kubeClient: kubeClient,
66 queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
67 workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
68
69 &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
70 ), "certificate"),
71 handler: handler,
72 }
73
74
75 csrInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
76 AddFunc: func(obj interface{}) {
77 csr := obj.(*certificates.CertificateSigningRequest)
78 logger.V(4).Info("Adding certificate request", "csr", csr.Name)
79 cc.enqueueCertificateRequest(obj)
80 },
81 UpdateFunc: func(old, new interface{}) {
82 oldCSR := old.(*certificates.CertificateSigningRequest)
83 logger.V(4).Info("Updating certificate request", "old", oldCSR.Name)
84 cc.enqueueCertificateRequest(new)
85 },
86 DeleteFunc: func(obj interface{}) {
87 csr, ok := obj.(*certificates.CertificateSigningRequest)
88 if !ok {
89 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
90 if !ok {
91 logger.V(2).Info("Couldn't get object from tombstone", "object", obj)
92 return
93 }
94 csr, ok = tombstone.Obj.(*certificates.CertificateSigningRequest)
95 if !ok {
96 logger.V(2).Info("Tombstone contained object that is not a CSR", "object", obj)
97 return
98 }
99 }
100 logger.V(4).Info("Deleting certificate request", "csr", csr.Name)
101 cc.enqueueCertificateRequest(obj)
102 },
103 })
104 cc.csrLister = csrInformer.Lister()
105 cc.csrsSynced = csrInformer.Informer().HasSynced
106 return cc
107 }
108
109
110 func (cc *CertificateController) Run(ctx context.Context, workers int) {
111 defer utilruntime.HandleCrash()
112 defer cc.queue.ShutDown()
113
114 logger := klog.FromContext(ctx)
115 logger.Info("Starting certificate controller", "name", cc.name)
116 defer logger.Info("Shutting down certificate controller", "name", cc.name)
117
118 if !cache.WaitForNamedCacheSync(fmt.Sprintf("certificate-%s", cc.name), ctx.Done(), cc.csrsSynced) {
119 return
120 }
121
122 for i := 0; i < workers; i++ {
123 go wait.UntilWithContext(ctx, cc.worker, time.Second)
124 }
125
126 <-ctx.Done()
127 }
128
129
130 func (cc *CertificateController) worker(ctx context.Context) {
131 for cc.processNextWorkItem(ctx) {
132 }
133 }
134
135
136 func (cc *CertificateController) processNextWorkItem(ctx context.Context) bool {
137 cKey, quit := cc.queue.Get()
138 if quit {
139 return false
140 }
141 defer cc.queue.Done(cKey)
142
143 if err := cc.syncFunc(ctx, cKey.(string)); err != nil {
144 cc.queue.AddRateLimited(cKey)
145 if _, ignorable := err.(ignorableError); !ignorable {
146 utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
147 } else {
148 klog.FromContext(ctx).V(4).Info("Sync certificate request failed", "csr", cKey, "err", err)
149 }
150 return true
151 }
152
153 cc.queue.Forget(cKey)
154 return true
155
156 }
157
158 func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
159 key, err := controller.KeyFunc(obj)
160 if err != nil {
161 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
162 return
163 }
164 cc.queue.Add(key)
165 }
166
167 func (cc *CertificateController) syncFunc(ctx context.Context, key string) error {
168 logger := klog.FromContext(ctx)
169 startTime := time.Now()
170 defer func() {
171 logger.V(4).Info("Finished syncing certificate request", "csr", key, "elapsedTime", time.Since(startTime))
172 }()
173 csr, err := cc.csrLister.Get(key)
174 if errors.IsNotFound(err) {
175 logger.V(3).Info("csr has been deleted", "csr", key)
176 return nil
177 }
178 if err != nil {
179 return err
180 }
181
182 if len(csr.Status.Certificate) > 0 {
183
184 return nil
185 }
186
187
188 csr = csr.DeepCopy()
189 return cc.handler(ctx, csr)
190 }
191
192
193
194
195
196 func IgnorableError(s string, args ...interface{}) ignorableError {
197 return ignorableError(fmt.Sprintf(s, args...))
198 }
199
200 type ignorableError string
201
202 func (e ignorableError) Error() string {
203 return string(e)
204 }
205
View as plain text