1
16
17 package legacytokentracking
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "golang.org/x/time/rate"
25
26 corev1 "k8s.io/api/core/v1"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/fields"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 corev1informers "k8s.io/client-go/informers/core/v1"
33 "k8s.io/client-go/kubernetes"
34 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/client-go/util/workqueue"
37 "k8s.io/klog/v2"
38 "k8s.io/utils/clock"
39 )
40
41 const (
42 ConfigMapName = "kube-apiserver-legacy-service-account-token-tracking"
43 ConfigMapDataKey = "since"
44 dateFormat = "2006-01-02"
45 )
46
47 var (
48 queueKey = metav1.NamespaceSystem + "/" + ConfigMapName
49 )
50
51
52
53
54
55
56 type Controller struct {
57 configMapClient corev1client.ConfigMapsGetter
58 configMapInformer cache.SharedIndexInformer
59 configMapCache cache.Indexer
60 configMapSynced cache.InformerSynced
61 queue workqueue.RateLimitingInterface
62
63
64
65
66
67
68 creationRatelimiter *rate.Limiter
69 clock clock.Clock
70 }
71
72
73 func NewController(cs kubernetes.Interface) *Controller {
74 return newController(cs, clock.RealClock{}, rate.NewLimiter(rate.Every(30*time.Minute), 1))
75 }
76
77 func newController(cs kubernetes.Interface, cl clock.Clock, limiter *rate.Limiter) *Controller {
78 informer := corev1informers.NewFilteredConfigMapInformer(cs, metav1.NamespaceSystem, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(options *metav1.ListOptions) {
79 options.FieldSelector = fields.OneTermEqualSelector("metadata.name", ConfigMapName).String()
80 })
81
82 c := &Controller{
83 configMapClient: cs.CoreV1(),
84 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "legacy_token_tracking_controller"),
85 configMapInformer: informer,
86 configMapCache: informer.GetIndexer(),
87 configMapSynced: informer.HasSynced,
88 creationRatelimiter: limiter,
89 clock: cl,
90 }
91
92 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
93 AddFunc: func(obj interface{}) {
94 c.enqueue()
95 },
96 UpdateFunc: func(oldObj, newObj interface{}) {
97 c.enqueue()
98 },
99 DeleteFunc: func(obj interface{}) {
100 c.enqueue()
101 },
102 })
103
104 return c
105 }
106
107 func (c *Controller) enqueue() {
108 c.queue.Add(queueKey)
109 }
110
111
112 func (c *Controller) Run(stopCh <-chan struct{}) {
113 defer utilruntime.HandleCrash()
114 defer c.queue.ShutDown()
115
116 klog.Info("Starting legacy_token_tracking_controller")
117 defer klog.Infof("Shutting down legacy_token_tracking_controller")
118
119 go c.configMapInformer.Run(stopCh)
120 if !cache.WaitForNamedCacheSync("configmaps", stopCh, c.configMapSynced) {
121 return
122 }
123
124 go wait.Until(c.runWorker, time.Second, stopCh)
125
126 c.queue.Add(queueKey)
127
128 <-stopCh
129 klog.Info("Ending legacy_token_tracking_controller")
130 }
131
132 func (c *Controller) runWorker() {
133 for c.processNext() {
134 }
135 }
136
137 func (c *Controller) processNext() bool {
138 key, quit := c.queue.Get()
139 if quit {
140 return false
141 }
142 defer c.queue.Done(key)
143
144 if err := c.syncConfigMap(); err != nil {
145 utilruntime.HandleError(fmt.Errorf("while syncing ConfigMap %q, err: %w", key, err))
146 c.queue.AddRateLimited(key)
147 return true
148 }
149 c.queue.Forget(key)
150 return true
151 }
152
153 func (c *Controller) syncConfigMap() error {
154 obj, exists, err := c.configMapCache.GetByKey(queueKey)
155 if err != nil {
156 return err
157 }
158
159 now := c.clock.Now()
160 if !exists {
161 r := c.creationRatelimiter.ReserveN(now, 1)
162 if delay := r.DelayFrom(now); delay > 0 {
163 c.queue.AddAfter(queueKey, delay)
164 r.CancelAt(now)
165 return nil
166 }
167
168 if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &corev1.ConfigMap{
169 ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: ConfigMapName},
170 Data: map[string]string{ConfigMapDataKey: now.UTC().Format(dateFormat)},
171 }, metav1.CreateOptions{}); err != nil {
172 if apierrors.IsAlreadyExists(err) {
173 return nil
174 }
175
176 r.CancelAt(now)
177 return err
178 }
179 } else {
180 configMap := obj.(*corev1.ConfigMap)
181 if _, err = time.Parse(dateFormat, configMap.Data[ConfigMapDataKey]); err != nil {
182 configMap := configMap.DeepCopy()
183 if configMap.Data == nil {
184 configMap.Data = map[string]string{}
185 }
186 configMap.Data[ConfigMapDataKey] = now.UTC().Format(dateFormat)
187 if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
188 if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
189 return nil
190 }
191 return err
192 }
193 }
194 }
195
196 return nil
197 }
198
View as plain text