...
1
16
17 package openapiv3
18
import (
	"errors"
	"fmt"
	"net/http"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
	"k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
)
31
// Requeue timing used by the aggregation controller.
const (
	// successfulUpdateDelay is the delay before a non-local APIService is
	// synced again after a plain requeue. It is also the base delay handed to
	// the queue's exponential failure rate limiter.
	successfulUpdateDelay time.Duration = time.Minute
	// successfulUpdateDelayLocal is the delay before a local APIService is
	// synced again after a plain requeue.
	successfulUpdateDelayLocal time.Duration = time.Second
	// failedUpdateMaxExpDelay is the maximum delay handed to the queue's
	// exponential failure rate limiter.
	failedUpdateMaxExpDelay time.Duration = time.Hour
)
37
// syncAction tells processNextWorkItem what to do with a key once the sync
// handler has returned.
type syncAction int

const (
	// syncRequeue re-adds the key to the queue after a fixed delay.
	syncRequeue syncAction = iota
	// syncRequeueRateLimited re-adds the key through the queue's rate limiter.
	syncRequeueRateLimited
	// syncNothing does not re-add the key.
	syncNothing
)
45
46
47 type AggregationController struct {
48 openAPIAggregationManager aggregator.SpecProxier
49 queue workqueue.RateLimitingInterface
50
51
52 syncHandler func(key string) (syncAction, error)
53 }
54
55
56 func NewAggregationController(openAPIAggregationManager aggregator.SpecProxier) *AggregationController {
57 c := &AggregationController{
58 openAPIAggregationManager: openAPIAggregationManager,
59 queue: workqueue.NewNamedRateLimitingQueue(
60 workqueue.NewItemExponentialFailureRateLimiter(successfulUpdateDelay, failedUpdateMaxExpDelay),
61 "open_api_v3_aggregation_controller",
62 ),
63 }
64
65 c.syncHandler = c.sync
66
67
68 for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
69 c.queue.AddAfter(name, time.Second)
70 }
71
72 return c
73 }
74
75
76 func (c *AggregationController) Run(stopCh <-chan struct{}) {
77 defer utilruntime.HandleCrash()
78 defer c.queue.ShutDown()
79
80 klog.Info("Starting OpenAPI V3 AggregationController")
81 defer klog.Info("Shutting down OpenAPI V3 AggregationController")
82
83 go wait.Until(c.runWorker, time.Second, stopCh)
84
85 <-stopCh
86 }
87
88 func (c *AggregationController) runWorker() {
89 for c.processNextWorkItem() {
90 }
91 }
92
93
94 func (c *AggregationController) processNextWorkItem() bool {
95 key, quit := c.queue.Get()
96 defer c.queue.Done(key)
97 if quit {
98 return false
99 }
100
101 if aggregator.IsLocalAPIService(key.(string)) {
102
103
104 klog.V(6).Infof("OpenAPI AggregationController: Processing item %s", key)
105 } else {
106 klog.V(4).Infof("OpenAPI AggregationController: Processing item %s", key)
107 }
108
109 action, err := c.syncHandler(key.(string))
110 if err == nil {
111 c.queue.Forget(key)
112 } else {
113 utilruntime.HandleError(fmt.Errorf("loading OpenAPI spec for %q failed with: %v", key, err))
114 }
115
116 switch action {
117 case syncRequeue:
118 if aggregator.IsLocalAPIService(key.(string)) {
119 klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
120 c.queue.AddAfter(key, successfulUpdateDelayLocal)
121 } else {
122 klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
123 c.queue.AddAfter(key, successfulUpdateDelay)
124 }
125 case syncRequeueRateLimited:
126 klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
127 c.queue.AddRateLimited(key)
128 case syncNothing:
129 klog.Infof("OpenAPI AggregationController: action for item %s: Nothing (removed from the queue).", key)
130 }
131
132 return true
133 }
134
135 func (c *AggregationController) sync(key string) (syncAction, error) {
136 if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(key); err != nil {
137 if err == aggregator.ErrAPIServiceNotFound {
138 return syncNothing, nil
139 }
140 return syncRequeueRateLimited, err
141 }
142 return syncRequeue, nil
143 }
144
145
146 func (c *AggregationController) AddAPIService(handler http.Handler, apiService *v1.APIService) {
147 if apiService.Spec.Service == nil {
148 return
149 }
150 c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
151 c.queue.AddAfter(apiService.Name, time.Second)
152 }
153
154
155 func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
156 if apiService.Spec.Service == nil {
157 return
158 }
159 c.openAPIAggregationManager.AddUpdateAPIService(handler, apiService)
160 key := apiService.Name
161 if c.queue.NumRequeues(key) > 0 {
162
163
164 c.queue.Forget(key)
165 c.queue.AddAfter(key, time.Second)
166 }
167 }
168
169
170 func (c *AggregationController) RemoveAPIService(apiServiceName string) {
171 c.openAPIAggregationManager.RemoveAPIServiceSpec(apiServiceName)
172
173
174 c.queue.Forget(apiServiceName)
175 }
176
View as plain text