1
16
17 package nodeipam
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "time"
24
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26 coreinformers "k8s.io/client-go/informers/core/v1"
27 clientset "k8s.io/client-go/kubernetes"
28 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
29 corelisters "k8s.io/client-go/listers/core/v1"
30 "k8s.io/client-go/tools/cache"
31 "k8s.io/client-go/tools/record"
32 cloudprovider "k8s.io/cloud-provider"
33 controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
34 "k8s.io/klog/v2"
35 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
36 )
37
38 const (
39
40
41 ipamResyncInterval = 30 * time.Second
42
43
44 ipamMaxBackoff = 10 * time.Second
45
46
47 ipamInitialBackoff = 250 * time.Millisecond
48 )
49
50
51
52
53 type ipamController interface {
54 Run(ctx context.Context)
55 }
56
57
58 type Controller struct {
59 allocatorType ipam.CIDRAllocatorType
60
61 cloud cloudprovider.Interface
62 clusterCIDRs []*net.IPNet
63 serviceCIDR *net.IPNet
64 secondaryServiceCIDR *net.IPNet
65 kubeClient clientset.Interface
66 eventBroadcaster record.EventBroadcaster
67
68 nodeLister corelisters.NodeLister
69 nodeInformerSynced cache.InformerSynced
70
71 legacyIPAM ipamController
72 cidrAllocator ipam.CIDRAllocator
73 }
74
75
76
77
78
79
80 func NewNodeIpamController(
81 ctx context.Context,
82 nodeInformer coreinformers.NodeInformer,
83 cloud cloudprovider.Interface,
84 kubeClient clientset.Interface,
85 clusterCIDRs []*net.IPNet,
86 serviceCIDR *net.IPNet,
87 secondaryServiceCIDR *net.IPNet,
88 nodeCIDRMaskSizes []int,
89 allocatorType ipam.CIDRAllocatorType) (*Controller, error) {
90
91 if kubeClient == nil {
92 return nil, fmt.Errorf("kubeClient is nil when starting Controller")
93 }
94
95
96 if allocatorType != ipam.CloudAllocatorType {
97 if len(clusterCIDRs) == 0 {
98 return nil, fmt.Errorf("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set")
99 }
100
101 for idx, cidr := range clusterCIDRs {
102 mask := cidr.Mask
103 if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSizes[idx] {
104 return nil, fmt.Errorf("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than or equal to --node-cidr-mask-size configured for CIDR family")
105 }
106 }
107 }
108
109 ic := &Controller{
110 cloud: cloud,
111 kubeClient: kubeClient,
112 eventBroadcaster: record.NewBroadcaster(record.WithContext(ctx)),
113 clusterCIDRs: clusterCIDRs,
114 serviceCIDR: serviceCIDR,
115 secondaryServiceCIDR: secondaryServiceCIDR,
116 allocatorType: allocatorType,
117 }
118
119
120 if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType {
121 var err error
122 ic.legacyIPAM, err = createLegacyIPAM(ctx, ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes)
123 if err != nil {
124 return nil, err
125 }
126 } else {
127 var err error
128
129 allocatorParams := ipam.CIDRAllocatorParams{
130 ClusterCIDRs: clusterCIDRs,
131 ServiceCIDR: ic.serviceCIDR,
132 SecondaryServiceCIDR: ic.secondaryServiceCIDR,
133 NodeCIDRMaskSizes: nodeCIDRMaskSizes,
134 }
135
136 ic.cidrAllocator, err = ipam.New(ctx, kubeClient, cloud, nodeInformer, ic.allocatorType, allocatorParams)
137 if err != nil {
138 return nil, err
139 }
140 }
141
142 ic.nodeLister = nodeInformer.Lister()
143 ic.nodeInformerSynced = nodeInformer.Informer().HasSynced
144
145 return ic, nil
146 }
147
148
149 func (nc *Controller) Run(ctx context.Context) {
150 defer utilruntime.HandleCrash()
151
152
153 nc.eventBroadcaster.StartStructuredLogging(3)
154 nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
155 defer nc.eventBroadcaster.Shutdown()
156 klog.FromContext(ctx).Info("Starting ipam controller")
157 defer klog.FromContext(ctx).Info("Shutting down ipam controller")
158
159 if !cache.WaitForNamedCacheSync("node", ctx.Done(), nc.nodeInformerSynced) {
160 return
161 }
162
163 if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
164 go nc.legacyIPAM.Run(ctx)
165 } else {
166 go nc.cidrAllocator.Run(ctx)
167 }
168
169 <-ctx.Done()
170 }
171
172
173 func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
174 controllerManagerMetrics.ControllerStarted("nodeipam")
175 defer controllerManagerMetrics.ControllerStopped("nodeipam")
176 nc.Run(ctx)
177 }
178
View as plain text