1
16
17 package apiserver
18
19 import (
20 "fmt"
21 "net/http"
22 "time"
23
24 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
25 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/install"
26 v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
28 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
29 externalinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
30 "k8s.io/apiextensions-apiserver/pkg/controller/apiapproval"
31 "k8s.io/apiextensions-apiserver/pkg/controller/establish"
32 "k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
33 "k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema"
34 openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi"
35 openapiv3controller "k8s.io/apiextensions-apiserver/pkg/controller/openapiv3"
36 "k8s.io/apiextensions-apiserver/pkg/controller/status"
37 "k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/runtime/serializer"
42 "k8s.io/apimachinery/pkg/util/wait"
43 "k8s.io/apimachinery/pkg/version"
44 "k8s.io/apiserver/pkg/endpoints/discovery"
45 "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
46 genericregistry "k8s.io/apiserver/pkg/registry/generic"
47 "k8s.io/apiserver/pkg/registry/rest"
48 genericapiserver "k8s.io/apiserver/pkg/server"
49 serverstorage "k8s.io/apiserver/pkg/server/storage"
50 "k8s.io/apiserver/pkg/util/webhook"
51 )
52
53 var (
54 Scheme = runtime.NewScheme()
55 Codecs = serializer.NewCodecFactory(Scheme)
56
57
58 unversionedVersion = schema.GroupVersion{Group: "", Version: "v1"}
59 unversionedTypes = []runtime.Object{
60 &metav1.Status{},
61 &metav1.WatchEvent{},
62 &metav1.APIVersions{},
63 &metav1.APIGroupList{},
64 &metav1.APIGroup{},
65 &metav1.APIResourceList{},
66 }
67 )
68
69 func init() {
70 install.Install(Scheme)
71
72
73 metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Group: "", Version: "v1"})
74
75 Scheme.AddUnversionedTypes(unversionedVersion, unversionedTypes...)
76 }
77
// ExtraConfig carries the apiextensions-specific settings that accompany the
// generic API server configuration. Every field is forwarded by
// completedConfig.New to NewCustomResourceDefinitionHandler.
type ExtraConfig struct {
	// CRDRESTOptionsGetter is handed to the custom resource handler.
	// NOTE(review): presumably supplies storage options for custom resources
	// (as opposed to the CRD objects themselves) — confirm against the handler.
	CRDRESTOptionsGetter genericregistry.RESTOptionsGetter

	// MasterCount is handed to the custom resource handler.
	// NOTE(review): looks like the number of API server instances — confirm
	// how the handler uses it.
	MasterCount int

	// ServiceResolver is handed to the custom resource handler.
	// NOTE(review): presumably resolves webhook service references — confirm.
	ServiceResolver webhook.ServiceResolver
	// AuthResolverWrapper is handed to the custom resource handler.
	// NOTE(review): presumably wraps webhook authentication info lookup — confirm.
	AuthResolverWrapper webhook.AuthenticationInfoResolverWrapper
}
90
// Config is the not-yet-completed server configuration: the recommended
// generic API server config plus the apiextensions-specific ExtraConfig.
// Call Complete to obtain a CompletedConfig, on which New is available.
type Config struct {
	GenericConfig *genericapiserver.RecommendedConfig
	ExtraConfig   ExtraConfig
}
95
// completedConfig is the unexported result of Config.Complete. It holds the
// completed generic config and a pointer to the originating ExtraConfig.
// Field order matters: Complete builds it with a positional literal.
type completedConfig struct {
	GenericConfig genericapiserver.CompletedConfig
	ExtraConfig   *ExtraConfig
}
100
// CompletedConfig is the exported wrapper returned by Config.Complete.
type CompletedConfig struct {
	// Embeds a pointer to the unexported completedConfig, whose type cannot be
	// named outside this package; its methods (such as New) are promoted.
	*completedConfig
}
105
// CustomResourceDefinitions is the server object produced by
// completedConfig.New.
type CustomResourceDefinitions struct {
	// GenericAPIServer is the underlying generic API server on which the API
	// group, handlers and post-start hooks are registered.
	GenericAPIServer *genericapiserver.GenericAPIServer

	// Informers is the shared informer factory that New builds from a clientset
	// using the generic server's loopback client config. It is started by the
	// "start-apiextensions-informers" post-start hook.
	Informers externalinformers.SharedInformerFactory
}
112
113
114 func (cfg *Config) Complete() CompletedConfig {
115 c := completedConfig{
116 cfg.GenericConfig.Complete(),
117 &cfg.ExtraConfig,
118 }
119
120 c.GenericConfig.EnableDiscovery = false
121 if c.GenericConfig.Version == nil {
122 c.GenericConfig.Version = &version.Info{
123 Major: "0",
124 Minor: "1",
125 }
126 }
127
128 return CompletedConfig{&c}
129 }
130
131
132 func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
133 genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
134 if err != nil {
135 return nil, err
136 }
137
138
139
140 hasCRDInformerSyncedSignal := make(chan struct{})
141 if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
142 return nil, err
143 }
144
145 s := &CustomResourceDefinitions{
146 GenericAPIServer: genericServer,
147 }
148
149 apiResourceConfig := c.GenericConfig.MergedResourceConfig
150 apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
151 storage := map[string]rest.Storage{}
152
153 if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
154 customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
155 if err != nil {
156 return nil, err
157 }
158 storage[resource] = customResourceDefinitionStorage
159 storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
160 }
161 if len(storage) > 0 {
162 apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
163 }
164
165 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
166 return nil, err
167 }
168
169 crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
170 if err != nil {
171
172
173 return nil, fmt.Errorf("failed to create clientset: %v", err)
174 }
175 s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
176
177 delegateHandler := delegationTarget.UnprotectedHandler()
178 if delegateHandler == nil {
179 delegateHandler = http.NotFoundHandler()
180 }
181
182 versionDiscoveryHandler := &versionDiscoveryHandler{
183 discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
184 delegate: delegateHandler,
185 }
186 groupDiscoveryHandler := &groupDiscoveryHandler{
187 discovery: map[string]*discovery.APIGroupHandler{},
188 delegate: delegateHandler,
189 }
190 establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
191 crdHandler, err := NewCustomResourceDefinitionHandler(
192 versionDiscoveryHandler,
193 groupDiscoveryHandler,
194 s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
195 delegateHandler,
196 c.ExtraConfig.CRDRESTOptionsGetter,
197 c.GenericConfig.AdmissionControl,
198 establishingController,
199 c.ExtraConfig.ServiceResolver,
200 c.ExtraConfig.AuthResolverWrapper,
201 c.ExtraConfig.MasterCount,
202 s.GenericAPIServer.Authorizer,
203 c.GenericConfig.RequestTimeout,
204 time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
205 apiGroupInfo.StaticOpenAPISpec,
206 c.GenericConfig.MaxRequestBodyBytes,
207 )
208 if err != nil {
209 return nil, err
210 }
211 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
212 s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
213 s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)
214
215 aggregatedDiscoveryManager := genericServer.AggregatedDiscoveryGroupManager
216 if aggregatedDiscoveryManager != nil {
217 aggregatedDiscoveryManager = aggregatedDiscoveryManager.WithSource(aggregated.CRDSource)
218 }
219 discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager)
220 namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
221 nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
222 apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
223 finalizingController := finalizer.NewCRDFinalizer(
224 s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
225 crdClient.ApiextensionsV1(),
226 crdHandler,
227 )
228
229 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
230 s.Informers.Start(context.StopCh)
231 return nil
232 })
233 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
234
235
236
237
238 if s.GenericAPIServer.StaticOpenAPISpec != nil {
239 if s.GenericAPIServer.OpenAPIVersionedService != nil {
240 openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
241 go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
242 }
243
244 if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
245 openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
246 go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
247 }
248 }
249
250 go namingController.Run(context.StopCh)
251 go establishingController.Run(context.StopCh)
252 go nonStructuralSchemaController.Run(5, context.StopCh)
253 go apiApprovalController.Run(5, context.StopCh)
254 go finalizingController.Run(5, context.StopCh)
255
256 discoverySyncedCh := make(chan struct{})
257 go discoveryController.Run(context.StopCh, discoverySyncedCh)
258 select {
259 case <-context.StopCh:
260 case <-discoverySyncedCh:
261 }
262
263 return nil
264 })
265
266
267
268 s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
269 return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
270 if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
271 close(hasCRDInformerSyncedSignal)
272 return true, nil
273 }
274 return false, nil
275 }, context.StopCh)
276 })
277
278 return s, nil
279 }
280
281 func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
282 ret := serverstorage.NewResourceConfig()
283
284 ret.EnableVersions(
285 v1beta1.SchemeGroupVersion,
286 v1.SchemeGroupVersion,
287 )
288
289 return ret
290 }
291
View as plain text