...
1
16
17 package apiserver
18
19 import (
20 "fmt"
21 "time"
22
23 apierrors "k8s.io/apimachinery/pkg/api/errors"
24 "k8s.io/apimachinery/pkg/labels"
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/apiserver/pkg/server/dynamiccertificates"
28 "k8s.io/client-go/tools/cache"
29 "k8s.io/client-go/util/workqueue"
30 "k8s.io/klog/v2"
31
32 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
33 informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
34 listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
35 "k8s.io/kube-aggregator/pkg/controllers"
36 )
37
38
39 type APIHandlerManager interface {
40 AddAPIService(apiService *v1.APIService) error
41 RemoveAPIService(apiServiceName string)
42 }
43
44
45 type APIServiceRegistrationController struct {
46 apiHandlerManager APIHandlerManager
47
48 apiServiceLister listers.APIServiceLister
49 apiServiceSynced cache.InformerSynced
50
51
52 syncFn func(key string) error
53
54 queue workqueue.RateLimitingInterface
55 }
56
57 var _ dynamiccertificates.Listener = &APIServiceRegistrationController{}
58
59
60 func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
61 c := &APIServiceRegistrationController{
62 apiHandlerManager: apiHandlerManager,
63 apiServiceLister: apiServiceInformer.Lister(),
64 apiServiceSynced: apiServiceInformer.Informer().HasSynced,
65 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
66 }
67
68 apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
69 AddFunc: c.addAPIService,
70 UpdateFunc: c.updateAPIService,
71 DeleteFunc: c.deleteAPIService,
72 })
73
74 c.syncFn = c.sync
75
76 return c
77 }
78
79 func (c *APIServiceRegistrationController) sync(key string) error {
80 apiService, err := c.apiServiceLister.Get(key)
81 if apierrors.IsNotFound(err) {
82 c.apiHandlerManager.RemoveAPIService(key)
83 return nil
84 }
85 if err != nil {
86 return err
87 }
88
89 return c.apiHandlerManager.AddAPIService(apiService)
90 }
91
92
93 func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}, handlerSyncedCh chan<- struct{}) {
94 defer utilruntime.HandleCrash()
95 defer c.queue.ShutDown()
96
97 klog.Info("Starting APIServiceRegistrationController")
98 defer klog.Info("Shutting down APIServiceRegistrationController")
99
100 if !controllers.WaitForCacheSync("APIServiceRegistrationController", stopCh, c.apiServiceSynced) {
101 return
102 }
103
104
105 if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
106 services, err := c.apiServiceLister.List(labels.Everything())
107 if err != nil {
108 utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err))
109 return false, nil
110 }
111 for _, s := range services {
112 if err := c.apiHandlerManager.AddAPIService(s); err != nil {
113 utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err))
114 return false, nil
115 }
116 }
117 return true, nil
118 }, stopCh); err == wait.ErrWaitTimeout {
119 utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize"))
120 return
121 } else if err != nil {
122 panic(fmt.Errorf("unexpected error: %v", err))
123 }
124 close(handlerSyncedCh)
125
126
127
128 go wait.Until(c.runWorker, time.Second, stopCh)
129
130 <-stopCh
131 }
132
133 func (c *APIServiceRegistrationController) runWorker() {
134 for c.processNextWorkItem() {
135 }
136 }
137
138
139 func (c *APIServiceRegistrationController) processNextWorkItem() bool {
140 key, quit := c.queue.Get()
141 if quit {
142 return false
143 }
144 defer c.queue.Done(key)
145
146 err := c.syncFn(key.(string))
147 if err == nil {
148 c.queue.Forget(key)
149 return true
150 }
151
152 utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
153 c.queue.AddRateLimited(key)
154
155 return true
156 }
157
158 func (c *APIServiceRegistrationController) enqueueInternal(obj *v1.APIService) {
159 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
160 if err != nil {
161 klog.Errorf("Couldn't get key for object %#v: %v", obj, err)
162 return
163 }
164
165 c.queue.Add(key)
166 }
167
168 func (c *APIServiceRegistrationController) addAPIService(obj interface{}) {
169 castObj := obj.(*v1.APIService)
170 klog.V(4).Infof("Adding %s", castObj.Name)
171 c.enqueueInternal(castObj)
172 }
173
174 func (c *APIServiceRegistrationController) updateAPIService(obj, _ interface{}) {
175 castObj := obj.(*v1.APIService)
176 klog.V(4).Infof("Updating %s", castObj.Name)
177 c.enqueueInternal(castObj)
178 }
179
180 func (c *APIServiceRegistrationController) deleteAPIService(obj interface{}) {
181 castObj, ok := obj.(*v1.APIService)
182 if !ok {
183 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
184 if !ok {
185 klog.Errorf("Couldn't get object from tombstone %#v", obj)
186 return
187 }
188 castObj, ok = tombstone.Obj.(*v1.APIService)
189 if !ok {
190 klog.Errorf("Tombstone contained object that is not expected %#v", obj)
191 return
192 }
193 }
194 klog.V(4).Infof("Deleting %q", castObj.Name)
195 c.enqueueInternal(castObj)
196 }
197
198
199
200 func (c *APIServiceRegistrationController) Enqueue() {
201 apiServices, err := c.apiServiceLister.List(labels.Everything())
202 if err != nil {
203 utilruntime.HandleError(err)
204 return
205 }
206 for _, apiService := range apiServices {
207 c.addAPIService(apiService)
208 }
209 }
210
View as plain text