1
16
17
18
19
20 package app
21
22 import (
23 "context"
24 "errors"
25 "fmt"
26 "net"
27 "strings"
28 "time"
29
30 "k8s.io/klog/v2"
31
32 v1 "k8s.io/api/core/v1"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 genericfeatures "k8s.io/apiserver/pkg/features"
35 "k8s.io/apiserver/pkg/quota/v1/generic"
36 utilfeature "k8s.io/apiserver/pkg/util/feature"
37 clientset "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/metadata"
39 restclient "k8s.io/client-go/rest"
40 cloudnodelifecyclecontroller "k8s.io/cloud-provider/controllers/nodelifecycle"
41 routecontroller "k8s.io/cloud-provider/controllers/route"
42 servicecontroller "k8s.io/cloud-provider/controllers/service"
43 cpnames "k8s.io/cloud-provider/names"
44 "k8s.io/component-base/featuregate"
45 "k8s.io/controller-manager/controller"
46 csitrans "k8s.io/csi-translation-lib"
47 "k8s.io/kubernetes/cmd/kube-controller-manager/names"
48 pkgcontroller "k8s.io/kubernetes/pkg/controller"
49 endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
50 "k8s.io/kubernetes/pkg/controller/garbagecollector"
51 namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
52 nodeipamcontroller "k8s.io/kubernetes/pkg/controller/nodeipam"
53 nodeipamconfig "k8s.io/kubernetes/pkg/controller/nodeipam/config"
54 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
55 lifecyclecontroller "k8s.io/kubernetes/pkg/controller/nodelifecycle"
56 "k8s.io/kubernetes/pkg/controller/podgc"
57 replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
58 "k8s.io/kubernetes/pkg/controller/resourceclaim"
59 resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
60 serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
61 "k8s.io/kubernetes/pkg/controller/storageversiongc"
62 "k8s.io/kubernetes/pkg/controller/tainteviction"
63 ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
64 "k8s.io/kubernetes/pkg/controller/ttlafterfinished"
65 "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
66 "k8s.io/kubernetes/pkg/controller/volume/ephemeral"
67 "k8s.io/kubernetes/pkg/controller/volume/expand"
68 persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
69 "k8s.io/kubernetes/pkg/controller/volume/pvcprotection"
70 "k8s.io/kubernetes/pkg/controller/volume/pvprotection"
71 "k8s.io/kubernetes/pkg/features"
72 quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
73 "k8s.io/kubernetes/pkg/volume/csimigration"
74 "k8s.io/utils/clock"
75 netutils "k8s.io/utils/net"
76 )
77
78 const (
79
80 defaultNodeMaskCIDRIPv4 = 24
81
82 defaultNodeMaskCIDRIPv6 = 64
83 )
84
85 func newServiceLBControllerDescriptor() *ControllerDescriptor {
86 return &ControllerDescriptor{
87 name: cpnames.ServiceLBController,
88 aliases: []string{"service"},
89 initFunc: startServiceLBController,
90 isCloudProviderController: true,
91 }
92 }
93
94 func startServiceLBController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
95 serviceController, err := servicecontroller.New(
96 controllerContext.Cloud,
97 controllerContext.ClientBuilder.ClientOrDie("service-controller"),
98 controllerContext.InformerFactory.Core().V1().Services(),
99 controllerContext.InformerFactory.Core().V1().Nodes(),
100 controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
101 utilfeature.DefaultFeatureGate,
102 )
103 if err != nil {
104
105 klog.FromContext(ctx).Error(err, "Failed to start service controller")
106 return nil, false, nil
107 }
108 go serviceController.Run(ctx, int(controllerContext.ComponentConfig.ServiceController.ConcurrentServiceSyncs), controllerContext.ControllerManagerMetrics)
109 return nil, true, nil
110 }
111 func newNodeIpamControllerDescriptor() *ControllerDescriptor {
112 return &ControllerDescriptor{
113 name: names.NodeIpamController,
114 aliases: []string{"nodeipam"},
115 initFunc: startNodeIpamController,
116 }
117 }
118
119 func startNodeIpamController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
120 var serviceCIDR *net.IPNet
121 var secondaryServiceCIDR *net.IPNet
122 logger := klog.FromContext(ctx)
123
124
125 if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
126 return nil, false, nil
127 }
128
129 if controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType == string(ipam.CloudAllocatorType) {
130
131 if controllerContext.Cloud == nil {
132 return nil, false, errors.New("--cidr-allocator-type is set to 'CloudAllocator' but cloud provider is not configured")
133 }
134
135 klog.Warningf("DEPRECATED: 'CloudAllocator' bas been deprecated and will be removed in a future release.")
136 }
137
138 clusterCIDRs, err := validateCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
139 if err != nil {
140 return nil, false, err
141 }
142
143
144 if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)) != 0 {
145 _, serviceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR)
146 if err != nil {
147 logger.Info("Warning: unsuccessful parsing of service CIDR", "CIDR", controllerContext.ComponentConfig.NodeIPAMController.ServiceCIDR, "err", err)
148 }
149 }
150
151 if len(strings.TrimSpace(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)) != 0 {
152 _, secondaryServiceCIDR, err = netutils.ParseCIDRSloppy(controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR)
153 if err != nil {
154 logger.Info("Warning: unsuccessful parsing of service CIDR", "CIDR", controllerContext.ComponentConfig.NodeIPAMController.SecondaryServiceCIDR, "err", err)
155 }
156 }
157
158
159 if serviceCIDR != nil && secondaryServiceCIDR != nil {
160
161 dualstackServiceCIDR, err := netutils.IsDualStackCIDRs([]*net.IPNet{serviceCIDR, secondaryServiceCIDR})
162 if err != nil {
163 return nil, false, fmt.Errorf("failed to perform dualstack check on serviceCIDR and secondaryServiceCIDR error:%v", err)
164 }
165 if !dualstackServiceCIDR {
166 return nil, false, fmt.Errorf("serviceCIDR and secondaryServiceCIDR are not dualstack (from different IPfamiles)")
167 }
168 }
169
170
171
172 nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(controllerContext.ComponentConfig.NodeIPAMController, clusterCIDRs)
173 if err != nil {
174 return nil, false, err
175 }
176
177 nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
178 ctx,
179 controllerContext.InformerFactory.Core().V1().Nodes(),
180 controllerContext.Cloud,
181 controllerContext.ClientBuilder.ClientOrDie("node-controller"),
182 clusterCIDRs,
183 serviceCIDR,
184 secondaryServiceCIDR,
185 nodeCIDRMaskSizes,
186 ipam.CIDRAllocatorType(controllerContext.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
187 )
188 if err != nil {
189 return nil, true, err
190 }
191 go nodeIpamController.RunWithMetrics(ctx, controllerContext.ControllerManagerMetrics)
192 return nil, true, nil
193 }
194
195 func newNodeLifecycleControllerDescriptor() *ControllerDescriptor {
196 return &ControllerDescriptor{
197 name: names.NodeLifecycleController,
198 aliases: []string{"nodelifecycle"},
199 initFunc: startNodeLifecycleController,
200 }
201 }
202
203 func startNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
204 lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
205 ctx,
206 controllerContext.InformerFactory.Coordination().V1().Leases(),
207 controllerContext.InformerFactory.Core().V1().Pods(),
208 controllerContext.InformerFactory.Core().V1().Nodes(),
209 controllerContext.InformerFactory.Apps().V1().DaemonSets(),
210
211 controllerContext.ClientBuilder.ClientOrDie("node-controller"),
212 controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
213 controllerContext.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
214 controllerContext.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
215 controllerContext.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
216 controllerContext.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
217 controllerContext.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
218 controllerContext.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
219 )
220 if err != nil {
221 return nil, true, err
222 }
223 go lifecycleController.Run(ctx)
224 return nil, true, nil
225 }
226
227 func newTaintEvictionControllerDescriptor() *ControllerDescriptor {
228 return &ControllerDescriptor{
229 name: names.TaintEvictionController,
230 initFunc: startTaintEvictionController,
231 requiredFeatureGates: []featuregate.Feature{
232 features.SeparateTaintEvictionController,
233 },
234 }
235 }
236
237 func startTaintEvictionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
238 taintEvictionController, err := tainteviction.New(
239 ctx,
240
241 controllerContext.ClientBuilder.ClientOrDie("node-controller"),
242 controllerContext.InformerFactory.Core().V1().Pods(),
243 controllerContext.InformerFactory.Core().V1().Nodes(),
244 controllerName,
245 )
246 if err != nil {
247 return nil, false, err
248 }
249 go taintEvictionController.Run(ctx)
250 return nil, true, nil
251 }
252
253 func newCloudNodeLifecycleControllerDescriptor() *ControllerDescriptor {
254 return &ControllerDescriptor{
255 name: cpnames.CloudNodeLifecycleController,
256 aliases: []string{"cloud-node-lifecycle"},
257 initFunc: startCloudNodeLifecycleController,
258 isCloudProviderController: true,
259 }
260 }
261
262 func startCloudNodeLifecycleController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
263 logger := klog.FromContext(ctx)
264 cloudNodeLifecycleController, err := cloudnodelifecyclecontroller.NewCloudNodeLifecycleController(
265 controllerContext.InformerFactory.Core().V1().Nodes(),
266
267 controllerContext.ClientBuilder.ClientOrDie("node-controller"),
268 controllerContext.Cloud,
269 controllerContext.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
270 )
271 if err != nil {
272
273
274 logger.Error(err, "Failed to start cloud node lifecycle controller")
275 return nil, false, nil
276 }
277
278 go cloudNodeLifecycleController.Run(ctx, controllerContext.ControllerManagerMetrics)
279 return nil, true, nil
280 }
281
282 func newNodeRouteControllerDescriptor() *ControllerDescriptor {
283 return &ControllerDescriptor{
284 name: cpnames.NodeRouteController,
285 aliases: []string{"route"},
286 initFunc: startNodeRouteController,
287 isCloudProviderController: true,
288 }
289 }
290
291 func startNodeRouteController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
292 logger := klog.FromContext(ctx)
293 if !controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs || !controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
294 logger.Info("Will not configure cloud provider routes for allocate-node-cidrs", "CIDRs", controllerContext.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs, "routes", controllerContext.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
295 return nil, false, nil
296 }
297 if controllerContext.Cloud == nil {
298 logger.Info("Warning: configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
299 return nil, false, nil
300 }
301 routes, ok := controllerContext.Cloud.Routes()
302 if !ok {
303 logger.Info("Warning: configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
304 return nil, false, nil
305 }
306
307 clusterCIDRs, err := validateCIDRs(controllerContext.ComponentConfig.KubeCloudShared.ClusterCIDR)
308 if err != nil {
309 return nil, false, err
310 }
311
312 routeController := routecontroller.New(routes,
313 controllerContext.ClientBuilder.ClientOrDie("route-controller"),
314 controllerContext.InformerFactory.Core().V1().Nodes(),
315 controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
316 clusterCIDRs)
317 go routeController.Run(ctx, controllerContext.ComponentConfig.KubeCloudShared.RouteReconciliationPeriod.Duration, controllerContext.ControllerManagerMetrics)
318 return nil, true, nil
319 }
320
321 func newPersistentVolumeBinderControllerDescriptor() *ControllerDescriptor {
322 return &ControllerDescriptor{
323 name: names.PersistentVolumeBinderController,
324 aliases: []string{"persistentvolume-binder"},
325 initFunc: startPersistentVolumeBinderController,
326 }
327 }
328
329 func startPersistentVolumeBinderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
330 logger := klog.FromContext(ctx)
331 plugins, err := ProbeControllerVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
332 if err != nil {
333 return nil, true, fmt.Errorf("failed to probe volume plugins when starting persistentvolume controller: %v", err)
334 }
335
336 params := persistentvolumecontroller.ControllerParameters{
337 KubeClient: controllerContext.ClientBuilder.ClientOrDie("persistent-volume-binder"),
338 SyncPeriod: controllerContext.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
339 VolumePlugins: plugins,
340 Cloud: controllerContext.Cloud,
341 ClusterName: controllerContext.ComponentConfig.KubeCloudShared.ClusterName,
342 VolumeInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
343 ClaimInformer: controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
344 ClassInformer: controllerContext.InformerFactory.Storage().V1().StorageClasses(),
345 PodInformer: controllerContext.InformerFactory.Core().V1().Pods(),
346 NodeInformer: controllerContext.InformerFactory.Core().V1().Nodes(),
347 EnableDynamicProvisioning: controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
348 }
349 volumeController, volumeControllerErr := persistentvolumecontroller.NewController(ctx, params)
350 if volumeControllerErr != nil {
351 return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
352 }
353 go volumeController.Run(ctx)
354 return nil, true, nil
355 }
356
357 func newPersistentVolumeAttachDetachControllerDescriptor() *ControllerDescriptor {
358 return &ControllerDescriptor{
359 name: names.PersistentVolumeAttachDetachController,
360 aliases: []string{"attachdetach"},
361 initFunc: startPersistentVolumeAttachDetachController,
362 }
363 }
364
365 func startPersistentVolumeAttachDetachController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
366 logger := klog.FromContext(ctx)
367 csiNodeInformer := controllerContext.InformerFactory.Storage().V1().CSINodes()
368 csiDriverInformer := controllerContext.InformerFactory.Storage().V1().CSIDrivers()
369
370 plugins, err := ProbeAttachableVolumePlugins(logger)
371 if err != nil {
372 return nil, true, fmt.Errorf("failed to probe volume plugins when starting attach/detach controller: %v", err)
373 }
374
375 ctx = klog.NewContext(ctx, logger)
376 attachDetachController, attachDetachControllerErr :=
377 attachdetach.NewAttachDetachController(
378 ctx,
379 controllerContext.ClientBuilder.ClientOrDie("attachdetach-controller"),
380 controllerContext.InformerFactory.Core().V1().Pods(),
381 controllerContext.InformerFactory.Core().V1().Nodes(),
382 controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
383 controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
384 csiNodeInformer,
385 csiDriverInformer,
386 controllerContext.InformerFactory.Storage().V1().VolumeAttachments(),
387 controllerContext.Cloud,
388 plugins,
389 GetDynamicPluginProber(controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
390 controllerContext.ComponentConfig.AttachDetachController.DisableAttachDetachReconcilerSync,
391 controllerContext.ComponentConfig.AttachDetachController.ReconcilerSyncLoopPeriod.Duration,
392 controllerContext.ComponentConfig.AttachDetachController.DisableForceDetachOnTimeout,
393 attachdetach.DefaultTimerConfig,
394 )
395 if attachDetachControllerErr != nil {
396 return nil, true, fmt.Errorf("failed to start attach/detach controller: %v", attachDetachControllerErr)
397 }
398 go attachDetachController.Run(ctx)
399 return nil, true, nil
400 }
401
402 func newPersistentVolumeExpanderControllerDescriptor() *ControllerDescriptor {
403 return &ControllerDescriptor{
404 name: names.PersistentVolumeExpanderController,
405 aliases: []string{"persistentvolume-expander"},
406 initFunc: startPersistentVolumeExpanderController,
407 }
408 }
409
410 func startPersistentVolumeExpanderController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
411 logger := klog.FromContext(ctx)
412 plugins, err := ProbeExpandableVolumePlugins(logger, controllerContext.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration)
413 if err != nil {
414 return nil, true, fmt.Errorf("failed to probe volume plugins when starting volume expand controller: %v", err)
415 }
416 csiTranslator := csitrans.New()
417
418 expandController, expandControllerErr := expand.NewExpandController(
419 ctx,
420 controllerContext.ClientBuilder.ClientOrDie("expand-controller"),
421 controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
422 controllerContext.Cloud,
423 plugins,
424 csiTranslator,
425 csimigration.NewPluginManager(csiTranslator, utilfeature.DefaultFeatureGate),
426 )
427
428 if expandControllerErr != nil {
429 return nil, true, fmt.Errorf("failed to start volume expand controller: %v", expandControllerErr)
430 }
431 go expandController.Run(ctx)
432 return nil, true, nil
433 }
434
435 func newEphemeralVolumeControllerDescriptor() *ControllerDescriptor {
436 return &ControllerDescriptor{
437 name: names.EphemeralVolumeController,
438 aliases: []string{"ephemeral-volume"},
439 initFunc: startEphemeralVolumeController,
440 }
441 }
442
443 func startEphemeralVolumeController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
444 ephemeralController, err := ephemeral.NewController(
445 ctx,
446 controllerContext.ClientBuilder.ClientOrDie("ephemeral-volume-controller"),
447 controllerContext.InformerFactory.Core().V1().Pods(),
448 controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims())
449 if err != nil {
450 return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
451 }
452 go ephemeralController.Run(ctx, int(controllerContext.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs))
453 return nil, true, nil
454 }
455
456 const defaultResourceClaimControllerWorkers = 10
457
458 func newResourceClaimControllerDescriptor() *ControllerDescriptor {
459 return &ControllerDescriptor{
460 name: names.ResourceClaimController,
461 aliases: []string{"resource-claim-controller"},
462 initFunc: startResourceClaimController,
463 requiredFeatureGates: []featuregate.Feature{
464 features.DynamicResourceAllocation,
465 },
466 }
467 }
468
469 func startResourceClaimController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
470 ephemeralController, err := resourceclaim.NewController(
471 klog.FromContext(ctx),
472 controllerContext.ClientBuilder.ClientOrDie("resource-claim-controller"),
473 controllerContext.InformerFactory.Core().V1().Pods(),
474 controllerContext.InformerFactory.Resource().V1alpha2().PodSchedulingContexts(),
475 controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaims(),
476 controllerContext.InformerFactory.Resource().V1alpha2().ResourceClaimTemplates())
477 if err != nil {
478 return nil, true, fmt.Errorf("failed to start resource claim controller: %v", err)
479 }
480 go ephemeralController.Run(ctx, defaultResourceClaimControllerWorkers)
481 return nil, true, nil
482 }
483
484 func newEndpointsControllerDescriptor() *ControllerDescriptor {
485 return &ControllerDescriptor{
486 name: names.EndpointsController,
487 aliases: []string{"endpoint"},
488 initFunc: startEndpointsController,
489 }
490 }
491
492 func startEndpointsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
493 go endpointcontroller.NewEndpointController(
494 ctx,
495 controllerContext.InformerFactory.Core().V1().Pods(),
496 controllerContext.InformerFactory.Core().V1().Services(),
497 controllerContext.InformerFactory.Core().V1().Endpoints(),
498 controllerContext.ClientBuilder.ClientOrDie("endpoint-controller"),
499 controllerContext.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
500 ).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
501 return nil, true, nil
502 }
503
504 func newReplicationControllerDescriptor() *ControllerDescriptor {
505 return &ControllerDescriptor{
506 name: names.ReplicationControllerController,
507 aliases: []string{"replicationcontroller"},
508 initFunc: startReplicationController,
509 }
510 }
511
512 func startReplicationController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
513 go replicationcontroller.NewReplicationManager(
514 ctx,
515 controllerContext.InformerFactory.Core().V1().Pods(),
516 controllerContext.InformerFactory.Core().V1().ReplicationControllers(),
517 controllerContext.ClientBuilder.ClientOrDie("replication-controller"),
518 replicationcontroller.BurstReplicas,
519 ).Run(ctx, int(controllerContext.ComponentConfig.ReplicationController.ConcurrentRCSyncs))
520 return nil, true, nil
521 }
522
523 func newPodGarbageCollectorControllerDescriptor() *ControllerDescriptor {
524 return &ControllerDescriptor{
525 name: names.PodGarbageCollectorController,
526 aliases: []string{"podgc"},
527 initFunc: startPodGarbageCollectorController,
528 }
529 }
530
531 func startPodGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
532 go podgc.NewPodGC(
533 ctx,
534 controllerContext.ClientBuilder.ClientOrDie("pod-garbage-collector"),
535 controllerContext.InformerFactory.Core().V1().Pods(),
536 controllerContext.InformerFactory.Core().V1().Nodes(),
537 int(controllerContext.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
538 ).Run(ctx)
539 return nil, true, nil
540 }
541
542 func newResourceQuotaControllerDescriptor() *ControllerDescriptor {
543 return &ControllerDescriptor{
544 name: names.ResourceQuotaController,
545 aliases: []string{"resourcequota"},
546 initFunc: startResourceQuotaController,
547 }
548 }
549
550 func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
551 resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
552 resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
553 discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
554 listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource)
555 quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
556
557 resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
558 QuotaClient: resourceQuotaControllerClient.CoreV1(),
559 ResourceQuotaInformer: controllerContext.InformerFactory.Core().V1().ResourceQuotas(),
560 ResyncPeriod: pkgcontroller.StaticResyncPeriodFunc(controllerContext.ComponentConfig.ResourceQuotaController.ResourceQuotaSyncPeriod.Duration),
561 InformerFactory: controllerContext.ObjectOrMetadataInformerFactory,
562 ReplenishmentResyncPeriod: controllerContext.ResyncPeriod,
563 DiscoveryFunc: discoveryFunc,
564 IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
565 InformersStarted: controllerContext.InformersStarted,
566 Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
567 UpdateFilter: quotainstall.DefaultUpdateFilter(),
568 }
569 resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
570 if err != nil {
571 return nil, false, err
572 }
573 go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs))
574
575
576 go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
577
578 return nil, true, nil
579 }
580
581 func newNamespaceControllerDescriptor() *ControllerDescriptor {
582 return &ControllerDescriptor{
583 name: names.NamespaceController,
584 aliases: []string{"namespace"},
585 initFunc: startNamespaceController,
586 }
587 }
588
589 func startNamespaceController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
590
591
592
593 nsKubeconfig := controllerContext.ClientBuilder.ConfigOrDie("namespace-controller")
594 nsKubeconfig.QPS *= 20
595 nsKubeconfig.Burst *= 100
596 namespaceKubeClient := clientset.NewForConfigOrDie(nsKubeconfig)
597 return startModifiedNamespaceController(ctx, controllerContext, namespaceKubeClient, nsKubeconfig)
598 }
599
600 func startModifiedNamespaceController(ctx context.Context, controllerContext ControllerContext, namespaceKubeClient clientset.Interface, nsKubeconfig *restclient.Config) (controller.Interface, bool, error) {
601
602 metadataClient, err := metadata.NewForConfig(nsKubeconfig)
603 if err != nil {
604 return nil, true, err
605 }
606
607 discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources
608
609 namespaceController := namespacecontroller.NewNamespaceController(
610 ctx,
611 namespaceKubeClient,
612 metadataClient,
613 discoverResourcesFn,
614 controllerContext.InformerFactory.Core().V1().Namespaces(),
615 controllerContext.ComponentConfig.NamespaceController.NamespaceSyncPeriod.Duration,
616 v1.FinalizerKubernetes,
617 )
618 go namespaceController.Run(ctx, int(controllerContext.ComponentConfig.NamespaceController.ConcurrentNamespaceSyncs))
619
620 return nil, true, nil
621 }
622
623 func newServiceAccountControllerDescriptor() *ControllerDescriptor {
624 return &ControllerDescriptor{
625 name: names.ServiceAccountController,
626 aliases: []string{"serviceaccount"},
627 initFunc: startServiceAccountController,
628 }
629 }
630
631 func startServiceAccountController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
632 sac, err := serviceaccountcontroller.NewServiceAccountsController(
633 controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
634 controllerContext.InformerFactory.Core().V1().Namespaces(),
635 controllerContext.ClientBuilder.ClientOrDie("service-account-controller"),
636 serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
637 )
638 if err != nil {
639 return nil, true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
640 }
641 go sac.Run(ctx, 1)
642 return nil, true, nil
643 }
644
645 func newTTLControllerDescriptor() *ControllerDescriptor {
646 return &ControllerDescriptor{
647 name: names.TTLController,
648 aliases: []string{"ttl"},
649 initFunc: startTTLController,
650 }
651 }
652
653 func startTTLController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
654 go ttlcontroller.NewTTLController(
655 ctx,
656 controllerContext.InformerFactory.Core().V1().Nodes(),
657 controllerContext.ClientBuilder.ClientOrDie("ttl-controller"),
658 ).Run(ctx, 5)
659 return nil, true, nil
660 }
661
662 func newGarbageCollectorControllerDescriptor() *ControllerDescriptor {
663 return &ControllerDescriptor{
664 name: names.GarbageCollectorController,
665 aliases: []string{"garbagecollector"},
666 initFunc: startGarbageCollectorController,
667 }
668 }
669
670 func startGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
671 if !controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector {
672 return nil, false, nil
673 }
674
675 gcClientset := controllerContext.ClientBuilder.ClientOrDie("generic-garbage-collector")
676 discoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("generic-garbage-collector")
677
678 config := controllerContext.ClientBuilder.ConfigOrDie("generic-garbage-collector")
679
680
681 config.QPS *= 2
682 metadataClient, err := metadata.NewForConfig(config)
683 if err != nil {
684 return nil, true, err
685 }
686
687 ignoredResources := make(map[schema.GroupResource]struct{})
688 for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
689 ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
690 }
691
692 garbageCollector, err := garbagecollector.NewComposedGarbageCollector(
693 ctx,
694 gcClientset,
695 metadataClient,
696 controllerContext.RESTMapper,
697 controllerContext.GraphBuilder,
698 )
699 if err != nil {
700 return nil, true, fmt.Errorf("failed to start the generic garbage collector: %w", err)
701 }
702
703
704 workers := int(controllerContext.ComponentConfig.GarbageCollectorController.ConcurrentGCSyncs)
705 go garbageCollector.Run(ctx, workers)
706
707
708
709 go garbageCollector.Sync(ctx, discoveryClient, 30*time.Second)
710
711 return garbageCollector, true, nil
712 }
713
714 func newPersistentVolumeClaimProtectionControllerDescriptor() *ControllerDescriptor {
715 return &ControllerDescriptor{
716 name: names.PersistentVolumeClaimProtectionController,
717 aliases: []string{"pvc-protection"},
718 initFunc: startPersistentVolumeClaimProtectionController,
719 }
720 }
721
722 func startPersistentVolumeClaimProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
723 pvcProtectionController, err := pvcprotection.NewPVCProtectionController(
724 klog.FromContext(ctx),
725 controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
726 controllerContext.InformerFactory.Core().V1().Pods(),
727 controllerContext.ClientBuilder.ClientOrDie("pvc-protection-controller"),
728 )
729 if err != nil {
730 return nil, true, fmt.Errorf("failed to start the pvc protection controller: %v", err)
731 }
732 go pvcProtectionController.Run(ctx, 1)
733 return nil, true, nil
734 }
735
736 func newPersistentVolumeProtectionControllerDescriptor() *ControllerDescriptor {
737 return &ControllerDescriptor{
738 name: names.PersistentVolumeProtectionController,
739 aliases: []string{"pv-protection"},
740 initFunc: startPersistentVolumeProtectionController,
741 }
742 }
743
744 func startPersistentVolumeProtectionController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
745 go pvprotection.NewPVProtectionController(
746 klog.FromContext(ctx),
747 controllerContext.InformerFactory.Core().V1().PersistentVolumes(),
748 controllerContext.ClientBuilder.ClientOrDie("pv-protection-controller"),
749 ).Run(ctx, 1)
750 return nil, true, nil
751 }
752
753 func newTTLAfterFinishedControllerDescriptor() *ControllerDescriptor {
754 return &ControllerDescriptor{
755 name: names.TTLAfterFinishedController,
756 aliases: []string{"ttl-after-finished"},
757 initFunc: startTTLAfterFinishedController,
758 }
759 }
760
761 func startTTLAfterFinishedController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
762 go ttlafterfinished.New(
763 ctx,
764 controllerContext.InformerFactory.Batch().V1().Jobs(),
765 controllerContext.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
766 ).Run(ctx, int(controllerContext.ComponentConfig.TTLAfterFinishedController.ConcurrentTTLSyncs))
767 return nil, true, nil
768 }
769
770 func newLegacyServiceAccountTokenCleanerControllerDescriptor() *ControllerDescriptor {
771 return &ControllerDescriptor{
772 name: names.LegacyServiceAccountTokenCleanerController,
773 aliases: []string{"legacy-service-account-token-cleaner"},
774 initFunc: startLegacyServiceAccountTokenCleanerController,
775 }
776 }
777
778 func startLegacyServiceAccountTokenCleanerController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
779 cleanUpPeriod := controllerContext.ComponentConfig.LegacySATokenCleaner.CleanUpPeriod.Duration
780 legacySATokenCleaner, err := serviceaccountcontroller.NewLegacySATokenCleaner(
781 controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
782 controllerContext.InformerFactory.Core().V1().Secrets(),
783 controllerContext.InformerFactory.Core().V1().Pods(),
784 controllerContext.ClientBuilder.ClientOrDie("legacy-service-account-token-cleaner"),
785 clock.RealClock{},
786 serviceaccountcontroller.LegacySATokenCleanerOptions{
787 CleanUpPeriod: cleanUpPeriod,
788 SyncInterval: serviceaccountcontroller.DefaultCleanerSyncInterval,
789 })
790 if err != nil {
791 return nil, true, fmt.Errorf("failed to start the legacy service account token cleaner: %v", err)
792 }
793 go legacySATokenCleaner.Run(ctx)
794 return nil, true, nil
795 }
796
797
798
799
800 func validateCIDRs(cidrsList string) ([]*net.IPNet, error) {
801
802 clusterCIDRs, dualStack, err := processCIDRs(cidrsList)
803 if err != nil {
804 return nil, err
805 }
806
807
808 if len(clusterCIDRs) > 1 && !dualStack {
809 return nil, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
810 }
811
812
813 if len(clusterCIDRs) > 2 {
814 return nil, fmt.Errorf("length of clusterCIDRs is:%v more than max allowed of 2", len(clusterCIDRs))
815 }
816
817 return clusterCIDRs, nil
818 }
819
820
821
822
823
824 func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
825 cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")
826
827 cidrs, err := netutils.ParseCIDRs(cidrsSplit)
828 if err != nil {
829 return nil, false, err
830 }
831
832
833
834 dualstack, _ := netutils.IsDualStackCIDRs(cidrs)
835
836 return cidrs, dualstack, nil
837 }
838
839
840
841
842 func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, clusterCIDRs []*net.IPNet) ([]int, error) {
843
844 sortedSizes := func(maskSizeIPv4, maskSizeIPv6 int) []int {
845 nodeMaskCIDRs := make([]int, len(clusterCIDRs))
846
847 for idx, clusterCIDR := range clusterCIDRs {
848 if netutils.IsIPv6CIDR(clusterCIDR) {
849 nodeMaskCIDRs[idx] = maskSizeIPv6
850 } else {
851 nodeMaskCIDRs[idx] = maskSizeIPv4
852 }
853 }
854 return nodeMaskCIDRs
855 }
856
857
858 ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
859 isDualstack := len(clusterCIDRs) > 1
860
861
862 if isDualstack {
863
864 if cfg.NodeCIDRMaskSize != 0 {
865 return nil, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters")
866
867 }
868
869 if cfg.NodeCIDRMaskSizeIPv4 != 0 {
870 ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
871 }
872 if cfg.NodeCIDRMaskSizeIPv6 != 0 {
873 ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
874 }
875 return sortedSizes(ipv4Mask, ipv6Mask), nil
876 }
877
878 maskConfigured := cfg.NodeCIDRMaskSize != 0
879 maskV4Configured := cfg.NodeCIDRMaskSizeIPv4 != 0
880 maskV6Configured := cfg.NodeCIDRMaskSizeIPv6 != 0
881 isSingleStackIPv6 := netutils.IsIPv6CIDR(clusterCIDRs[0])
882
883
884 if maskConfigured {
885
886 if maskV4Configured || maskV6Configured {
887 return nil, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 is not allowed if --node-cidr-mask-size is set. For dual-stack clusters please unset it and use IPFamily specific flags")
888 }
889
890 mask := int(cfg.NodeCIDRMaskSize)
891 return sortedSizes(mask, mask), nil
892 }
893
894 if maskV4Configured {
895 if isSingleStackIPv6 {
896 return nil, errors.New("usage of --node-cidr-mask-size-ipv4 is not allowed for a single-stack IPv6 cluster")
897 }
898
899 ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
900 }
901
902
903 if maskV6Configured {
904 if !isSingleStackIPv6 {
905 return nil, errors.New("usage of --node-cidr-mask-size-ipv6 is not allowed for a single-stack IPv4 cluster")
906 }
907
908 ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
909 }
910 return sortedSizes(ipv4Mask, ipv6Mask), nil
911 }
912
913 func newStorageVersionGarbageCollectorControllerDescriptor() *ControllerDescriptor {
914 return &ControllerDescriptor{
915 name: names.StorageVersionGarbageCollectorController,
916 aliases: []string{"storage-version-gc"},
917 initFunc: startStorageVersionGarbageCollectorController,
918 requiredFeatureGates: []featuregate.Feature{
919 genericfeatures.APIServerIdentity,
920 genericfeatures.StorageVersionAPI,
921 },
922 }
923 }
924
925 func startStorageVersionGarbageCollectorController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
926 go storageversiongc.NewStorageVersionGC(
927 ctx,
928 controllerContext.ClientBuilder.ClientOrDie("storage-version-garbage-collector"),
929 controllerContext.InformerFactory.Coordination().V1().Leases(),
930 controllerContext.InformerFactory.Internal().V1alpha1().StorageVersions(),
931 ).Run(ctx)
932 return nil, true, nil
933 }
934
View as plain text