1
16
17 package rootcacertpublisher
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/apimachinery/pkg/util/wait"
30 coreinformers "k8s.io/client-go/informers/core/v1"
31 clientset "k8s.io/client-go/kubernetes"
32 corelisters "k8s.io/client-go/listers/core/v1"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/client-go/util/workqueue"
35 "k8s.io/klog/v2"
36 )
37
38
39
// Name and annotation metadata of the ConfigMap this controller writes.
const (
	// RootCACertConfigMapName is the name of the ConfigMap that the publisher
	// creates and keeps up to date in a namespace.
	RootCACertConfigMapName = "kube-root-ca.crt"

	// DescriptionAnnotation is the annotation key under which Description is
	// stored on the published ConfigMap.
	DescriptionAnnotation = "kubernetes.io/description"

	// Description is the annotation value written under DescriptionAnnotation.
	Description = "Contains a CA bundle that can be used to verify the kube-apiserver " +
		"when using internal endpoints such as the internal service IP or kubernetes.default.svc. " +
		"No other usage is guaranteed across distributions of Kubernetes clusters."
)
46
// init calls registerMetrics (defined elsewhere in this package) once when the
// package is initialized.
func init() {
	registerMetrics()
}
50
51
52
53
54 func NewPublisher(cmInformer coreinformers.ConfigMapInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, rootCA []byte) (*Publisher, error) {
55 e := &Publisher{
56 client: cl,
57 rootCA: rootCA,
58 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "root_ca_cert_publisher"),
59 }
60
61 cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
62 DeleteFunc: e.configMapDeleted,
63 UpdateFunc: e.configMapUpdated,
64 })
65 e.cmLister = cmInformer.Lister()
66 e.cmListerSynced = cmInformer.Informer().HasSynced
67
68 nsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
69 AddFunc: e.namespaceAdded,
70 UpdateFunc: e.namespaceUpdated,
71 })
72 e.nsListerSynced = nsInformer.Informer().HasSynced
73
74 e.syncHandler = e.syncNamespace
75
76 return e, nil
77
78 }
79
80
// Publisher is a controller that keeps the RootCACertConfigMapName ConfigMap
// in a namespace populated with the configured root CA bundle.
type Publisher struct {
	// client is used to create and update the ConfigMap.
	client clientset.Interface
	// rootCA is the bundle written under the "ca.crt" key of the ConfigMap.
	rootCA []byte

	// syncHandler is invoked by the worker loop with a queue key (a namespace
	// name). NewPublisher sets it to syncNamespace.
	// NOTE(review): presumably a field so it can be swapped out in tests — confirm.
	syncHandler func(ctx context.Context, key string) error

	// cmLister reads ConfigMaps from the informer cache.
	cmLister corelisters.ConfigMapLister
	// cmListerSynced reports whether the ConfigMap informer has synced; Run
	// waits on it before starting workers.
	cmListerSynced cache.InformerSynced

	// nsListerSynced reports whether the Namespace informer has synced.
	// NOTE(review): it is assigned in NewPublisher but not read anywhere in
	// the visible code.
	nsListerSynced cache.InformerSynced

	// queue holds the names of namespaces waiting to be synced.
	queue workqueue.RateLimitingInterface
}
95
96
97 func (c *Publisher) Run(ctx context.Context, workers int) {
98 defer utilruntime.HandleCrash()
99 defer c.queue.ShutDown()
100
101 logger := klog.FromContext(ctx)
102 logger.Info("Starting root CA cert publisher controller")
103 defer logger.Info("Shutting down root CA cert publisher controller")
104
105 if !cache.WaitForNamedCacheSync("crt configmap", ctx.Done(), c.cmListerSynced) {
106 return
107 }
108
109 for i := 0; i < workers; i++ {
110 go wait.UntilWithContext(ctx, c.runWorker, time.Second)
111 }
112
113 <-ctx.Done()
114 }
115
116 func (c *Publisher) configMapDeleted(obj interface{}) {
117 cm, err := convertToCM(obj)
118 if err != nil {
119 utilruntime.HandleError(err)
120 return
121 }
122 if cm.Name != RootCACertConfigMapName {
123 return
124 }
125 c.queue.Add(cm.Namespace)
126 }
127
128 func (c *Publisher) configMapUpdated(_, newObj interface{}) {
129 cm, err := convertToCM(newObj)
130 if err != nil {
131 utilruntime.HandleError(err)
132 return
133 }
134 if cm.Name != RootCACertConfigMapName {
135 return
136 }
137 c.queue.Add(cm.Namespace)
138 }
139
140 func (c *Publisher) namespaceAdded(obj interface{}) {
141 namespace := obj.(*v1.Namespace)
142 c.queue.Add(namespace.Name)
143 }
144
145 func (c *Publisher) namespaceUpdated(oldObj interface{}, newObj interface{}) {
146 newNamespace := newObj.(*v1.Namespace)
147 if newNamespace.Status.Phase != v1.NamespaceActive {
148 return
149 }
150 c.queue.Add(newNamespace.Name)
151 }
152
153 func (c *Publisher) runWorker(ctx context.Context) {
154 for c.processNextWorkItem(ctx) {
155 }
156 }
157
158
159
160 func (c *Publisher) processNextWorkItem(ctx context.Context) bool {
161 key, quit := c.queue.Get()
162 if quit {
163 return false
164 }
165 defer c.queue.Done(key)
166
167 if err := c.syncHandler(ctx, key.(string)); err != nil {
168 utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
169 c.queue.AddRateLimited(key)
170 return true
171 }
172
173 c.queue.Forget(key)
174 return true
175 }
176
177 func (c *Publisher) syncNamespace(ctx context.Context, ns string) (err error) {
178 startTime := time.Now()
179 defer func() {
180 recordMetrics(startTime, err)
181 klog.FromContext(ctx).V(4).Info("Finished syncing namespace", "namespace", ns, "elapsedTime", time.Since(startTime))
182 }()
183
184 cm, err := c.cmLister.ConfigMaps(ns).Get(RootCACertConfigMapName)
185 switch {
186 case apierrors.IsNotFound(err):
187 _, err = c.client.CoreV1().ConfigMaps(ns).Create(ctx, &v1.ConfigMap{
188 ObjectMeta: metav1.ObjectMeta{
189 Name: RootCACertConfigMapName,
190 Annotations: map[string]string{DescriptionAnnotation: Description},
191 },
192 Data: map[string]string{
193 "ca.crt": string(c.rootCA),
194 },
195 }, metav1.CreateOptions{})
196
197 if apierrors.IsNotFound(err) || apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
198 return nil
199 }
200 return err
201 case err != nil:
202 return err
203 }
204
205 data := map[string]string{
206 "ca.crt": string(c.rootCA),
207 }
208
209
210 if reflect.DeepEqual(cm.Data, data) && len(cm.Annotations[DescriptionAnnotation]) > 0 {
211 return nil
212 }
213
214
215 cm = cm.DeepCopy()
216 cm.Data = data
217 if cm.Annotations == nil {
218 cm.Annotations = map[string]string{}
219 }
220 cm.Annotations[DescriptionAnnotation] = Description
221
222 _, err = c.client.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
223 return err
224 }
225
226 func convertToCM(obj interface{}) (*v1.ConfigMap, error) {
227 cm, ok := obj.(*v1.ConfigMap)
228 if !ok {
229 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
230 if !ok {
231 return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
232 }
233 cm, ok = tombstone.Obj.(*v1.ConfigMap)
234 if !ok {
235 return nil, fmt.Errorf("tombstone contained object that is not a ConfigMap %#v", obj)
236 }
237 }
238 return cm, nil
239 }
240
View as plain text