...

Source file src/k8s.io/kubernetes/pkg/registry/core/service/ipallocator/ipallocator.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/ipallocator

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package ipallocator
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"math/big"
    24  	"math/rand"
    25  	"net"
    26  	"net/netip"
    27  	"sync/atomic"
    28  	"time"
    29  
    30  	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
    35  	networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
    36  	networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
    37  	"k8s.io/client-go/tools/cache"
    38  	"k8s.io/klog/v2"
    39  	api "k8s.io/kubernetes/pkg/apis/core"
    40  	netutils "k8s.io/utils/net"
    41  	utiltrace "k8s.io/utils/trace"
    42  )
    43  
// ControllerName is the value of the networkingv1alpha1.LabelManagedBy label
// set on every IPAddress object this allocator creates. ForEach and Used only
// consider objects carrying this label.
const ControllerName = "ipallocator.k8s.io"

// Allocator implements current ipallocator interface using IPAddress API object
// and an informer as backend.
//
// Every allocated IP is stored as an IPAddress object named after the address
// and created through client; the informer-backed ipAddressLister is consulted
// to skip addresses that are already allocated.
type Allocator struct {
	cidr          *net.IPNet
	prefix        netip.Prefix // cidr parsed as a netip.Prefix
	firstAddress  netip.Addr   // first IP address within the range
	offsetAddress netip.Addr   // IP address that delimits the upper and lower subranges
	lastAddress   netip.Addr   // last IP address within the range
	family        api.IPFamily // family is the IP family of this range

	rangeOffset int    // subdivides the assigned IP range to prefer dynamic allocation from the upper range
	size        uint64 // number of allocatable IPs in the range, saturating at math.MaxUint64

	client          networkingv1alpha1client.NetworkingV1alpha1Interface
	ipAddressLister networkingv1alpha1listers.IPAddressLister
	ipAddressSynced cache.InformerSynced
	// ready indicates if the allocator is able to allocate new IP addresses.
	// This is required because it depends on the ServiceCIDR to be ready.
	ready atomic.Bool

	// metrics is a metrics recorder that can be disabled
	metrics     metricsRecorderInterface
	metricLabel string // label value used for all metrics: the CIDR string

	// rand is the random source created by NewIPAllocator.
	// NOTE(review): a *rand.Rand built with rand.NewSource is not safe for
	// concurrent use; confirm it is never used from concurrent calls.
	rand *rand.Rand
}

