1
16
17 package servicecidrs
18
19 import (
20 "context"
21 "encoding/json"
22 "net/netip"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 networkingapiv1alpha1 "k8s.io/api/networking/v1alpha1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/types"
32 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/apimachinery/pkg/util/wait"
35 metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
36 networkingapiv1alpha1apply "k8s.io/client-go/applyconfigurations/networking/v1alpha1"
37 networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
38 clientset "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/kubernetes/scheme"
40 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
41 networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
42 "k8s.io/client-go/tools/cache"
43 "k8s.io/client-go/tools/record"
44 "k8s.io/client-go/util/workqueue"
45 "k8s.io/klog/v2"
46 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
47 "k8s.io/kubernetes/pkg/util/iptree"
48 netutils "k8s.io/utils/net"
49 )
50
51 const (
52
53
54
55
56
57 maxRetries = 15
58 controllerName = "service-cidr-controller"
59
60 ServiceCIDRProtectionFinalizer = "networking.k8s.io/service-cidr-finalizer"
61
62
63
64
65 deletionGracePeriod = 10 * time.Second
66 )
67
68
69 func NewController(
70 ctx context.Context,
71 serviceCIDRInformer networkinginformers.ServiceCIDRInformer,
72 ipAddressInformer networkinginformers.IPAddressInformer,
73 client clientset.Interface,
74 ) *Controller {
75 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
76 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
77 c := &Controller{
78 client: client,
79 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
80 tree: iptree.New[sets.Set[string]](),
81 workerLoopPeriod: time.Second,
82 }
83
84 _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
85 AddFunc: c.addServiceCIDR,
86 UpdateFunc: c.updateServiceCIDR,
87 DeleteFunc: c.deleteServiceCIDR,
88 })
89 c.serviceCIDRLister = serviceCIDRInformer.Lister()
90 c.serviceCIDRsSynced = serviceCIDRInformer.Informer().HasSynced
91
92 _, _ = ipAddressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
93 AddFunc: c.addIPAddress,
94 DeleteFunc: c.deleteIPAddress,
95 })
96
97 c.ipAddressLister = ipAddressInformer.Lister()
98 c.ipAddressSynced = ipAddressInformer.Informer().HasSynced
99
100 c.eventBroadcaster = broadcaster
101 c.eventRecorder = recorder
102
103 return c
104 }
105
106
107 type Controller struct {
108 client clientset.Interface
109 eventBroadcaster record.EventBroadcaster
110 eventRecorder record.EventRecorder
111
112 serviceCIDRLister networkinglisters.ServiceCIDRLister
113 serviceCIDRsSynced cache.InformerSynced
114
115 ipAddressLister networkinglisters.IPAddressLister
116 ipAddressSynced cache.InformerSynced
117
118 queue workqueue.RateLimitingInterface
119
120
121 workerLoopPeriod time.Duration
122
123
124 muTree sync.Mutex
125 tree *iptree.Tree[sets.Set[string]]
126 }
127
128
129 func (c *Controller) Run(ctx context.Context, workers int) {
130 defer utilruntime.HandleCrash()
131 defer c.queue.ShutDown()
132
133 c.eventBroadcaster.StartStructuredLogging(3)
134 c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
135 defer c.eventBroadcaster.Shutdown()
136
137 logger := klog.FromContext(ctx)
138
139 logger.Info("Starting", "controller", controllerName)
140 defer logger.Info("Shutting down", "controller", controllerName)
141
142 if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.serviceCIDRsSynced, c.ipAddressSynced) {
143 return
144 }
145
146 for i := 0; i < workers; i++ {
147 go wait.UntilWithContext(ctx, c.worker, c.workerLoopPeriod)
148 }
149 <-ctx.Done()
150 }
151
152 func (c *Controller) addServiceCIDR(obj interface{}) {
153 cidr, ok := obj.(*networkingapiv1alpha1.ServiceCIDR)
154 if !ok {
155 return
156 }
157 c.queue.Add(cidr.Name)
158 for _, key := range c.overlappingServiceCIDRs(cidr) {
159 c.queue.Add(key)
160 }
161 }
162
163 func (c *Controller) updateServiceCIDR(oldObj, obj interface{}) {
164 key, err := cache.MetaNamespaceKeyFunc(obj)
165 if err == nil {
166 c.queue.Add(key)
167 }
168 }
169
170
171 func (c *Controller) deleteServiceCIDR(obj interface{}) {
172 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
173 if err == nil {
174 c.queue.Add(key)
175 }
176 }
177
178
179 func (c *Controller) addIPAddress(obj interface{}) {
180 ip, ok := obj.(*networkingapiv1alpha1.IPAddress)
181 if !ok {
182 return
183 }
184
185 for _, cidr := range c.containingServiceCIDRs(ip) {
186 c.queue.Add(cidr)
187 }
188 }
189
190
191 func (c *Controller) deleteIPAddress(obj interface{}) {
192 ip, ok := obj.(*networkingapiv1alpha1.IPAddress)
193 if !ok {
194 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
195 if !ok {
196 return
197 }
198 ip, ok = tombstone.Obj.(*networkingapiv1alpha1.IPAddress)
199 if !ok {
200 return
201 }
202 }
203
204 for _, cidr := range c.containingServiceCIDRs(ip) {
205 c.queue.Add(cidr)
206 }
207 }
208
209
210
211
212 func (c *Controller) overlappingServiceCIDRs(serviceCIDR *networkingapiv1alpha1.ServiceCIDR) []string {
213 c.muTree.Lock()
214 defer c.muTree.Unlock()
215
216 serviceCIDRs := sets.New[string]()
217 for _, cidr := range serviceCIDR.Spec.CIDRs {
218 if prefix, err := netip.ParsePrefix(cidr); err == nil {
219 c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
220 serviceCIDRs.Insert(v.UnsortedList()...)
221 return false
222 })
223 c.tree.WalkPrefix(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
224 serviceCIDRs.Insert(v.UnsortedList()...)
225 return false
226 })
227 }
228 }
229
230 return serviceCIDRs.UnsortedList()
231 }
232
233
234
235 func (c *Controller) containingServiceCIDRs(ip *networkingapiv1alpha1.IPAddress) []string {
236
237 managedBy, ok := ip.Labels[networkingapiv1alpha1.LabelManagedBy]
238 if !ok || managedBy != ipallocator.ControllerName {
239 return []string{}
240 }
241
242 address, err := netip.ParseAddr(ip.Name)
243 if err != nil {
244
245
246 return []string{}
247 }
248
249 c.muTree.Lock()
250 defer c.muTree.Unlock()
251 serviceCIDRs := []string{}
252
253 prefixes := c.tree.GetHostIPPrefixMatches(address)
254 for _, v := range prefixes {
255 serviceCIDRs = append(serviceCIDRs, v.UnsortedList()...)
256 }
257
258 return serviceCIDRs
259 }
260
261 func (c *Controller) worker(ctx context.Context) {
262 for c.processNext(ctx) {
263 }
264 }
265
266 func (c *Controller) processNext(ctx context.Context) bool {
267 eKey, quit := c.queue.Get()
268 if quit {
269 return false
270 }
271 defer c.queue.Done(eKey)
272
273 key := eKey.(string)
274 err := c.sync(ctx, key)
275 if err == nil {
276 c.queue.Forget(key)
277 return true
278 }
279 logger := klog.FromContext(ctx)
280 if c.queue.NumRequeues(key) < maxRetries {
281 logger.V(2).Info("Error syncing ServiceCIDR, retrying", "ServiceCIDR", key, "err", err)
282 c.queue.AddRateLimited(key)
283 } else {
284 logger.Info("Dropping ServiceCIDR out of the queue", "ServiceCIDR", key, "err", err)
285 c.queue.Forget(key)
286 utilruntime.HandleError(err)
287 }
288 return true
289 }
290
291
292 func (c *Controller) syncCIDRs() error {
293 serviceCIDRList, err := c.serviceCIDRLister.List(labels.Everything())
294 if err != nil {
295 return err
296 }
297
298
299
300
301
302 tree := iptree.New[sets.Set[string]]()
303 for _, serviceCIDR := range serviceCIDRList {
304 for _, cidr := range serviceCIDR.Spec.CIDRs {
305 if prefix, err := netip.ParsePrefix(cidr); err == nil {
306
307 v, ok := tree.GetPrefix(prefix)
308 if !ok {
309 v = sets.Set[string]{}
310 }
311 v.Insert(serviceCIDR.Name)
312 tree.InsertPrefix(prefix, v)
313 }
314 }
315 }
316
317 c.muTree.Lock()
318 defer c.muTree.Unlock()
319 c.tree = tree
320 return nil
321 }
322
323 func (c *Controller) sync(ctx context.Context, key string) error {
324 logger := klog.FromContext(ctx)
325 startTime := time.Now()
326 defer func() {
327 logger.V(4).Info("Finished syncing ServiceCIDR)", "ServiceCIDR", key, "elapsed", time.Since(startTime))
328 }()
329
330
331
332 err := c.syncCIDRs()
333 if err != nil {
334 return err
335 }
336
337 logger.V(4).Info("syncing ServiceCIDR", "ServiceCIDR", key)
338 cidr, err := c.serviceCIDRLister.Get(key)
339 if err != nil {
340 if apierrors.IsNotFound(err) {
341 logger.V(4).Info("ServiceCIDR no longer exist", "ServiceCIDR", key)
342 return nil
343 }
344 return err
345 }
346
347
348 if !cidr.GetDeletionTimestamp().IsZero() {
349
350 ok, err := c.canDeleteCIDR(ctx, cidr)
351 if err != nil {
352 return err
353 }
354 if !ok {
355
356
357
358 svcApplyStatus := networkingapiv1alpha1apply.ServiceCIDRStatus().WithConditions(
359 metav1apply.Condition().
360 WithType(networkingapiv1alpha1.ServiceCIDRConditionReady).
361 WithStatus(metav1.ConditionFalse).
362 WithReason(networkingapiv1alpha1.ServiceCIDRReasonTerminating).
363 WithMessage("There are still IPAddresses referencing the ServiceCIDR, please remove them or create a new ServiceCIDR").
364 WithLastTransitionTime(metav1.Now()))
365 svcApply := networkingapiv1alpha1apply.ServiceCIDR(cidr.Name).WithStatus(svcApplyStatus)
366 _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true})
367 return err
368 }
369
370
371
372
373
374 timeUntilDeleted := deletionGracePeriod - time.Since(cidr.GetDeletionTimestamp().Time)
375 if timeUntilDeleted > 0 {
376 c.queue.AddAfter(key, timeUntilDeleted)
377 return nil
378 }
379 return c.removeServiceCIDRFinalizerIfNeeded(ctx, cidr)
380 }
381
382
383 err = c.addServiceCIDRFinalizerIfNeeded(ctx, cidr)
384 if err != nil {
385 return err
386 }
387
388
389 svcApplyStatus := networkingapiv1alpha1apply.ServiceCIDRStatus().WithConditions(
390 metav1apply.Condition().
391 WithType(networkingapiv1alpha1.ServiceCIDRConditionReady).
392 WithStatus(metav1.ConditionTrue).
393 WithMessage("Kubernetes Service CIDR is ready").
394 WithLastTransitionTime(metav1.Now()))
395 svcApply := networkingapiv1alpha1apply.ServiceCIDR(cidr.Name).WithStatus(svcApplyStatus)
396 if _, err := c.client.NetworkingV1alpha1().ServiceCIDRs().ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}); err != nil {
397 logger.Info("error updating default ServiceCIDR status", "error", err)
398 c.eventRecorder.Eventf(cidr, v1.EventTypeWarning, "KubernetesServiceCIDRError", "The ServiceCIDR Status can not be set to Ready=True")
399 return err
400 }
401
402 return nil
403 }
404
405
406 func (c *Controller) canDeleteCIDR(ctx context.Context, serviceCIDR *networkingapiv1alpha1.ServiceCIDR) (bool, error) {
407
408
409 c.muTree.Lock()
410 defer c.muTree.Unlock()
411 logger := klog.FromContext(ctx)
412
413 hasParent := true
414 for _, cidr := range serviceCIDR.Spec.CIDRs {
415
416
417 if prefix, err := netip.ParsePrefix(cidr); err == nil {
418 serviceCIDRs := sets.New[string]()
419 c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
420 serviceCIDRs.Insert(v.UnsortedList()...)
421 return false
422 })
423 if serviceCIDRs.Len() == 1 && serviceCIDRs.Has(serviceCIDR.Name) {
424 hasParent = false
425 }
426 }
427 }
428
429
430
431 if hasParent {
432 logger.V(2).Info("Removing finalizer for ServiceCIDR", "ServiceCIDR", serviceCIDR.String())
433 return true, nil
434 }
435
436
437
438
439 for _, cidr := range serviceCIDR.Spec.CIDRs {
440
441 ipLabelSelector := labels.Set(map[string]string{
442 networkingapiv1alpha1.LabelIPAddressFamily: string(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))),
443 networkingapiv1alpha1.LabelManagedBy: ipallocator.ControllerName,
444 }).AsSelectorPreValidated()
445 ips, err := c.ipAddressLister.List(ipLabelSelector)
446 if err != nil {
447 return false, err
448 }
449 for _, ip := range ips {
450
451
452
453 address, err := netip.ParseAddr(ip.Name)
454 if err != nil {
455
456 logger.Info("[SHOULD NOT HAPPEN] unexpected error parsing IPAddress", "IPAddress", ip.Name, "error", err)
457 continue
458 }
459
460 prefixes := c.tree.GetHostIPPrefixMatches(address)
461 if len(prefixes) != 1 {
462 continue
463 }
464 for _, v := range prefixes {
465 if v.Len() == 1 && v.Has(serviceCIDR.Name) {
466 return false, nil
467 }
468 }
469 }
470 }
471
472
473
474 logger.Info("ServiceCIDR no longer have orphan IPs", "ServiceCDIR", serviceCIDR.String())
475 return true, nil
476 }
477
478 func (c *Controller) addServiceCIDRFinalizerIfNeeded(ctx context.Context, cidr *networkingapiv1alpha1.ServiceCIDR) error {
479 for _, f := range cidr.GetFinalizers() {
480 if f == ServiceCIDRProtectionFinalizer {
481 return nil
482 }
483 }
484
485 patch := map[string]interface{}{
486 "metadata": map[string]interface{}{
487 "finalizers": []string{ServiceCIDRProtectionFinalizer},
488 },
489 }
490 patchBytes, err := json.Marshal(patch)
491 if err != nil {
492 return err
493 }
494 _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().Patch(ctx, cidr.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
495 if err != nil && !apierrors.IsNotFound(err) {
496 return err
497 }
498 klog.FromContext(ctx).V(4).Info("Added protection finalizer to ServiceCIDR", "ServiceCIDR", cidr.Name)
499 return nil
500
501 }
502
503 func (c *Controller) removeServiceCIDRFinalizerIfNeeded(ctx context.Context, cidr *networkingapiv1alpha1.ServiceCIDR) error {
504 found := false
505 for _, f := range cidr.GetFinalizers() {
506 if f == ServiceCIDRProtectionFinalizer {
507 found = true
508 break
509 }
510 }
511 if !found {
512 return nil
513 }
514 patch := map[string]interface{}{
515 "metadata": map[string]interface{}{
516 "$deleteFromPrimitiveList/finalizers": []string{ServiceCIDRProtectionFinalizer},
517 },
518 }
519 patchBytes, err := json.Marshal(patch)
520 if err != nil {
521 return err
522 }
523 _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().Patch(ctx, cidr.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
524 if err != nil && !apierrors.IsNotFound(err) {
525 return err
526 }
527 klog.FromContext(ctx).V(4).Info("Removed protection finalizer from ServiceCIDRs", "ServiceCIDR", cidr.Name)
528 return nil
529 }
530
531
532
533
534 func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
535 switch ipFamily {
536 case netutils.IPv4:
537 return v1.IPv4Protocol
538 case netutils.IPv6:
539 return v1.IPv6Protocol
540 }
541
542 return v1.IPFamilyUnknown
543 }
544
View as plain text