...

Source file src/k8s.io/kubernetes/pkg/registry/core/service/ipallocator/cidrallocator.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/ipallocator

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package ipallocator
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"net/netip"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    31  	"k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/sets"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
    35  	networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
    36  	networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
    37  	"k8s.io/client-go/tools/cache"
    38  	"k8s.io/client-go/util/workqueue"
    39  	"k8s.io/klog/v2"
    40  	api "k8s.io/kubernetes/pkg/apis/core"
    41  	"k8s.io/kubernetes/pkg/util/iptree"
    42  	netutils "k8s.io/utils/net"
    43  )
    44  
// MetaAllocator implements the current allocator interface using
// ServiceCIDR and IPAddress API objects.
//
// MetaAllocator maintains a Tree with the ServiceCIDRs containing an IP Allocator
// on the nodes. Since each allocator doesn't store the IPAddresses, because it reads
// them from the informer cache, it is cheap to create and delete IP Allocators.
// MetaAllocator forwards the request to any of the internal allocators that has free
// addresses.
type MetaAllocator struct {
	// client is handed to every per-CIDR allocator created by syncTree.
	client networkingv1alpha1client.NetworkingV1alpha1Interface
	// serviceCIDRLister lists the ServiceCIDR objects from the informer cache.
	serviceCIDRLister networkingv1alpha1listers.ServiceCIDRLister
	// serviceCIDRSynced reports whether the ServiceCIDR informer has synced;
	// run waits on it before starting the worker.
	serviceCIDRSynced cache.InformerSynced
	// ipAddressLister lists the IPAddress objects; used by ForEach and Used.
	ipAddressLister networkingv1alpha1listers.IPAddressLister
	// ipAddressSynced reports whether the IPAddress informer has synced;
	// run waits on it before starting the worker.
	ipAddressSynced cache.InformerSynced
	// ipAddressInformer is handed to every per-CIDR allocator created by syncTree.
	ipAddressInformer networkingv1alpha1informers.IPAddressInformer
	// queue receives the keys of ServiceCIDR add/update/delete events; every
	// item processed triggers a full syncTree.
	queue workqueue.RateLimitingInterface

	// internalStopCh is closed by Destroy to stop the run loop and its worker.
	internalStopCh chan struct{}

	// muTree guards tree.
	muTree sync.Mutex
	// tree holds one Allocator per Service CIDR prefix.
	tree *iptree.Tree[*Allocator]

	// ipFamily is the IP family served by this allocator; syncTree ignores
	// CIDRs that belong to a different family.
	ipFamily api.IPFamily
}

