1
16
17 package rest
18
19 import (
20 "crypto/tls"
21 goerrors "errors"
22 "fmt"
23 "net"
24 "net/http"
25 "sync"
26 "time"
27
28 corev1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/runtime/schema"
30 utilnet "k8s.io/apimachinery/pkg/util/net"
31 "k8s.io/apiserver/pkg/registry/generic"
32 "k8s.io/apiserver/pkg/registry/rest"
33 genericapiserver "k8s.io/apiserver/pkg/server"
34 serverstorage "k8s.io/apiserver/pkg/server/storage"
35 utilfeature "k8s.io/apiserver/pkg/util/feature"
36 "k8s.io/client-go/kubernetes"
37 networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
38 policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
39 "k8s.io/kubernetes/pkg/api/legacyscheme"
40 api "k8s.io/kubernetes/pkg/apis/core"
41 "k8s.io/kubernetes/pkg/cluster/ports"
42 "k8s.io/kubernetes/pkg/features"
43 kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
44 "k8s.io/kubernetes/pkg/registry/core/componentstatus"
45 endpointsstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage"
46 limitrangestore "k8s.io/kubernetes/pkg/registry/core/limitrange/storage"
47 nodestore "k8s.io/kubernetes/pkg/registry/core/node/storage"
48 pvstore "k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage"
49 pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage"
50 podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
51 podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
52 "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
53 controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
54 "k8s.io/kubernetes/pkg/registry/core/service/allocator"
55 serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
56 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
57 serviceipallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
58 "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
59 portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
60 servicestore "k8s.io/kubernetes/pkg/registry/core/service/storage"
61 serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
62 kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
63 "k8s.io/kubernetes/pkg/util/async"
64 netutils "k8s.io/utils/net"
65 )
66
67
// Config holds everything needed to build the legacy provider. It embeds
// GenericConfig and adds the proxy and service settings that New and
// NewRESTStorage read.
type Config struct {
	GenericConfig

	// Proxy carries the transport and kubelet client settings that are passed
	// to the node, pod and service storage constructors.
	Proxy ProxyConfig
	// Services carries the service IP ranges, the node port range and the
	// repair interval used by the allocators and repair controllers.
	Services ServicesConfig
}
74
// ProxyConfig groups the connection settings consumed by NewRESTStorage.
type ProxyConfig struct {
	// Transport is passed to the node, pod and service storage constructors.
	Transport http.RoundTripper
	// KubeletClientConfig is passed to the node storage constructor.
	KubeletClientConfig kubeletclient.KubeletClientConfig
}
79
// ServicesConfig describes the ranges service cluster IPs and node ports are
// allocated from, and how often the repair controllers run.
type ServicesConfig struct {

	// ClusterIPRange is the primary service IP range. It is required:
	// newServiceIPAllocators returns an error when its IP is nil.
	ClusterIPRange net.IPNet
	// SecondaryClusterIPRange is optional; a secondary allocator is only
	// created when its IP is non-nil.
	SecondaryClusterIPRange net.IPNet
	// NodePortRange is the range handed to the node port allocator and to the
	// node port repair controller.
	NodePortRange utilnet.PortRange

	// IPRepairInterval is passed as the interval to both the cluster IP and
	// the node port repair controllers.
	IPRepairInterval time.Duration
}
88
// rangeRegistries collects the range registries that back the service
// allocators. The fields are filled in by the allocator factory callbacks in
// newServiceIPAllocators and later handed to the repair controllers in New.
type rangeRegistries struct {
	// clusterIP backs the primary cluster IP allocator; it is only set when
	// the MultiCIDRServiceAllocator feature gate is disabled.
	clusterIP rangeallocation.RangeRegistry
	// secondaryClusterIP backs the secondary cluster IP allocator; it is only
	// set when a secondary range is configured and the
	// MultiCIDRServiceAllocator feature gate is disabled.
	secondaryClusterIP rangeallocation.RangeRegistry
	// nodePort backs the node port allocator.
	nodePort rangeallocation.RangeRegistry
}
94
// legacyProvider builds the REST storage for the core API group and starts
// the service IP and node port repair controllers from its post-start hook.
type legacyProvider struct {
	Config

	// primaryServiceClusterIPAllocator allocates from the primary service IP
	// range; its IP family is passed to the service storage constructor.
	primaryServiceClusterIPAllocator ipallocator.Interface
	// serviceClusterIPAllocators holds the primary and, when configured, the
	// secondary allocator, keyed by IP family.
	serviceClusterIPAllocators map[api.IPFamily]ipallocator.Interface
	// serviceNodePortAllocator allocates service node ports.
	serviceNodePortAllocator *portallocator.PortAllocator

	// startServiceNodePortsRepair and startServiceClusterIPRepair are the
	// RunUntil methods of the repair controllers created in New. Each is
	// called from PostStartHook with a callback (onFirstSuccess) and a stop
	// channel.
	startServiceNodePortsRepair, startServiceClusterIPRepair func(onFirstSuccess func(), stopCh chan struct{})
}
104
105 func New(c Config) (*legacyProvider, error) {
106 rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
107 if err != nil {
108 return nil, err
109 }
110
111 p := &legacyProvider{
112 Config: c,
113
114 primaryServiceClusterIPAllocator: serviceClusterIPAllocator,
115 serviceClusterIPAllocators: serviceIPAllocators,
116 serviceNodePortAllocator: serviceNodePortAllocator,
117 }
118
119
120 client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
121 if err != nil {
122 return nil, err
123 }
124 p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(c.Services.IPRepairInterval, client.CoreV1(), client.EventsV1(), c.Services.NodePortRange, rangeRegistries.nodePort).RunUntil
125
126
127 if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
128 p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
129 c.Services.IPRepairInterval,
130 client.CoreV1(),
131 client.EventsV1(),
132 &c.Services.ClusterIPRange,
133 rangeRegistries.clusterIP,
134 &c.Services.SecondaryClusterIPRange,
135 rangeRegistries.secondaryClusterIP,
136 ).RunUntil
137 } else {
138 p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
139 c.Services.IPRepairInterval,
140 client,
141 c.Informers.Core().V1().Services(),
142 c.Informers.Networking().V1alpha1().ServiceCIDRs(),
143 c.Informers.Networking().V1alpha1().IPAddresses(),
144 ).RunUntil
145 }
146
147 return p, nil
148 }
149
150 func (p *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
151 apiGroupInfo, err := p.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
152 if err != nil {
153 return genericapiserver.APIGroupInfo{}, err
154 }
155
156 podDisruptionClient, err := policyclient.NewForConfig(p.LoopbackClientConfig)
157 if err != nil {
158 return genericapiserver.APIGroupInfo{}, err
159 }
160
161 podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
162 if err != nil {
163 return genericapiserver.APIGroupInfo{}, err
164 }
165
166 limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
167 if err != nil {
168 return genericapiserver.APIGroupInfo{}, err
169 }
170
171 persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
172 if err != nil {
173 return genericapiserver.APIGroupInfo{}, err
174 }
175 persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
176 if err != nil {
177 return genericapiserver.APIGroupInfo{}, err
178 }
179
180 endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
181 if err != nil {
182 return genericapiserver.APIGroupInfo{}, err
183 }
184
185 nodeStorage, err := nodestore.NewStorage(restOptionsGetter, p.Proxy.KubeletClientConfig, p.Proxy.Transport)
186 if err != nil {
187 return genericapiserver.APIGroupInfo{}, err
188 }
189
190 podStorage, err := podstore.NewStorage(
191 restOptionsGetter,
192 nodeStorage.KubeletConnectionInfo,
193 p.Proxy.Transport,
194 podDisruptionClient,
195 )
196 if err != nil {
197 return genericapiserver.APIGroupInfo{}, err
198 }
199
200 serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
201 restOptionsGetter,
202 p.primaryServiceClusterIPAllocator.IPFamily(),
203 p.serviceClusterIPAllocators,
204 p.serviceNodePortAllocator,
205 endpointsStorage,
206 podStorage.Pod,
207 p.Proxy.Transport)
208 if err != nil {
209 return genericapiserver.APIGroupInfo{}, err
210 }
211
212 storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
213 if storage == nil {
214 storage = map[string]rest.Storage{}
215 }
216
217
218 var serviceAccountStorage *serviceaccountstore.REST
219 if p.ServiceAccountIssuer != nil {
220 var nodeGetter rest.Getter
221 if utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountTokenNodeBinding) ||
222 utilfeature.DefaultFeatureGate.Enabled(features.ServiceAccountTokenPodNodeInfo) {
223 nodeGetter = nodeStorage.Node.Store
224 }
225 serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, p.ServiceAccountIssuer, p.APIAudiences, p.ServiceAccountMaxExpiration, podStorage.Pod.Store, storage["secrets"].(rest.Getter), nodeGetter, p.ExtendExpiration)
226 if err != nil {
227 return genericapiserver.APIGroupInfo{}, err
228 }
229 }
230
231 if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
232 storage[resource] = podStorage.Pod
233 storage[resource+"/attach"] = podStorage.Attach
234 storage[resource+"/status"] = podStorage.Status
235 storage[resource+"/log"] = podStorage.Log
236 storage[resource+"/exec"] = podStorage.Exec
237 storage[resource+"/portforward"] = podStorage.PortForward
238 storage[resource+"/proxy"] = podStorage.Proxy
239 storage[resource+"/binding"] = podStorage.Binding
240 if podStorage.Eviction != nil {
241 storage[resource+"/eviction"] = podStorage.Eviction
242 }
243 storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
244 }
245 if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
246 storage[resource] = podStorage.LegacyBinding
247 }
248
249 if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
250 storage[resource] = podTemplateStorage
251 }
252
253 if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
254 controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
255 if err != nil {
256 return genericapiserver.APIGroupInfo{}, err
257 }
258
259 storage[resource] = controllerStorage.Controller
260 storage[resource+"/status"] = controllerStorage.Status
261 if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
262 storage[resource+"/scale"] = controllerStorage.Scale
263 }
264 }
265
266
267 if resource := "serviceaccounts"; serviceAccountStorage != nil && apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
268
269 storage[resource].Destroy()
270 if storage[resource+"/token"] != nil {
271 storage[resource+"/token"].Destroy()
272 }
273
274 storage[resource] = serviceAccountStorage
275 if serviceAccountStorage.Token != nil {
276 storage[resource+"/token"] = serviceAccountStorage.Token
277 }
278 }
279
280 if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
281 storage[resource] = serviceRESTStorage
282 storage[resource+"/proxy"] = serviceRESTProxy
283 storage[resource+"/status"] = serviceStatusStorage
284 }
285
286 if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
287 storage[resource] = endpointsStorage
288 }
289
290 if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
291 storage[resource] = nodeStorage.Node
292 storage[resource+"/proxy"] = nodeStorage.Proxy
293 storage[resource+"/status"] = nodeStorage.Status
294 }
295
296 if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
297 storage[resource] = limitRangeStorage
298 }
299
300 if resource := "persistentvolumes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
301 storage[resource] = persistentVolumeStorage
302 storage[resource+"/status"] = persistentVolumeStatusStorage
303 }
304
305 if resource := "persistentvolumeclaims"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
306 storage[resource] = persistentVolumeClaimStorage
307 storage[resource+"/status"] = persistentVolumeClaimStatusStorage
308 }
309
310 if resource := "componentstatuses"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
311 storage[resource] = componentstatus.NewStorage(componentStatusStorage{p.StorageFactory}.serversToValidate)
312 }
313
314 if len(storage) > 0 {
315 apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
316 }
317
318 return apiGroupInfo, nil
319 }
320
321 func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
322 clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
323
324 serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
325 if err != nil {
326 return rangeRegistries{}, nil, nil, nil, err
327 }
328
329 serviceClusterIPRange := c.Services.ClusterIPRange
330 if serviceClusterIPRange.IP == nil {
331 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
332 }
333
334 if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
335 primaryClusterIPAllocator, err = ipallocator.New(&serviceClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
336 var mem allocator.Snapshottable
337 mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
338
339 etcd, err := serviceallocator.NewEtcd(mem, "/ranges/serviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
340 if err != nil {
341 return nil, err
342 }
343 registries.clusterIP = etcd
344 return etcd, nil
345 })
346 if err != nil {
347 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
348 }
349 } else {
350 networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
351 if err != nil {
352 return rangeRegistries{}, nil, nil, nil, err
353 }
354
355
356
357
358 primaryClusterIPAllocator, err = ipallocator.NewMetaAllocator(
359 networkingv1alphaClient,
360 c.Informers.Networking().V1alpha1().ServiceCIDRs(),
361 c.Informers.Networking().V1alpha1().IPAddresses(),
362 netutils.IsIPv6CIDR(&serviceClusterIPRange),
363 )
364 if err != nil {
365 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
366 }
367 }
368 primaryClusterIPAllocator.EnableMetrics()
369 clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
370
371 var secondaryClusterIPAllocator ipallocator.Interface
372 if c.Services.SecondaryClusterIPRange.IP != nil {
373 if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
374 var err error
375 secondaryClusterIPAllocator, err = ipallocator.New(&c.Services.SecondaryClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
376 var mem allocator.Snapshottable
377 mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
378
379 etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", serviceStorageConfig.ForResource(api.Resource("serviceipallocations")))
380 if err != nil {
381 return nil, err
382 }
383 registries.secondaryClusterIP = etcd
384 return etcd, nil
385 })
386 if err != nil {
387 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
388 }
389 } else {
390 networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
391 if err != nil {
392 return rangeRegistries{}, nil, nil, nil, err
393 }
394
395
396
397
398 secondaryClusterIPAllocator, err = ipallocator.NewMetaAllocator(
399 networkingv1alphaClient,
400 c.Informers.Networking().V1alpha1().ServiceCIDRs(),
401 c.Informers.Networking().V1alpha1().IPAddresses(),
402 netutils.IsIPv6CIDR(&c.Services.SecondaryClusterIPRange),
403 )
404 if err != nil {
405 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
406 }
407 }
408 secondaryClusterIPAllocator.EnableMetrics()
409 clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator
410 }
411
412 nodePortAllocator, err = portallocator.New(c.Services.NodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
413 mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
414
415 etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
416 if err != nil {
417 return nil, err
418 }
419 registries.nodePort = etcd
420 return etcd, nil
421 })
422 if err != nil {
423 return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
424 }
425 nodePortAllocator.EnableMetrics()
426
427 return
428 }
429
430 var _ genericapiserver.PostStartHookProvider = &legacyProvider{}
431
432 func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
433 return "start-service-ip-repair-controllers", func(context genericapiserver.PostStartHookContext) error {
434
435
436
437
438
439
440
441
442
443 wg := sync.WaitGroup{}
444 wg.Add(2)
445 runner := async.NewRunner(
446 func(stopCh chan struct{}) { p.startServiceClusterIPRepair(wg.Done, stopCh) },
447 func(stopCh chan struct{}) { p.startServiceNodePortsRepair(wg.Done, stopCh) },
448 )
449 runner.Start()
450 go func() {
451 defer runner.Stop()
452 <-context.StopCh
453 }()
454
455
456
457
458 done := make(chan struct{})
459 go func() {
460 defer close(done)
461 wg.Wait()
462 }()
463 select {
464 case <-done:
465 case <-time.After(time.Minute):
466 return goerrors.New("unable to perform initial IP and Port allocation check")
467 }
468
469 return nil
470 }, nil
471 }
472
// GroupName returns the API group served by this provider, as defined by the
// GroupName constant of the internal core API package.
func (p *legacyProvider) GroupName() string {
	return api.GroupName
}
476
// componentStatusStorage supplies the set of servers checked by the
// componentstatuses resource.
type componentStatusStorage struct {
	// storageFactory provides the storage configs that serversToValidate
	// turns into etcd server entries.
	storageFactory serverstorage.StorageFactory
}
480
481 func (s componentStatusStorage) serversToValidate() map[string]componentstatus.Server {
482
483
484 serversToValidate := map[string]componentstatus.Server{
485 "controller-manager": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: ports.KubeControllerManagerPort, Path: "/healthz"},
486 "scheduler": &componentstatus.HttpServer{EnableHTTPS: true, TLSConfig: &tls.Config{InsecureSkipVerify: true}, Addr: "127.0.0.1", Port: kubeschedulerconfig.DefaultKubeSchedulerPort, Path: "/healthz"},
487 }
488
489 for ix, cfg := range s.storageFactory.Configs() {
490 serversToValidate[fmt.Sprintf("etcd-%d", ix)] = &componentstatus.EtcdServer{Config: cfg}
491 }
492 return serversToValidate
493 }
494
View as plain text