1
16
17 package bootstrap
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/apimachinery/pkg/util/wait"
29 coreinformers "k8s.io/client-go/informers/core/v1"
30 clientset "k8s.io/client-go/kubernetes"
31 corelisters "k8s.io/client-go/listers/core/v1"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/util/workqueue"
34 bootstrapapi "k8s.io/cluster-bootstrap/token/api"
35 bootstrapsecretutil "k8s.io/cluster-bootstrap/util/secrets"
36 "k8s.io/klog/v2"
37 api "k8s.io/kubernetes/pkg/apis/core"
38 "k8s.io/kubernetes/pkg/controller"
39 )
40
41
42 type TokenCleanerOptions struct {
43
44 TokenSecretNamespace string
45
46
47
48 SecretResync time.Duration
49 }
50
51
52
53 func DefaultTokenCleanerOptions() TokenCleanerOptions {
54 return TokenCleanerOptions{
55 TokenSecretNamespace: api.NamespaceSystem,
56 }
57 }
58
59
60 type TokenCleaner struct {
61 tokenSecretNamespace string
62
63 client clientset.Interface
64
65
66 secretLister corelisters.SecretLister
67
68
69 secretSynced cache.InformerSynced
70
71 queue workqueue.RateLimitingInterface
72 }
73
74
75 func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInformer, options TokenCleanerOptions) (*TokenCleaner, error) {
76 e := &TokenCleaner{
77 client: cl,
78 secretLister: secrets.Lister(),
79 secretSynced: secrets.Informer().HasSynced,
80 tokenSecretNamespace: options.TokenSecretNamespace,
81 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"),
82 }
83
84 secrets.Informer().AddEventHandlerWithResyncPeriod(
85 cache.FilteringResourceEventHandler{
86 FilterFunc: func(obj interface{}) bool {
87 switch t := obj.(type) {
88 case *v1.Secret:
89 return t.Type == bootstrapapi.SecretTypeBootstrapToken && t.Namespace == e.tokenSecretNamespace
90 default:
91 utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
92 return false
93 }
94 },
95 Handler: cache.ResourceEventHandlerFuncs{
96 AddFunc: e.enqueueSecrets,
97 UpdateFunc: func(oldSecret, newSecret interface{}) { e.enqueueSecrets(newSecret) },
98 },
99 },
100 options.SecretResync,
101 )
102
103 return e, nil
104 }
105
106
107 func (tc *TokenCleaner) Run(ctx context.Context) {
108 defer utilruntime.HandleCrash()
109 defer tc.queue.ShutDown()
110
111 logger := klog.FromContext(ctx)
112 logger.Info("Starting token cleaner controller")
113 defer logger.Info("Shutting down token cleaner controller")
114
115 if !cache.WaitForNamedCacheSync("token_cleaner", ctx.Done(), tc.secretSynced) {
116 return
117 }
118
119 go wait.UntilWithContext(ctx, tc.worker, 10*time.Second)
120
121 <-ctx.Done()
122 }
123
124 func (tc *TokenCleaner) enqueueSecrets(obj interface{}) {
125 key, err := controller.KeyFunc(obj)
126 if err != nil {
127 utilruntime.HandleError(err)
128 return
129 }
130 tc.queue.Add(key)
131 }
132
133
134 func (tc *TokenCleaner) worker(ctx context.Context) {
135 for tc.processNextWorkItem(ctx) {
136 }
137 }
138
139
140 func (tc *TokenCleaner) processNextWorkItem(ctx context.Context) bool {
141 key, quit := tc.queue.Get()
142 if quit {
143 return false
144 }
145 defer tc.queue.Done(key)
146
147 if err := tc.syncFunc(ctx, key.(string)); err != nil {
148 tc.queue.AddRateLimited(key)
149 utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err))
150 return true
151 }
152
153 tc.queue.Forget(key)
154 return true
155 }
156
157 func (tc *TokenCleaner) syncFunc(ctx context.Context, key string) error {
158 logger := klog.FromContext(ctx)
159 startTime := time.Now()
160 defer func() {
161 logger.V(4).Info("Finished syncing secret", "secret", key, "elapsedTime", time.Since(startTime))
162 }()
163
164 namespace, name, err := cache.SplitMetaNamespaceKey(key)
165 if err != nil {
166 return err
167 }
168
169 ret, err := tc.secretLister.Secrets(namespace).Get(name)
170 if apierrors.IsNotFound(err) {
171 logger.V(3).Info("Secret has been deleted", "secret", key)
172 return nil
173 }
174
175 if err != nil {
176 return err
177 }
178
179 if ret.Type == bootstrapapi.SecretTypeBootstrapToken {
180 tc.evalSecret(ctx, ret)
181 }
182 return nil
183 }
184
185 func (tc *TokenCleaner) evalSecret(ctx context.Context, o interface{}) {
186 logger := klog.FromContext(ctx)
187 secret := o.(*v1.Secret)
188 ttl, alreadyExpired := bootstrapsecretutil.GetExpiration(secret, time.Now())
189 if alreadyExpired {
190 logger.V(3).Info("Deleting expired secret", "secret", klog.KObj(secret))
191 var options metav1.DeleteOptions
192 if len(secret.UID) > 0 {
193 options.Preconditions = &metav1.Preconditions{UID: &secret.UID}
194 }
195 err := tc.client.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, options)
196
197
198 if err != nil && !apierrors.IsConflict(err) && !apierrors.IsNotFound(err) {
199 logger.V(3).Info("Error deleting secret", "err", err)
200 }
201 } else if ttl > 0 {
202 key, err := controller.KeyFunc(o)
203 if err != nil {
204 utilruntime.HandleError(err)
205 return
206 }
207 tc.queue.AddAfter(key, ttl)
208 }
209 }
210
View as plain text