// Compile-time check that MetaAllocator satisfies Interface.
var _ Interface = &MetaAllocator{}
    71  
    72  // NewMetaAllocator returns an IP allocator that use the IPAddress
    73  // and ServiceCIDR objects to track the assigned IP addresses,
    74  // using an informer cache as storage.
    75  func NewMetaAllocator(
    76  	client networkingv1alpha1client.NetworkingV1alpha1Interface,
    77  	serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
    78  	ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
    79  	isIPv6 bool,
    80  ) (*MetaAllocator, error) {
    81  
    82  	// TODO: make the NewMetaAllocator agnostic of the IP family
    83  	family := api.IPv4Protocol
    84  	if isIPv6 {
    85  		family = api.IPv6Protocol
    86  	}
    87  
    88  	c := &MetaAllocator{
    89  		client:            client,
    90  		serviceCIDRLister: serviceCIDRInformer.Lister(),
    91  		serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
    92  		ipAddressLister:   ipAddressInformer.Lister(),
    93  		ipAddressSynced:   ipAddressInformer.Informer().HasSynced,
    94  		ipAddressInformer: ipAddressInformer,
    95  		queue:             workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: ControllerName}),
    96  		internalStopCh:    make(chan struct{}),
    97  		tree:              iptree.New[*Allocator](),
    98  		ipFamily:          family,
    99  	}
   100  
   101  	_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   102  		AddFunc:    c.addServiceCIDR,
   103  		UpdateFunc: c.updateServiceCIDR,
   104  		DeleteFunc: c.deleteServiceCIDR,
   105  	})
   106  
   107  	go c.run()
   108  
   109  	return c, nil
   110  }
   111  
   112  func (c *MetaAllocator) addServiceCIDR(obj interface{}) {
   113  	key, err := cache.MetaNamespaceKeyFunc(obj)
   114  	if err == nil {
   115  		c.queue.Add(key)
   116  	}
   117  }
   118  func (c *MetaAllocator) updateServiceCIDR(old, new interface{}) {
   119  	key, err := cache.MetaNamespaceKeyFunc(new)
   120  	if err == nil {
   121  		c.queue.Add(key)
   122  	}
   123  }
   124  
   125  func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) {
   126  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   127  	if err == nil {
   128  		c.queue.Add(key)
   129  	}
   130  }
   131  
   132  func (c *MetaAllocator) run() {
   133  	defer runtime.HandleCrash()
   134  	defer c.queue.ShutDown()
   135  	klog.Info("Starting ServiceCIDR Allocator Controller")
   136  	defer klog.Info("Stopping ServiceCIDR Allocator Controllerr")
   137  
   138  	// Wait for all involved caches to be synced, before processing items from the queue is started
   139  	if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) {
   140  		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
   141  		return
   142  	}
   143  
   144  	// this is single threaded only one serviceCIDR at a time
   145  	go wait.Until(c.runWorker, time.Second, c.internalStopCh)
   146  
   147  	<-c.internalStopCh
   148  }
   149  
   150  func (c *MetaAllocator) runWorker() {
   151  	for c.processNextItem() {
   152  	}
   153  }
   154  
   155  func (c *MetaAllocator) processNextItem() bool {
   156  	// Wait until there is a new item in the working queue
   157  	key, quit := c.queue.Get()
   158  	if quit {
   159  		return false
   160  	}
   161  	defer c.queue.Done(key)
   162  
   163  	err := c.syncTree()
   164  	// Handle the error if something went wrong during the execution of the business logic
   165  	if err != nil {
   166  		if c.queue.NumRequeues(key) < 5 {
   167  			klog.Infof("Error syncing cidr %v: %v", key, err)
   168  			c.queue.AddRateLimited(key)
   169  			return true
   170  		}
   171  	}
   172  	c.queue.Forget(key)
   173  	return true
   174  }
   175  
   176  // syncTree syncs the ipTrees from the informer cache
   177  // It deletes or creates allocator and sets the corresponding state
   178  func (c *MetaAllocator) syncTree() error {
   179  	now := time.Now()
   180  	defer func() {
   181  		klog.V(2).Infof("Finished sync for CIDRs took %v", time.Since(now))
   182  	}()
   183  
   184  	serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything())
   185  	if err != nil {
   186  		return err
   187  	}
   188  
   189  	cidrsSet := sets.New[string]()
   190  	cidrReady := map[string]bool{}
   191  	for _, serviceCIDR := range serviceCIDRs {
   192  		ready := true
   193  		if !isReady(serviceCIDR) || !serviceCIDR.DeletionTimestamp.IsZero() {
   194  			ready = false
   195  		}
   196  
   197  		for _, cidr := range serviceCIDR.Spec.CIDRs {
   198  			if c.ipFamily == api.IPFamily(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))) {
   199  				cidrsSet.Insert(cidr)
   200  				cidrReady[cidr] = ready
   201  			}
   202  		}
   203  	}
   204  
   205  	// obtain the existing allocators and set the existing state
   206  	treeSet := sets.New[string]()
   207  	c.muTree.Lock()
   208  	c.tree.DepthFirstWalk(c.ipFamily == api.IPv6Protocol, func(k netip.Prefix, v *Allocator) bool {
   209  		v.ready.Store(cidrReady[k.String()])
   210  		treeSet.Insert(k.String())
   211  		return false
   212  	})
   213  	c.muTree.Unlock()
   214  	cidrsToRemove := treeSet.Difference(cidrsSet)
   215  	cidrsToAdd := cidrsSet.Difference(treeSet)
   216  
   217  	errs := []error{}
   218  	// Add new allocators
   219  	for _, cidr := range cidrsToAdd.UnsortedList() {
   220  		_, ipnet, err := netutils.ParseCIDRSloppy(cidr)
   221  		if err != nil {
   222  			return err
   223  		}
   224  		// New ServiceCIDR, create new allocator
   225  		allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer)
   226  		if err != nil {
   227  			errs = append(errs, err)
   228  			continue
   229  		}
   230  		allocator.ready.Store(cidrReady[cidr])
   231  		prefix, err := netip.ParsePrefix(cidr)
   232  		if err != nil {
   233  			return err
   234  		}
   235  		c.addAllocator(prefix, allocator)
   236  		klog.Infof("Created ClusterIP allocator for Service CIDR %s", cidr)
   237  	}
   238  	// Remove allocators that no longer exist
   239  	for _, cidr := range cidrsToRemove.UnsortedList() {
   240  		prefix, err := netip.ParsePrefix(cidr)
   241  		if err != nil {
   242  			return err
   243  		}
   244  		c.deleteAllocator(prefix)
   245  	}
   246  
   247  	return utilerrors.NewAggregate(errs)
   248  }
   249  
   250  func (c *MetaAllocator) getAllocator(ip net.IP) (*Allocator, error) {
   251  	c.muTree.Lock()
   252  	defer c.muTree.Unlock()
   253  
   254  	address := ipToAddr(ip)
   255  	prefix := netip.PrefixFrom(address, address.BitLen())
   256  	// Use the largest subnet to allocate addresses because
   257  	// all the other subnets will be contained.
   258  	_, allocator, ok := c.tree.ShortestPrefixMatch(prefix)
   259  	if !ok {
   260  		klog.V(2).Infof("Could not get allocator for IP %s", ip.String())
   261  		return nil, ErrMismatchedNetwork
   262  	}
   263  	return allocator, nil
   264  }
   265  
   266  func (c *MetaAllocator) addAllocator(cidr netip.Prefix, allocator *Allocator) {
   267  	c.muTree.Lock()
   268  	defer c.muTree.Unlock()
   269  	c.tree.InsertPrefix(cidr, allocator)
   270  }
   271  
   272  func (c *MetaAllocator) deleteAllocator(cidr netip.Prefix) {
   273  	c.muTree.Lock()
   274  	defer c.muTree.Unlock()
   275  	ok := c.tree.DeletePrefix(cidr)
   276  	if ok {
   277  		klog.V(3).Infof("CIDR %s deleted", cidr)
   278  	}
   279  }
   280  
   281  func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
   282  	allocator, err := c.getAllocator(ip)
   283  	if err != nil {
   284  		return err
   285  	}
   286  	return allocator.AllocateService(service, ip)
   287  }
   288  
   289  func (c *MetaAllocator) Allocate(ip net.IP) error {
   290  	allocator, err := c.getAllocator(ip)
   291  	if err != nil {
   292  		return err
   293  	}
   294  	return allocator.Allocate(ip)
   295  }
   296  
   297  func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
   298  	c.muTree.Lock()
   299  	defer c.muTree.Unlock()
   300  
   301  	// TODO(aojea) add strategy to return a random allocator but
   302  	// taking into consideration the number of addresses of each allocator.
   303  	// Per example, if we have allocator A and B with 256 and 1024 possible
   304  	// addresses each, the chances to get B has to be 4 times the chances to
   305  	// get A so we can spread the load of IPs randomly.
   306  	// However, we need to validate the best strategy before going to Beta.
   307  	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
   308  	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
   309  		ip, err := allocator.AllocateNextService(service)
   310  		if err == nil {
   311  			return ip, nil
   312  		}
   313  	}
   314  	return nil, ErrFull
   315  }
   316  
   317  func (c *MetaAllocator) AllocateNext() (net.IP, error) {
   318  	c.muTree.Lock()
   319  	defer c.muTree.Unlock()
   320  
   321  	// TODO(aojea) add strategy to return a random allocator but
   322  	// taking into consideration the number of addresses of each allocator.
   323  	// Per example, if we have allocator A and B with 256 and 1024 possible
   324  	// addresses each, the chances to get B has to be 4 times the chances to
   325  	// get A so we can spread the load of IPs randomly.
   326  	// However, we need to validate the best strategy before going to Beta.
   327  	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
   328  	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
   329  		ip, err := allocator.AllocateNext()
   330  		if err == nil {
   331  			return ip, nil
   332  		}
   333  	}
   334  	return nil, ErrFull
   335  }
   336  
   337  func (c *MetaAllocator) Release(ip net.IP) error {
   338  	allocator, err := c.getAllocator(ip)
   339  	if err != nil {
   340  		return err
   341  	}
   342  	return allocator.Release(ip)
   343  
   344  }
   345  func (c *MetaAllocator) ForEach(f func(ip net.IP)) {
   346  	ipLabelSelector := labels.Set(map[string]string{
   347  		networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
   348  		networkingv1alpha1.LabelManagedBy:       ControllerName,
   349  	}).AsSelectorPreValidated()
   350  	ips, err := c.ipAddressLister.List(ipLabelSelector)
   351  	if err != nil {
   352  		return
   353  	}
   354  	for _, ip := range ips {
   355  		f(netutils.ParseIPSloppy(ip.Name))
   356  	}
   357  }
   358  
   359  func (c *MetaAllocator) CIDR() net.IPNet {
   360  	return net.IPNet{}
   361  
   362  }
   363  func (c *MetaAllocator) IPFamily() api.IPFamily {
   364  	return c.ipFamily
   365  }
   366  func (c *MetaAllocator) Has(ip net.IP) bool {
   367  	allocator, err := c.getAllocator(ip)
   368  	if err != nil {
   369  		return false
   370  	}
   371  	return allocator.Has(ip)
   372  }
   373  func (c *MetaAllocator) Destroy() {
   374  	select {
   375  	case <-c.internalStopCh:
   376  	default:
   377  		close(c.internalStopCh)
   378  	}
   379  }
   380  
   381  // for testing
   382  func (c *MetaAllocator) Used() int {
   383  	ipLabelSelector := labels.Set(map[string]string{
   384  		networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
   385  		networkingv1alpha1.LabelManagedBy:       ControllerName,
   386  	}).AsSelectorPreValidated()
   387  	ips, err := c.ipAddressLister.List(ipLabelSelector)
   388  	if err != nil {
   389  		return 0
   390  	}
   391  	return len(ips)
   392  }
   393  
   394  // for testing
   395  func (c *MetaAllocator) Free() int {
   396  	c.muTree.Lock()
   397  	defer c.muTree.Unlock()
   398  
   399  	size := 0
   400  	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
   401  	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
   402  		size += int(allocator.size)
   403  	}
   404  	return size - c.Used()
   405  }
   406  
   407  func (c *MetaAllocator) EnableMetrics() {}
   408  
   409  // DryRun returns a random allocator
   410  func (c *MetaAllocator) DryRun() Interface {
   411  	c.muTree.Lock()
   412  	defer c.muTree.Unlock()
   413  	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
   414  	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
   415  		return allocator.DryRun()
   416  	}
   417  	return &Allocator{}
   418  }
   419  
   420  func isReady(serviceCIDR *networkingv1alpha1.ServiceCIDR) bool {
   421  	if serviceCIDR == nil {
   422  		return false
   423  	}
   424  
   425  	for _, condition := range serviceCIDR.Status.Conditions {
   426  		if condition.Type == networkingv1alpha1.ServiceCIDRConditionReady {
   427  			return condition.Status == metav1.ConditionStatus(metav1.ConditionTrue)
   428  		}
   429  	}
   430  	// assume the ServiceCIDR is Ready, in order to handle scenarios where kcm is not running
   431  	return true
   432  }
   433  
   434  // ipToAddr converts a net.IP to a netip.Addr
   435  // if the net.IP is not valid it returns an empty netip.Addr{}
   436  func ipToAddr(ip net.IP) netip.Addr {
   437  	// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
   438  	// so we have to check the IP family to return exactly the format that we want
   439  	// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
   440  	// an address like ::ffff:192.168.0.1/32
   441  	bytes := ip.To4()
   442  	if bytes == nil {
   443  		bytes = ip.To16()
   444  	}
   445  	// AddrFromSlice returns Addr{}, false if the input is invalid.
   446  	address, _ := netip.AddrFromSlice(bytes)
   447  	return address
   448  }
   449  
   450  // Convert netutils.IPFamily to v1.IPFamily
   451  // TODO: consolidate helpers
   452  // copied from pkg/proxy/util/utils.go
   453  func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
   454  	switch ipFamily {
   455  	case netutils.IPv4:
   456  		return v1.IPv4Protocol
   457  	case netutils.IPv6:
   458  		return v1.IPv6Protocol
   459  	}
   460  
   461  	return v1.IPFamilyUnknown
   462  }
   463  

View as plain text