// Compile-time check that *Allocator satisfies Interface.
var _ Interface = &Allocator{}
    74  
    75  // NewIPAllocator returns an IP allocator associated to a network range
    76  // that use the IPAddress objectto track the assigned IP addresses,
    77  // using an informer cache as storage.
    78  func NewIPAllocator(
    79  	cidr *net.IPNet,
    80  	client networkingv1alpha1client.NetworkingV1alpha1Interface,
    81  	ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
    82  ) (*Allocator, error) {
    83  	prefix, err := netip.ParsePrefix(cidr.String())
    84  	if err != nil {
    85  		return nil, err
    86  	}
    87  
    88  	if prefix.Addr().Is6() && prefix.Bits() < 64 {
    89  		return nil, fmt.Errorf("shortest allowed prefix length for service CIDR is 64, got %d", prefix.Bits())
    90  	}
    91  
    92  	// TODO: use the utils/net function once is available
    93  	size := hostsPerNetwork(cidr)
    94  	var family api.IPFamily
    95  	if netutils.IsIPv6CIDR(cidr) {
    96  		family = api.IPv6Protocol
    97  	} else {
    98  		family = api.IPv4Protocol
    99  	}
   100  	// Caching the first, offset and last addresses allows to optimize
   101  	// the search loops by using the netip.Addr iterator instead
   102  	// of having to do conversions with IP addresses.
   103  	// Don't allocate the network's ".0" address.
   104  	ipFirst := prefix.Masked().Addr().Next()
   105  	if err != nil {
   106  		return nil, err
   107  	}
   108  	// Use the broadcast address as last address for IPv6
   109  	ipLast, err := broadcastAddress(prefix)
   110  	if err != nil {
   111  		return nil, err
   112  	}
   113  	// For IPv4 don't use the network's broadcast address
   114  	if family == api.IPv4Protocol {
   115  		ipLast = ipLast.Prev()
   116  	}
   117  	// KEP-3070: Reserve Service IP Ranges For Dynamic and Static IP Allocation
   118  	// calculate the subrange offset
   119  	rangeOffset := calculateRangeOffset(cidr)
   120  	offsetAddress, err := addOffsetAddress(ipFirst, uint64(rangeOffset))
   121  	if err != nil {
   122  		return nil, err
   123  	}
   124  	a := &Allocator{
   125  		cidr:            cidr,
   126  		prefix:          prefix,
   127  		firstAddress:    ipFirst,
   128  		lastAddress:     ipLast,
   129  		rangeOffset:     rangeOffset,
   130  		offsetAddress:   offsetAddress,
   131  		size:            size,
   132  		family:          family,
   133  		client:          client,
   134  		ipAddressLister: ipAddressInformer.Lister(),
   135  		ipAddressSynced: ipAddressInformer.Informer().HasSynced,
   136  		metrics:         &emptyMetricsRecorder{}, // disabled by default
   137  		metricLabel:     cidr.String(),
   138  		rand:            rand.New(rand.NewSource(time.Now().UnixNano())),
   139  	}
   140  	a.ready.Store(true)
   141  	return a, nil
   142  }
   143  
   144  func (a *Allocator) createIPAddress(name string, svc *api.Service, scope string) error {
   145  	ipAddress := networkingv1alpha1.IPAddress{
   146  		ObjectMeta: metav1.ObjectMeta{
   147  			Name: name,
   148  			Labels: map[string]string{
   149  				networkingv1alpha1.LabelIPAddressFamily: string(a.IPFamily()),
   150  				networkingv1alpha1.LabelManagedBy:       ControllerName,
   151  			},
   152  		},
   153  		Spec: networkingv1alpha1.IPAddressSpec{
   154  			ParentRef: serviceToRef(svc),
   155  		},
   156  	}
   157  	_, err := a.client.IPAddresses().Create(context.Background(), &ipAddress, metav1.CreateOptions{})
   158  	if err != nil {
   159  		// update metrics
   160  		a.metrics.incrementAllocationErrors(a.metricLabel, scope)
   161  		if apierrors.IsAlreadyExists(err) {
   162  			return ErrAllocated
   163  		}
   164  		return err
   165  	}
   166  	// update metrics
   167  	a.metrics.incrementAllocations(a.metricLabel, scope)
   168  	a.metrics.setAllocated(a.metricLabel, a.Used())
   169  	a.metrics.setAvailable(a.metricLabel, a.Free())
   170  	return nil
   171  }
   172  
   173  // Allocate attempts to reserve the provided IP. ErrNotInRange or
   174  // ErrAllocated will be returned if the IP is not valid for this range
   175  // or has already been reserved.  ErrFull will be returned if there
   176  // are no addresses left.
   177  // Only for testing, it will fail to create the IPAddress object because
   178  // the Service reference is required.
   179  func (a *Allocator) Allocate(ip net.IP) error {
   180  	return a.AllocateService(nil, ip)
   181  }
   182  
   183  // AllocateService attempts to reserve the provided IP. ErrNotInRange or
   184  // ErrAllocated will be returned if the IP is not valid for this range
   185  // or has already been reserved.  ErrFull will be returned if there
   186  // are no addresses left.
   187  func (a *Allocator) AllocateService(svc *api.Service, ip net.IP) error {
   188  	return a.allocateService(svc, ip, dryRunFalse)
   189  }
   190  
   191  func (a *Allocator) allocateService(svc *api.Service, ip net.IP, dryRun bool) error {
   192  	if !a.ready.Load() || !a.ipAddressSynced() {
   193  		return ErrNotReady
   194  	}
   195  	addr, err := netip.ParseAddr(ip.String())
   196  	if err != nil {
   197  		return err
   198  	}
   199  
   200  	// check address is within the range of available addresses
   201  	if addr.Less(a.firstAddress) || // requested address is lower than the first address in the subnet
   202  		a.lastAddress.Less(addr) { // the last address in the subnet is lower than the requested address
   203  		if !dryRun {
   204  			// update metrics
   205  			a.metrics.incrementAllocationErrors(a.metricLabel, "static")
   206  		}
   207  		return &ErrNotInRange{ip, a.prefix.String()}
   208  	}
   209  	if dryRun {
   210  		return nil
   211  	}
   212  	return a.createIPAddress(ip.String(), svc, "static")
   213  }
   214  
   215  // AllocateNext return an IP address that wasn't allocated yet.
   216  // Only for testing, it will fail to create the IPAddress object because
   217  // the Service reference is required.
   218  func (a *Allocator) AllocateNext() (net.IP, error) {
   219  	return a.AllocateNextService(nil)
   220  }
   221  
   222  // AllocateNext return an IP address that wasn't allocated yet.
   223  func (a *Allocator) AllocateNextService(svc *api.Service) (net.IP, error) {
   224  	return a.allocateNextService(svc, dryRunFalse)
   225  }
   226  
   227  // allocateNextService tries to allocate a free IP address within the subnet.
   228  // If the subnet is big enough, it partitions the subnet into two subranges,
   229  // delimited by a.rangeOffset.
   230  // It tries to allocate a free IP address from the upper subnet first and
   231  // falls back to the lower subnet.
   232  // It starts allocating from a random IP within each range.
   233  func (a *Allocator) allocateNextService(svc *api.Service, dryRun bool) (net.IP, error) {
   234  	if !a.ready.Load() || !a.ipAddressSynced() {
   235  		return nil, ErrNotReady
   236  	}
   237  	if dryRun {
   238  		// Don't bother finding a free value. It's racy and not worth the
   239  		// effort to plumb any further.
   240  		return a.CIDR().IP, nil
   241  	}
   242  
   243  	trace := utiltrace.New("allocate dynamic ClusterIP address")
   244  	defer trace.LogIfLong(500 * time.Millisecond)
   245  
   246  	// rand.Int63n panics for n <= 0 so we need to avoid problems when
   247  	// converting from uint64 to int64
   248  	rangeSize := a.size - uint64(a.rangeOffset)
   249  	var offset uint64
   250  	switch {
   251  	case rangeSize >= math.MaxInt64:
   252  		offset = rand.Uint64()
   253  	case rangeSize == 0:
   254  		return net.IP{}, ErrFull
   255  	default:
   256  		offset = uint64(a.rand.Int63n(int64(rangeSize)))
   257  	}
   258  	iterator := ipIterator(a.offsetAddress, a.lastAddress, offset)
   259  	ip, err := a.allocateFromRange(iterator, svc)
   260  	if err == nil {
   261  		return ip, nil
   262  	}
   263  	// check the lower range
   264  	if a.rangeOffset != 0 {
   265  		offset = uint64(a.rand.Intn(a.rangeOffset))
   266  		iterator = ipIterator(a.firstAddress, a.offsetAddress.Prev(), offset)
   267  		ip, err = a.allocateFromRange(iterator, svc)
   268  		if err == nil {
   269  			return ip, nil
   270  		}
   271  	}
   272  	// update metrics
   273  	a.metrics.incrementAllocationErrors(a.metricLabel, "dynamic")
   274  	return net.IP{}, ErrFull
   275  }
   276  
   277  // IP iterator allows to iterate over all the IP addresses
   278  // in a range defined by the start and last address.
   279  // It starts iterating at the address position defined by the offset.
   280  // It returns an invalid address to indicate it hasfinished.
   281  func ipIterator(first netip.Addr, last netip.Addr, offset uint64) func() netip.Addr {
   282  	// There are no modulo operations for IP addresses
   283  	modulo := func(addr netip.Addr) netip.Addr {
   284  		if addr.Compare(last) == 1 {
   285  			return first
   286  		}
   287  		return addr
   288  	}
   289  	next := func(addr netip.Addr) netip.Addr {
   290  		return modulo(addr.Next())
   291  	}
   292  	start, err := addOffsetAddress(first, offset)
   293  	if err != nil {
   294  		return func() netip.Addr { return netip.Addr{} }
   295  	}
   296  	start = modulo(start)
   297  	ip := start
   298  	seen := false
   299  	return func() netip.Addr {
   300  		value := ip
   301  		// is the last or the first iteration
   302  		if value == start {
   303  			if seen {
   304  				return netip.Addr{}
   305  			}
   306  			seen = true
   307  		}
   308  		ip = next(ip)
   309  		return value
   310  	}
   311  
   312  }
   313  
   314  // allocateFromRange allocates an empty IP address from the range of
   315  // IPs between the first and last address (both included), starting
   316  // from the start address.
   317  // TODO: this is a linear search, it can be optimized.
   318  func (a *Allocator) allocateFromRange(iterator func() netip.Addr, svc *api.Service) (net.IP, error) {
   319  	for {
   320  		ip := iterator()
   321  		if !ip.IsValid() {
   322  			break
   323  		}
   324  		name := ip.String()
   325  		_, err := a.ipAddressLister.Get(name)
   326  		// continue if ip already exist
   327  		if err == nil {
   328  			continue
   329  		}
   330  		if !apierrors.IsNotFound(err) {
   331  			klog.Infof("unexpected error: %v", err)
   332  			continue
   333  		}
   334  		// address is not present on the cache, try to allocate it
   335  		err = a.createIPAddress(name, svc, "dynamic")
   336  		// an error can happen if there is a race and our informer was not updated
   337  		// swallow the error and try with the next IP address
   338  		if err != nil {
   339  			klog.Infof("can not create IPAddress %s: %v", name, err)
   340  			continue
   341  		}
   342  		return ip.AsSlice(), nil
   343  	}
   344  	return net.IP{}, ErrFull
   345  }
   346  
   347  // Release releases the IP back to the pool. Releasing an
   348  // unallocated IP or an IP out of the range is a no-op and
   349  // returns no error.
   350  func (a *Allocator) Release(ip net.IP) error {
   351  	return a.release(ip, dryRunFalse)
   352  }
   353  
   354  func (a *Allocator) release(ip net.IP, dryRun bool) error {
   355  	if dryRun {
   356  		return nil
   357  	}
   358  	name := ip.String()
   359  	// Try to Delete the IPAddress independently of the cache state.
   360  	// The error is ignored for compatibility reasons.
   361  	err := a.client.IPAddresses().Delete(context.Background(), name, metav1.DeleteOptions{})
   362  	if err == nil {
   363  		// update metrics
   364  		a.metrics.setAllocated(a.metricLabel, a.Used())
   365  		a.metrics.setAvailable(a.metricLabel, a.Free())
   366  		return nil
   367  	}
   368  	klog.Infof("error releasing ip %s : %v", name, err)
   369  	return nil
   370  }
   371  
   372  // ForEach executes the function on each allocated IP
   373  // This is required to satisfy the Allocator Interface only
   374  func (a *Allocator) ForEach(f func(net.IP)) {
   375  	ipLabelSelector := labels.Set(map[string]string{
   376  		networkingv1alpha1.LabelIPAddressFamily: string(a.IPFamily()),
   377  		networkingv1alpha1.LabelManagedBy:       ControllerName,
   378  	}).AsSelectorPreValidated()
   379  	ips, err := a.ipAddressLister.List(ipLabelSelector)
   380  	if err != nil {
   381  		return
   382  	}
   383  	for _, ip := range ips {
   384  		f(netutils.ParseIPSloppy(ip.Name))
   385  	}
   386  }
   387  
   388  func (a *Allocator) CIDR() net.IPNet {
   389  	return *a.cidr
   390  }
   391  
   392  // for testing
   393  func (a *Allocator) Has(ip net.IP) bool {
   394  	// convert IP to name
   395  	name := ip.String()
   396  	ipAddress, err := a.client.IPAddresses().Get(context.Background(), name, metav1.GetOptions{})
   397  	if err != nil || len(ipAddress.Name) == 0 {
   398  		return false
   399  	}
   400  	return true
   401  }
   402  
   403  func (a *Allocator) IPFamily() api.IPFamily {
   404  	return a.family
   405  }
   406  
   407  // for testing, it assumes this is the allocator is unique for the ipFamily
   408  func (a *Allocator) Used() int {
   409  	ipLabelSelector := labels.Set(map[string]string{
   410  		networkingv1alpha1.LabelIPAddressFamily: string(a.IPFamily()),
   411  		networkingv1alpha1.LabelManagedBy:       ControllerName,
   412  	}).AsSelectorPreValidated()
   413  	ips, err := a.ipAddressLister.List(ipLabelSelector)
   414  	if err != nil {
   415  		return 0
   416  	}
   417  	return len(ips)
   418  }
   419  
   420  // for testing, it assumes this is the allocator is unique for the ipFamily
   421  func (a *Allocator) Free() int {
   422  	return int(a.size) - a.Used()
   423  }
   424  
   425  // Destroy
   426  func (a *Allocator) Destroy() {
   427  }
   428  
   429  // DryRun
   430  func (a *Allocator) DryRun() Interface {
   431  	return dryRunAllocator{a}
   432  }
   433  
   434  // EnableMetrics
   435  func (a *Allocator) EnableMetrics() {
   436  	registerMetrics()
   437  	a.metrics = &metricsRecorder{}
   438  }
   439  
   440  // dryRunRange is a shim to satisfy Interface without persisting state.
   441  type dryRunAllocator struct {
   442  	real *Allocator
   443  }
   444  
   445  func (dry dryRunAllocator) Allocate(ip net.IP) error {
   446  	return dry.real.allocateService(nil, ip, dryRunTrue)
   447  
   448  }
   449  
   450  func (dry dryRunAllocator) AllocateNext() (net.IP, error) {
   451  	return dry.real.allocateNextService(nil, dryRunTrue)
   452  }
   453  
   454  func (dry dryRunAllocator) Release(ip net.IP) error {
   455  	return dry.real.release(ip, dryRunTrue)
   456  }
   457  
   458  func (dry dryRunAllocator) ForEach(cb func(net.IP)) {
   459  	dry.real.ForEach(cb)
   460  }
   461  
   462  func (dry dryRunAllocator) CIDR() net.IPNet {
   463  	return dry.real.CIDR()
   464  }
   465  
   466  func (dry dryRunAllocator) IPFamily() api.IPFamily {
   467  	return dry.real.IPFamily()
   468  }
   469  
   470  func (dry dryRunAllocator) DryRun() Interface {
   471  	return dry
   472  }
   473  
   474  func (dry dryRunAllocator) Has(ip net.IP) bool {
   475  	return dry.real.Has(ip)
   476  }
   477  
   478  func (dry dryRunAllocator) Destroy() {
   479  }
   480  
   481  func (dry dryRunAllocator) EnableMetrics() {
   482  }
   483  
   484  // addOffsetAddress returns the address at the provided offset within the subnet
   485  // TODO: move it to k8s.io/utils/net, this is the same as current AddIPOffset()
   486  // but using netip.Addr instead of net.IP
   487  func addOffsetAddress(address netip.Addr, offset uint64) (netip.Addr, error) {
   488  	addressBytes := address.AsSlice()
   489  	addressBig := big.NewInt(0).SetBytes(addressBytes)
   490  	r := big.NewInt(0).Add(addressBig, big.NewInt(int64(offset))).Bytes()
   491  	// r must be 4 or 16 bytes depending of the ip family
   492  	// bigInt conversion to bytes will not take this into consideration
   493  	// and drop the leading zeros, so we have to take this into account.
   494  	lenDiff := len(addressBytes) - len(r)
   495  	if lenDiff > 0 {
   496  		r = append(make([]byte, lenDiff), r...)
   497  	} else if lenDiff < 0 {
   498  		return netip.Addr{}, fmt.Errorf("invalid address %v", r)
   499  	}
   500  	addr, ok := netip.AddrFromSlice(r)
   501  	if !ok {
   502  		return netip.Addr{}, fmt.Errorf("invalid address %v", r)
   503  	}
   504  	return addr, nil
   505  }
   506  
   507  // hostsPerNetwork returns the number of available hosts in a subnet.
   508  // The max number is limited by the size of an uint64.
   509  // Number of hosts is calculated with the formula:
   510  // IPv4: 2^x – 2, not consider network and broadcast address
   511  // IPv6: 2^x - 1, not consider network address
   512  // where x is the number of host bits in the subnet.
   513  func hostsPerNetwork(subnet *net.IPNet) uint64 {
   514  	ones, bits := subnet.Mask.Size()
   515  	// this checks that we are not overflowing an int64
   516  	if bits-ones >= 64 {
   517  		return math.MaxUint64
   518  	}
   519  	max := uint64(1) << uint(bits-ones)
   520  	// Don't use the network's ".0" address,
   521  	if max == 0 {
   522  		return 0
   523  	}
   524  	max--
   525  	if netutils.IsIPv4CIDR(subnet) {
   526  		// Don't use the IPv4 network's broadcast address
   527  		if max == 0 {
   528  			return 0
   529  		}
   530  		max--
   531  	}
   532  	return max
   533  }
   534  
   535  // broadcastAddress returns the broadcast address of the subnet
   536  // The broadcast address is obtained by setting all the host bits
   537  // in a subnet to 1.
   538  // network 192.168.0.0/24 : subnet bits 24 host bits 32 - 24 = 8
   539  // broadcast address 192.168.0.255
   540  func broadcastAddress(subnet netip.Prefix) (netip.Addr, error) {
   541  	base := subnet.Masked().Addr()
   542  	bytes := base.AsSlice()
   543  	// get all the host bits from the subnet
   544  	n := 8*len(bytes) - subnet.Bits()
   545  	// set all the host bits to 1
   546  	for i := len(bytes) - 1; i >= 0 && n > 0; i-- {
   547  		if n >= 8 {
   548  			bytes[i] = 0xff
   549  			n -= 8
   550  		} else {
   551  			mask := ^uint8(0) >> (8 - n)
   552  			bytes[i] |= mask
   553  			break
   554  		}
   555  	}
   556  
   557  	addr, ok := netip.AddrFromSlice(bytes)
   558  	if !ok {
   559  		return netip.Addr{}, fmt.Errorf("invalid address %v", bytes)
   560  	}
   561  	return addr, nil
   562  }
   563  
   564  // serviceToRef obtain the Service Parent Reference
   565  func serviceToRef(svc *api.Service) *networkingv1alpha1.ParentReference {
   566  	if svc == nil {
   567  		return nil
   568  	}
   569  
   570  	return &networkingv1alpha1.ParentReference{
   571  		Group:     "",
   572  		Resource:  "services",
   573  		Namespace: svc.Namespace,
   574  		Name:      svc.Name,
   575  	}
   576  }
   577  

View as plain text