/* Copyright 2023 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package servicecidrs import ( "context" "encoding/json" "net/netip" "sync" "time" v1 "k8s.io/api/core/v1" networkingapiv1alpha1 "k8s.io/api/networking/v1alpha1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" metav1apply "k8s.io/client-go/applyconfigurations/meta/v1" networkingapiv1alpha1apply "k8s.io/client-go/applyconfigurations/networking/v1alpha1" networkinginformers "k8s.io/client-go/informers/networking/v1alpha1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" networkinglisters "k8s.io/client-go/listers/networking/v1alpha1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/util/iptree" netutils "k8s.io/utils/net" ) const ( // maxRetries is the max number of times a service object will be retried before it is dropped out of the queue. // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the // sequence of delays between successive queuings of a service. // // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s maxRetries = 15 controllerName = "service-cidr-controller" ServiceCIDRProtectionFinalizer = "networking.k8s.io/service-cidr-finalizer" // deletionGracePeriod is the time in seconds to wait to remove the finalizer from a ServiceCIDR to ensure the // deletion informations has been propagated to the apiserver allocators to avoid allocating any IP address // before we complete delete the ServiceCIDR deletionGracePeriod = 10 * time.Second ) // NewController returns a new *Controller. func NewController( ctx context.Context, serviceCIDRInformer networkinginformers.ServiceCIDRInformer, ipAddressInformer networkinginformers.IPAddressInformer, client clientset.Interface, ) *Controller { broadcaster := record.NewBroadcaster(record.WithContext(ctx)) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) c := &Controller{ client: client, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"), tree: iptree.New[sets.Set[string]](), workerLoopPeriod: time.Second, } _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addServiceCIDR, UpdateFunc: c.updateServiceCIDR, DeleteFunc: c.deleteServiceCIDR, }) c.serviceCIDRLister = serviceCIDRInformer.Lister() c.serviceCIDRsSynced = serviceCIDRInformer.Informer().HasSynced _, _ = ipAddressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addIPAddress, DeleteFunc: c.deleteIPAddress, }) c.ipAddressLister = ipAddressInformer.Lister() c.ipAddressSynced = ipAddressInformer.Informer().HasSynced c.eventBroadcaster = broadcaster c.eventRecorder = recorder return c } // Controller manages selector-based service ipAddress. type Controller struct { client clientset.Interface eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder serviceCIDRLister networkinglisters.ServiceCIDRLister serviceCIDRsSynced cache.InformerSynced ipAddressLister networkinglisters.IPAddressLister ipAddressSynced cache.InformerSynced queue workqueue.RateLimitingInterface // workerLoopPeriod is the time between worker runs. The workers process the queue of service and ipRange changes. workerLoopPeriod time.Duration // tree store the ServiceCIDRs names associated to each muTree sync.Mutex tree *iptree.Tree[sets.Set[string]] } // Run will not return until stopCh is closed. func (c *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() c.eventBroadcaster.StartStructuredLogging(3) c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) defer c.eventBroadcaster.Shutdown() logger := klog.FromContext(ctx) logger.Info("Starting", "controller", controllerName) defer logger.Info("Shutting down", "controller", controllerName) if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), c.serviceCIDRsSynced, c.ipAddressSynced) { return } for i := 0; i < workers; i++ { go wait.UntilWithContext(ctx, c.worker, c.workerLoopPeriod) } <-ctx.Done() } func (c *Controller) addServiceCIDR(obj interface{}) { cidr, ok := obj.(*networkingapiv1alpha1.ServiceCIDR) if !ok { return } c.queue.Add(cidr.Name) for _, key := range c.overlappingServiceCIDRs(cidr) { c.queue.Add(key) } } func (c *Controller) updateServiceCIDR(oldObj, obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { c.queue.Add(key) } } // deleteServiceCIDR func (c *Controller) deleteServiceCIDR(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { c.queue.Add(key) } } // addIPAddress may block a ServiceCIDR deletion func (c *Controller) addIPAddress(obj interface{}) { ip, ok := obj.(*networkingapiv1alpha1.IPAddress) if !ok { return } for _, cidr := range c.containingServiceCIDRs(ip) { c.queue.Add(cidr) } } // deleteIPAddress may unblock a ServiceCIDR deletion func (c *Controller) deleteIPAddress(obj interface{}) { ip, ok := obj.(*networkingapiv1alpha1.IPAddress) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { return } ip, ok = tombstone.Obj.(*networkingapiv1alpha1.IPAddress) if !ok { return } } for _, cidr := range c.containingServiceCIDRs(ip) { c.queue.Add(cidr) } } // overlappingServiceCIDRs, given a ServiceCIDR return the ServiceCIDRs that contain or are contained, // this is required because adding or removing a CIDR will require to recompute the // state of each ServiceCIDR to check if can be unblocked on deletion. func (c *Controller) overlappingServiceCIDRs(serviceCIDR *networkingapiv1alpha1.ServiceCIDR) []string { c.muTree.Lock() defer c.muTree.Unlock() serviceCIDRs := sets.New[string]() for _, cidr := range serviceCIDR.Spec.CIDRs { if prefix, err := netip.ParsePrefix(cidr); err == nil { // if is empty err will not be nil c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool { serviceCIDRs.Insert(v.UnsortedList()...) return false }) c.tree.WalkPrefix(prefix, func(k netip.Prefix, v sets.Set[string]) bool { serviceCIDRs.Insert(v.UnsortedList()...) return false }) } } return serviceCIDRs.UnsortedList() } // containingServiceCIDRs, given an IPAddress return the ServiceCIDRs that contains the IP, // as it may block or be blocking the deletion of the ServiceCIDRs that contain it. func (c *Controller) containingServiceCIDRs(ip *networkingapiv1alpha1.IPAddress) []string { // only process IPs managed by the kube-apiserver managedBy, ok := ip.Labels[networkingapiv1alpha1.LabelManagedBy] if !ok || managedBy != ipallocator.ControllerName { return []string{} } address, err := netip.ParseAddr(ip.Name) if err != nil { // This should not happen, the IPAddress object validates // the name is a valid IPAddress return []string{} } c.muTree.Lock() defer c.muTree.Unlock() serviceCIDRs := []string{} // walk the tree to get all the ServiceCIDRs that contain this IP address prefixes := c.tree.GetHostIPPrefixMatches(address) for _, v := range prefixes { serviceCIDRs = append(serviceCIDRs, v.UnsortedList()...) } return serviceCIDRs } func (c *Controller) worker(ctx context.Context) { for c.processNext(ctx) { } } func (c *Controller) processNext(ctx context.Context) bool { eKey, quit := c.queue.Get() if quit { return false } defer c.queue.Done(eKey) key := eKey.(string) err := c.sync(ctx, key) if err == nil { c.queue.Forget(key) return true } logger := klog.FromContext(ctx) if c.queue.NumRequeues(key) < maxRetries { logger.V(2).Info("Error syncing ServiceCIDR, retrying", "ServiceCIDR", key, "err", err) c.queue.AddRateLimited(key) } else { logger.Info("Dropping ServiceCIDR out of the queue", "ServiceCIDR", key, "err", err) c.queue.Forget(key) utilruntime.HandleError(err) } return true } // syncCIDRs rebuilds the radix tree based from the informers cache func (c *Controller) syncCIDRs() error { serviceCIDRList, err := c.serviceCIDRLister.List(labels.Everything()) if err != nil { return err } // track the names of the different ServiceCIDRs, there // can be multiple ServiceCIDRs sharing the same prefixes // and this is important to determine if a ServiceCIDR can // be deleted. tree := iptree.New[sets.Set[string]]() for _, serviceCIDR := range serviceCIDRList { for _, cidr := range serviceCIDR.Spec.CIDRs { if prefix, err := netip.ParsePrefix(cidr); err == nil { // if is empty err will not be nil // if the prefix already exist append the new ServiceCIDR name v, ok := tree.GetPrefix(prefix) if !ok { v = sets.Set[string]{} } v.Insert(serviceCIDR.Name) tree.InsertPrefix(prefix, v) } } } c.muTree.Lock() defer c.muTree.Unlock() c.tree = tree return nil } func (c *Controller) sync(ctx context.Context, key string) error { logger := klog.FromContext(ctx) startTime := time.Now() defer func() { logger.V(4).Info("Finished syncing ServiceCIDR)", "ServiceCIDR", key, "elapsed", time.Since(startTime)) }() // TODO(aojea) verify if this present a performance problem // restore the radix tree from the current state err := c.syncCIDRs() if err != nil { return err } logger.V(4).Info("syncing ServiceCIDR", "ServiceCIDR", key) cidr, err := c.serviceCIDRLister.Get(key) if err != nil { if apierrors.IsNotFound(err) { logger.V(4).Info("ServiceCIDR no longer exist", "ServiceCIDR", key) return nil } return err } // Deleting .... if !cidr.GetDeletionTimestamp().IsZero() { // check if the existing ServiceCIDR can be deleted before removing the finalizer ok, err := c.canDeleteCIDR(ctx, cidr) if err != nil { return err } if !ok { // update the status to indicate why the ServiceCIDR can not be deleted, // it will be reevaludated by an event on any ServiceCIDR or IPAddress related object // that may remove this condition. svcApplyStatus := networkingapiv1alpha1apply.ServiceCIDRStatus().WithConditions( metav1apply.Condition(). WithType(networkingapiv1alpha1.ServiceCIDRConditionReady). WithStatus(metav1.ConditionFalse). WithReason(networkingapiv1alpha1.ServiceCIDRReasonTerminating). WithMessage("There are still IPAddresses referencing the ServiceCIDR, please remove them or create a new ServiceCIDR"). WithLastTransitionTime(metav1.Now())) svcApply := networkingapiv1alpha1apply.ServiceCIDR(cidr.Name).WithStatus(svcApplyStatus) _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}) return err } // If there are no IPAddress depending on this ServiceCIDR is safe to remove it, // however, there can be a race when the allocators still consider the ServiceCIDR // ready and allocate a new IPAddress from them, to avoid that, we wait during a // a grace period to be sure the deletion change has been propagated to the allocators // and no new IPAddress is going to be allocated. timeUntilDeleted := deletionGracePeriod - time.Since(cidr.GetDeletionTimestamp().Time) if timeUntilDeleted > 0 { c.queue.AddAfter(key, timeUntilDeleted) return nil } return c.removeServiceCIDRFinalizerIfNeeded(ctx, cidr) } // Created or Updated, the ServiceCIDR must have a finalizer. err = c.addServiceCIDRFinalizerIfNeeded(ctx, cidr) if err != nil { return err } // Set Ready condition to True. svcApplyStatus := networkingapiv1alpha1apply.ServiceCIDRStatus().WithConditions( metav1apply.Condition(). WithType(networkingapiv1alpha1.ServiceCIDRConditionReady). WithStatus(metav1.ConditionTrue). WithMessage("Kubernetes Service CIDR is ready"). WithLastTransitionTime(metav1.Now())) svcApply := networkingapiv1alpha1apply.ServiceCIDR(cidr.Name).WithStatus(svcApplyStatus) if _, err := c.client.NetworkingV1alpha1().ServiceCIDRs().ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}); err != nil { logger.Info("error updating default ServiceCIDR status", "error", err) c.eventRecorder.Eventf(cidr, v1.EventTypeWarning, "KubernetesServiceCIDRError", "The ServiceCIDR Status can not be set to Ready=True") return err } return nil } // canDeleteCIDR checks that the ServiceCIDR can be safely deleted and not leave orphan IPAddresses func (c *Controller) canDeleteCIDR(ctx context.Context, serviceCIDR *networkingapiv1alpha1.ServiceCIDR) (bool, error) { // TODO(aojea) Revisit the lock usage and if we need to keep it only for the tree operations // to avoid holding it during the whole operation. c.muTree.Lock() defer c.muTree.Unlock() logger := klog.FromContext(ctx) // Check if there is a subnet that already contains the ServiceCIDR that is going to be deleted. hasParent := true for _, cidr := range serviceCIDR.Spec.CIDRs { // Walk the tree to find if there is a larger subnet that contains the existing one, // or there is another ServiceCIDR with the same subnet. if prefix, err := netip.ParsePrefix(cidr); err == nil { serviceCIDRs := sets.New[string]() c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool { serviceCIDRs.Insert(v.UnsortedList()...) return false }) if serviceCIDRs.Len() == 1 && serviceCIDRs.Has(serviceCIDR.Name) { hasParent = false } } } // All the existing IP addresses will be contained on the parent ServiceCIDRs, // it is safe to delete, remove the finalizer. if hasParent { logger.V(2).Info("Removing finalizer for ServiceCIDR", "ServiceCIDR", serviceCIDR.String()) return true, nil } // TODO: optimize this // Since current ServiceCIDR does not have another ServiceCIDR containing it, // verify there are no existing IPAddresses referencing it that will be orphan. for _, cidr := range serviceCIDR.Spec.CIDRs { // get all the IPv4 addresses ipLabelSelector := labels.Set(map[string]string{ networkingapiv1alpha1.LabelIPAddressFamily: string(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))), networkingapiv1alpha1.LabelManagedBy: ipallocator.ControllerName, }).AsSelectorPreValidated() ips, err := c.ipAddressLister.List(ipLabelSelector) if err != nil { return false, err } for _, ip := range ips { // if the longest prefix match is the ServiceCIDR to be deleted // and is the only existing one, at least one IPAddress will be // orphan, block the ServiceCIDR deletion. address, err := netip.ParseAddr(ip.Name) if err != nil { // the IPAddress object validates that the name is a valid IPAddress logger.Info("[SHOULD NOT HAPPEN] unexpected error parsing IPAddress", "IPAddress", ip.Name, "error", err) continue } // walk the tree to find all ServiceCIDRs containing this IP prefixes := c.tree.GetHostIPPrefixMatches(address) if len(prefixes) != 1 { continue } for _, v := range prefixes { if v.Len() == 1 && v.Has(serviceCIDR.Name) { return false, nil } } } } // There are no IPAddresses that depend on the existing ServiceCIDR, so // it is safe to delete, remove finalizer. logger.Info("ServiceCIDR no longer have orphan IPs", "ServiceCDIR", serviceCIDR.String()) return true, nil } func (c *Controller) addServiceCIDRFinalizerIfNeeded(ctx context.Context, cidr *networkingapiv1alpha1.ServiceCIDR) error { for _, f := range cidr.GetFinalizers() { if f == ServiceCIDRProtectionFinalizer { return nil } } patch := map[string]interface{}{ "metadata": map[string]interface{}{ "finalizers": []string{ServiceCIDRProtectionFinalizer}, }, } patchBytes, err := json.Marshal(patch) if err != nil { return err } _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().Patch(ctx, cidr.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } klog.FromContext(ctx).V(4).Info("Added protection finalizer to ServiceCIDR", "ServiceCIDR", cidr.Name) return nil } func (c *Controller) removeServiceCIDRFinalizerIfNeeded(ctx context.Context, cidr *networkingapiv1alpha1.ServiceCIDR) error { found := false for _, f := range cidr.GetFinalizers() { if f == ServiceCIDRProtectionFinalizer { found = true break } } if !found { return nil } patch := map[string]interface{}{ "metadata": map[string]interface{}{ "$deleteFromPrimitiveList/finalizers": []string{ServiceCIDRProtectionFinalizer}, }, } patchBytes, err := json.Marshal(patch) if err != nil { return err } _, err = c.client.NetworkingV1alpha1().ServiceCIDRs().Patch(ctx, cidr.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } klog.FromContext(ctx).V(4).Info("Removed protection finalizer from ServiceCIDRs", "ServiceCIDR", cidr.Name) return nil } // Convert netutils.IPFamily to v1.IPFamily // TODO: consolidate helpers // copied from pkg/proxy/util/utils.go func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily { switch ipFamily { case netutils.IPv4: return v1.IPv4Protocol case netutils.IPv6: return v1.IPv6Protocol } return v1.IPFamilyUnknown }