1
16
17 package serviceaccount
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilerrors "k8s.io/apimachinery/pkg/util/errors"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/apimachinery/pkg/util/wait"
30 coreinformers "k8s.io/client-go/informers/core/v1"
31 clientset "k8s.io/client-go/kubernetes"
32 corelisters "k8s.io/client-go/listers/core/v1"
33 "k8s.io/client-go/tools/cache"
34 "k8s.io/client-go/util/workqueue"
35 "k8s.io/klog/v2"
36 )
37
38
39 type ServiceAccountsControllerOptions struct {
40
41 ServiceAccounts []v1.ServiceAccount
42
43
44
45
46 ServiceAccountResync time.Duration
47
48
49
50
51 NamespaceResync time.Duration
52 }
53
54
55 func DefaultServiceAccountsControllerOptions() ServiceAccountsControllerOptions {
56 return ServiceAccountsControllerOptions{
57 ServiceAccounts: []v1.ServiceAccount{
58 {ObjectMeta: metav1.ObjectMeta{Name: "default"}},
59 },
60 }
61 }
62
63
64 func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) (*ServiceAccountsController, error) {
65 e := &ServiceAccountsController{
66 client: cl,
67 serviceAccountsToEnsure: options.ServiceAccounts,
68 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"),
69 }
70
71 saHandler, _ := saInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
72 DeleteFunc: e.serviceAccountDeleted,
73 }, options.ServiceAccountResync)
74 e.saLister = saInformer.Lister()
75 e.saListerSynced = saHandler.HasSynced
76
77 nsHandler, _ := nsInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
78 AddFunc: e.namespaceAdded,
79 UpdateFunc: e.namespaceUpdated,
80 }, options.NamespaceResync)
81 e.nsLister = nsInformer.Lister()
82 e.nsListerSynced = nsHandler.HasSynced
83
84 e.syncHandler = e.syncNamespace
85
86 return e, nil
87 }
88
89
90 type ServiceAccountsController struct {
91 client clientset.Interface
92 serviceAccountsToEnsure []v1.ServiceAccount
93
94
95 syncHandler func(ctx context.Context, key string) error
96
97 saLister corelisters.ServiceAccountLister
98 saListerSynced cache.InformerSynced
99
100 nsLister corelisters.NamespaceLister
101 nsListerSynced cache.InformerSynced
102
103 queue workqueue.RateLimitingInterface
104 }
105
106
107 func (c *ServiceAccountsController) Run(ctx context.Context, workers int) {
108 defer utilruntime.HandleCrash()
109 defer c.queue.ShutDown()
110
111 klog.FromContext(ctx).Info("Starting service account controller")
112 defer klog.FromContext(ctx).Info("Shutting down service account controller")
113
114 if !cache.WaitForNamedCacheSync("service account", ctx.Done(), c.saListerSynced, c.nsListerSynced) {
115 return
116 }
117
118 for i := 0; i < workers; i++ {
119 go wait.UntilWithContext(ctx, c.runWorker, time.Second)
120 }
121
122 <-ctx.Done()
123 }
124
125
126 func (c *ServiceAccountsController) serviceAccountDeleted(obj interface{}) {
127 sa, ok := obj.(*v1.ServiceAccount)
128 if !ok {
129 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
130 if !ok {
131 utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
132 return
133 }
134 sa, ok = tombstone.Obj.(*v1.ServiceAccount)
135 if !ok {
136 utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ServiceAccount %#v", obj))
137 return
138 }
139 }
140 c.queue.Add(sa.Namespace)
141 }
142
143
144 func (c *ServiceAccountsController) namespaceAdded(obj interface{}) {
145 namespace := obj.(*v1.Namespace)
146 c.queue.Add(namespace.Name)
147 }
148
149
150 func (c *ServiceAccountsController) namespaceUpdated(oldObj interface{}, newObj interface{}) {
151 newNamespace := newObj.(*v1.Namespace)
152 c.queue.Add(newNamespace.Name)
153 }
154
155 func (c *ServiceAccountsController) runWorker(ctx context.Context) {
156 for c.processNextWorkItem(ctx) {
157 }
158 }
159
160
161 func (c *ServiceAccountsController) processNextWorkItem(ctx context.Context) bool {
162 key, quit := c.queue.Get()
163 if quit {
164 return false
165 }
166 defer c.queue.Done(key)
167
168 err := c.syncHandler(ctx, key.(string))
169 if err == nil {
170 c.queue.Forget(key)
171 return true
172 }
173
174 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
175 c.queue.AddRateLimited(key)
176
177 return true
178 }
179 func (c *ServiceAccountsController) syncNamespace(ctx context.Context, key string) error {
180 startTime := time.Now()
181 defer func() {
182 klog.FromContext(ctx).V(4).Info("Finished syncing namespace", "namespace", key, "duration", time.Since(startTime))
183 }()
184
185 ns, err := c.nsLister.Get(key)
186 if apierrors.IsNotFound(err) {
187 return nil
188 }
189 if err != nil {
190 return err
191 }
192 if ns.Status.Phase != v1.NamespaceActive {
193
194 return nil
195 }
196
197 createFailures := []error{}
198 for _, sa := range c.serviceAccountsToEnsure {
199 switch _, err := c.saLister.ServiceAccounts(ns.Name).Get(sa.Name); {
200 case err == nil:
201 continue
202 case apierrors.IsNotFound(err):
203 case err != nil:
204 return err
205 }
206
207
208 sa.Namespace = ns.Name
209
210 if _, err := c.client.CoreV1().ServiceAccounts(ns.Name).Create(ctx, &sa, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
211
212 if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
213 createFailures = append(createFailures, err)
214 }
215 }
216 }
217
218 return utilerrors.Flatten(utilerrors.NewAggregate(createFailures))
219 }
220
View as plain text