1
16
17
18
19
20 package app
21
22 import (
23 "fmt"
24 "net/http"
25 "strings"
26 "sync"
27
28 "k8s.io/klog/v2"
29
30 apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 "k8s.io/apimachinery/pkg/util/sets"
35 "k8s.io/apiserver/pkg/admission"
36 genericfeatures "k8s.io/apiserver/pkg/features"
37 genericapiserver "k8s.io/apiserver/pkg/server"
38 "k8s.io/apiserver/pkg/server/healthz"
39 utilfeature "k8s.io/apiserver/pkg/util/feature"
40 utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
41 kubeexternalinformers "k8s.io/client-go/informers"
42 "k8s.io/client-go/tools/cache"
43 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
44 v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
45 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
46 aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
47 aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
48 apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
49 informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
50 "k8s.io/kube-aggregator/pkg/controllers/autoregister"
51 controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
52 "k8s.io/kubernetes/pkg/controlplane/controller/crdregistration"
53 )
54
55 func createAggregatorConfig(
56 kubeAPIServerConfig genericapiserver.Config,
57 commandOptions controlplaneapiserver.CompletedOptions,
58 externalInformers kubeexternalinformers.SharedInformerFactory,
59 serviceResolver aggregatorapiserver.ServiceResolver,
60 proxyTransport *http.Transport,
61 peerProxy utilpeerproxy.Interface,
62 pluginInitializers []admission.PluginInitializer,
63 ) (*aggregatorapiserver.Config, error) {
64
65
66 genericConfig := kubeAPIServerConfig
67 genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
68 genericConfig.RESTOptionsGetter = nil
69
70
71 genericConfig.SkipOpenAPIInstallation = true
72
73 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
74 utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
75
76
77
78 genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
79 }
80
81 if peerProxy != nil {
82 originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
83 genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
84
85
86 apiHandler = peerProxy.WrapHandler(apiHandler)
87 return originalHandlerChainBuilder(apiHandler, c)
88 }
89 }
90
91
92
93
94 etcdOptions := *commandOptions.Etcd
95 etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
96 etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
97 etcdOptions.SkipHealthEndpoints = true
98 if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
99 return nil, err
100 }
101
102
103 if err := commandOptions.APIEnablement.ApplyTo(
104 &genericConfig,
105 aggregatorapiserver.DefaultAPIResourceConfigSource(),
106 aggregatorscheme.Scheme); err != nil {
107 return nil, err
108 }
109
110 aggregatorConfig := &aggregatorapiserver.Config{
111 GenericConfig: &genericapiserver.RecommendedConfig{
112 Config: genericConfig,
113 SharedInformerFactory: externalInformers,
114 },
115 ExtraConfig: aggregatorapiserver.ExtraConfig{
116 ProxyClientCertFile: commandOptions.ProxyClientCertFile,
117 ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
118 PeerCAFile: commandOptions.PeerCAFile,
119 PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
120 ServiceResolver: serviceResolver,
121 ProxyTransport: proxyTransport,
122 RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
123 },
124 }
125
126
127 aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
128
129 return aggregatorConfig, nil
130 }
131
132 func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
133 aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
134 if err != nil {
135 return nil, err
136 }
137
138
139 apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
140 if err != nil {
141 return nil, err
142 }
143 autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
144 apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
145 crdRegistrationController := crdregistration.NewCRDRegistrationController(
146 apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
147 autoRegistrationController)
148
149
150 if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
151 for gv, entry := range apiVersionPriorities {
152 aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))
153 }
154 }
155
156 err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
157 go crdRegistrationController.Run(5, context.StopCh)
158 go func() {
159
160
161
162 if crdAPIEnabled {
163 klog.Infof("waiting for initial CRD sync...")
164 crdRegistrationController.WaitForInitialSync()
165 klog.Infof("initial CRD sync complete...")
166 } else {
167 klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
168 }
169 autoRegistrationController.Run(5, context.StopCh)
170 }()
171 return nil
172 })
173 if err != nil {
174 return nil, err
175 }
176
177 err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
178 makeAPIServiceAvailableHealthCheck(
179 "autoregister-completion",
180 apiServices,
181 aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
182 ),
183 )
184 if err != nil {
185 return nil, err
186 }
187
188 return aggregatorServer, nil
189 }
190
191 func makeAPIService(gv schema.GroupVersion) *v1.APIService {
192 apiServicePriority, ok := apiVersionPriorities[gv]
193 if !ok {
194
195
196 klog.Infof("Skipping APIService creation for %v", gv)
197 return nil
198 }
199 return &v1.APIService{
200 ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
201 Spec: v1.APIServiceSpec{
202 Group: gv.Group,
203 Version: gv.Version,
204 GroupPriorityMinimum: apiServicePriority.group,
205 VersionPriority: apiServicePriority.version,
206 },
207 }
208 }
209
210
211
212 func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker {
213
214 pendingServiceNamesLock := &sync.RWMutex{}
215 pendingServiceNames := sets.NewString()
216 for _, service := range apiServices {
217 pendingServiceNames.Insert(service.Name)
218 }
219
220
221 handleAPIServiceChange := func(service *v1.APIService) {
222 pendingServiceNamesLock.Lock()
223 defer pendingServiceNamesLock.Unlock()
224 if !pendingServiceNames.Has(service.Name) {
225 return
226 }
227 if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
228 pendingServiceNames.Delete(service.Name)
229 }
230 }
231
232
233 apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
234 AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
235 UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
236 })
237
238
239 return healthz.NamedCheck(name, func(r *http.Request) error {
240 pendingServiceNamesLock.RLock()
241 defer pendingServiceNamesLock.RUnlock()
242 if pendingServiceNames.Len() > 0 {
243 return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
244 }
245 return nil
246 })
247 }
248
249
250
251 type priority struct {
252
253 group int32
254
255 version int32
256 }
257
258
259
260
261
262 var apiVersionPriorities = map[schema.GroupVersion]priority{
263 {Group: "", Version: "v1"}: {group: 18000, version: 1},
264
265 {Group: "apps", Version: "v1"}: {group: 17800, version: 15},
266 {Group: "events.k8s.io", Version: "v1"}: {group: 17750, version: 15},
267 {Group: "events.k8s.io", Version: "v1beta1"}: {group: 17750, version: 5},
268 {Group: "authentication.k8s.io", Version: "v1"}: {group: 17700, version: 15},
269 {Group: "authentication.k8s.io", Version: "v1beta1"}: {group: 17700, version: 9},
270 {Group: "authentication.k8s.io", Version: "v1alpha1"}: {group: 17700, version: 1},
271 {Group: "authorization.k8s.io", Version: "v1"}: {group: 17600, version: 15},
272 {Group: "autoscaling", Version: "v1"}: {group: 17500, version: 15},
273 {Group: "autoscaling", Version: "v2"}: {group: 17500, version: 30},
274 {Group: "autoscaling", Version: "v2beta1"}: {group: 17500, version: 9},
275 {Group: "autoscaling", Version: "v2beta2"}: {group: 17500, version: 1},
276 {Group: "batch", Version: "v1"}: {group: 17400, version: 15},
277 {Group: "batch", Version: "v1beta1"}: {group: 17400, version: 9},
278 {Group: "batch", Version: "v2alpha1"}: {group: 17400, version: 9},
279 {Group: "certificates.k8s.io", Version: "v1"}: {group: 17300, version: 15},
280 {Group: "certificates.k8s.io", Version: "v1alpha1"}: {group: 17300, version: 1},
281 {Group: "networking.k8s.io", Version: "v1"}: {group: 17200, version: 15},
282 {Group: "networking.k8s.io", Version: "v1alpha1"}: {group: 17200, version: 1},
283 {Group: "policy", Version: "v1"}: {group: 17100, version: 15},
284 {Group: "policy", Version: "v1beta1"}: {group: 17100, version: 9},
285 {Group: "rbac.authorization.k8s.io", Version: "v1"}: {group: 17000, version: 15},
286 {Group: "storage.k8s.io", Version: "v1"}: {group: 16800, version: 15},
287 {Group: "storage.k8s.io", Version: "v1beta1"}: {group: 16800, version: 9},
288 {Group: "storage.k8s.io", Version: "v1alpha1"}: {group: 16800, version: 1},
289 {Group: "apiextensions.k8s.io", Version: "v1"}: {group: 16700, version: 15},
290 {Group: "admissionregistration.k8s.io", Version: "v1"}: {group: 16700, version: 15},
291 {Group: "admissionregistration.k8s.io", Version: "v1beta1"}: {group: 16700, version: 12},
292 {Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {group: 16700, version: 9},
293 {Group: "scheduling.k8s.io", Version: "v1"}: {group: 16600, version: 15},
294 {Group: "coordination.k8s.io", Version: "v1"}: {group: 16500, version: 15},
295 {Group: "node.k8s.io", Version: "v1"}: {group: 16300, version: 15},
296 {Group: "node.k8s.io", Version: "v1alpha1"}: {group: 16300, version: 1},
297 {Group: "node.k8s.io", Version: "v1beta1"}: {group: 16300, version: 9},
298 {Group: "discovery.k8s.io", Version: "v1"}: {group: 16200, version: 15},
299 {Group: "discovery.k8s.io", Version: "v1beta1"}: {group: 16200, version: 12},
300 {Group: "flowcontrol.apiserver.k8s.io", Version: "v1"}: {group: 16100, version: 21},
301 {Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta3"}: {group: 16100, version: 18},
302 {Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2"}: {group: 16100, version: 15},
303 {Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta1"}: {group: 16100, version: 12},
304 {Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1"}: {group: 16100, version: 9},
305 {Group: "internal.apiserver.k8s.io", Version: "v1alpha1"}: {group: 16000, version: 9},
306 {Group: "resource.k8s.io", Version: "v1alpha2"}: {group: 15900, version: 9},
307 {Group: "storagemigration.k8s.io", Version: "v1alpha1"}: {group: 15800, version: 9},
308
309
310
311 }
312
313 func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
314 apiServices := []*v1.APIService{}
315
316 for _, curr := range delegateAPIServer.ListedPaths() {
317 if curr == "/api/v1" {
318 apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
319 registration.AddAPIServiceToSyncOnStart(apiService)
320 apiServices = append(apiServices, apiService)
321 continue
322 }
323
324 if !strings.HasPrefix(curr, "/apis/") {
325 continue
326 }
327
328 tokens := strings.Split(curr, "/")
329 if len(tokens) != 4 {
330 continue
331 }
332
333 apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
334 if apiService == nil {
335 continue
336 }
337 registration.AddAPIServiceToSyncOnStart(apiService)
338 apiServices = append(apiServices, apiService)
339 }
340
341 return apiServices
342 }
343
View as plain text