1
16
17 package ipam
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "sync"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/klog/v2"
27 netutils "k8s.io/utils/net"
28
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 "k8s.io/apimachinery/pkg/types"
31 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/sets"
33 informers "k8s.io/client-go/informers/core/v1"
34 clientset "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/kubernetes/scheme"
36 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
37 corelisters "k8s.io/client-go/listers/core/v1"
38 "k8s.io/client-go/tools/cache"
39 "k8s.io/client-go/tools/record"
40 nodeutil "k8s.io/component-helpers/node/util"
41 "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
42 controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
43 )
44
45 type rangeAllocator struct {
46 client clientset.Interface
47
48 clusterCIDRs []*net.IPNet
49
50 cidrSets []*cidrset.CidrSet
51
52 nodeLister corelisters.NodeLister
53
54 nodesSynced cache.InformerSynced
55
56
57 nodeCIDRUpdateChannel chan nodeReservedCIDRs
58 broadcaster record.EventBroadcaster
59 recorder record.EventRecorder
60
61 lock sync.Mutex
62 nodesInProcessing sets.String
63 }
64
65
66
67
68
69
70 func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
71 logger := klog.FromContext(ctx)
72 if client == nil {
73 logger.Error(nil, "kubeClient is nil when starting CIDRRangeAllocator")
74 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
75 }
76
77 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
78 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
79
80
81
82 cidrSets := make([]*cidrset.CidrSet, len(allocatorParams.ClusterCIDRs))
83 for idx, cidr := range allocatorParams.ClusterCIDRs {
84 cidrSet, err := cidrset.NewCIDRSet(cidr, allocatorParams.NodeCIDRMaskSizes[idx])
85 if err != nil {
86 return nil, err
87 }
88 cidrSets[idx] = cidrSet
89 }
90
91 ra := &rangeAllocator{
92 client: client,
93 clusterCIDRs: allocatorParams.ClusterCIDRs,
94 cidrSets: cidrSets,
95 nodeLister: nodeInformer.Lister(),
96 nodesSynced: nodeInformer.Informer().HasSynced,
97 nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
98 broadcaster: eventBroadcaster,
99 recorder: recorder,
100 nodesInProcessing: sets.NewString(),
101 }
102
103 if allocatorParams.ServiceCIDR != nil {
104 ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
105 } else {
106 logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
107 }
108
109 if allocatorParams.SecondaryServiceCIDR != nil {
110 ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
111 } else {
112 logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
113 }
114
115 if nodeList != nil {
116 for _, node := range nodeList.Items {
117 if len(node.Spec.PodCIDRs) == 0 {
118 logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
119 continue
120 }
121 logger.V(4).Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDR", node.Spec.PodCIDR)
122 if err := ra.occupyCIDRs(&node); err != nil {
123
124
125
126
127 return nil, err
128 }
129 }
130 }
131
132 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
133 AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
134 return ra.AllocateOrOccupyCIDR(logger, node)
135 }),
136 UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 if len(newNode.Spec.PodCIDRs) == 0 {
157 return ra.AllocateOrOccupyCIDR(logger, newNode)
158 }
159 return nil
160 }),
161 DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
162 return ra.ReleaseCIDR(logger, node)
163 }),
164 })
165
166 return ra, nil
167 }
168
169 func (r *rangeAllocator) Run(ctx context.Context) {
170 defer utilruntime.HandleCrash()
171
172
173 r.broadcaster.StartStructuredLogging(3)
174 logger := klog.FromContext(ctx)
175 logger.Info("Sending events to api server")
176 r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
177 defer r.broadcaster.Shutdown()
178
179 logger.Info("Starting range CIDR allocator")
180 defer logger.Info("Shutting down range CIDR allocator")
181
182 if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), r.nodesSynced) {
183 return
184 }
185
186 for i := 0; i < cidrUpdateWorkers; i++ {
187 go r.worker(ctx)
188 }
189
190 <-ctx.Done()
191 }
192
193 func (r *rangeAllocator) worker(ctx context.Context) {
194 logger := klog.FromContext(ctx)
195 for {
196 select {
197 case workItem, ok := <-r.nodeCIDRUpdateChannel:
198 if !ok {
199 logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
200 return
201 }
202 if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
203
204 r.nodeCIDRUpdateChannel <- workItem
205 }
206 case <-ctx.Done():
207 return
208 }
209 }
210 }
211
212 func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
213 r.lock.Lock()
214 defer r.lock.Unlock()
215 if r.nodesInProcessing.Has(nodeName) {
216 return false
217 }
218 r.nodesInProcessing.Insert(nodeName)
219 return true
220 }
221
222 func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
223 r.lock.Lock()
224 defer r.lock.Unlock()
225 r.nodesInProcessing.Delete(nodeName)
226 }
227
228
229 func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
230 defer r.removeNodeFromProcessing(node.Name)
231 if len(node.Spec.PodCIDRs) == 0 {
232 return nil
233 }
234 for idx, cidr := range node.Spec.PodCIDRs {
235 _, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
236 if err != nil {
237 return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
238 }
239
240
241
242 if idx >= len(r.cidrSets) {
243 return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
244 }
245
246 if err := r.cidrSets[idx].Occupy(podCIDR); err != nil {
247 return fmt.Errorf("failed to mark cidr[%v] at idx [%v] as occupied for node: %v: %v", podCIDR, idx, node.Name, err)
248 }
249 }
250 return nil
251 }
252
253
254
255
256 func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
257 if node == nil {
258 return nil
259 }
260 if !r.insertNodeToProcessing(node.Name) {
261 logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
262 return nil
263 }
264
265 if len(node.Spec.PodCIDRs) > 0 {
266 return r.occupyCIDRs(node)
267 }
268
269 allocated := nodeReservedCIDRs{
270 nodeName: node.Name,
271 allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
272 }
273
274 for idx := range r.cidrSets {
275 podCIDR, err := r.cidrSets[idx].AllocateNext()
276 if err != nil {
277 r.removeNodeFromProcessing(node.Name)
278 controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
279 return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
280 }
281 allocated.allocatedCIDRs[idx] = podCIDR
282 }
283
284
285 logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocated.allocatedCIDRs)
286 r.nodeCIDRUpdateChannel <- allocated
287 return nil
288 }
289
290
291 func (r *rangeAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
292 if node == nil || len(node.Spec.PodCIDRs) == 0 {
293 return nil
294 }
295
296 for idx, cidr := range node.Spec.PodCIDRs {
297 _, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
298 if err != nil {
299 return fmt.Errorf("failed to parse CIDR %s on Node %v: %v", cidr, node.Name, err)
300 }
301
302
303
304
305 if idx >= len(r.cidrSets) {
306 return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
307 }
308
309 logger.V(4).Info("Release CIDR for node", "CIDR", cidr, "node", klog.KObj(node))
310 if err = r.cidrSets[idx].Release(podCIDR); err != nil {
311 return fmt.Errorf("error when releasing CIDR %v: %v", cidr, err)
312 }
313 }
314 return nil
315 }
316
317
318
319 func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
320
321
322
323
324
325 for idx, cidr := range r.clusterCIDRs {
326
327 if !cidr.Contains(serviceCIDR.IP.Mask(cidr.Mask)) && !serviceCIDR.Contains(cidr.IP.Mask(serviceCIDR.Mask)) {
328 continue
329 }
330
331
332 if err := r.cidrSets[idx].Occupy(serviceCIDR); err != nil {
333 logger.Error(err, "Error filtering out service cidr out cluster cidr", "CIDR", cidr, "index", idx, "serviceCIDR", serviceCIDR)
334 }
335 }
336 }
337
338
339 func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
340 var err error
341 var node *v1.Node
342 defer r.removeNodeFromProcessing(data.nodeName)
343 cidrsString := ipnetToStringList(data.allocatedCIDRs)
344 node, err = r.nodeLister.Get(data.nodeName)
345 if err != nil {
346 logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", data.nodeName))
347 return err
348 }
349
350
351
352
353 if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
354 match := true
355 for idx, cidr := range cidrsString {
356 if node.Spec.PodCIDRs[idx] != cidr {
357 match = false
358 break
359 }
360 }
361 if match {
362 logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", data.allocatedCIDRs)
363 return nil
364 }
365 }
366
367
368 if len(node.Spec.PodCIDRs) != 0 {
369 logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
370 for idx, cidr := range data.allocatedCIDRs {
371 if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
372 logger.Error(releaseErr, "Error when releasing CIDR", "index", idx, "CIDR", cidr)
373 }
374 }
375 return nil
376 }
377
378
379 for i := 0; i < cidrUpdateRetries; i++ {
380 if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
381 logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
382 return nil
383 }
384 }
385
386 logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString)
387 controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
388
389
390
391 if !apierrors.IsServerTimeout(err) {
392 logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
393 for idx, cidr := range data.allocatedCIDRs {
394 if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
395 logger.Error(releaseErr, "Error releasing allocated CIDR for node", "node", klog.KObj(node))
396 }
397 }
398 }
399 return err
400 }
401
View as plain text