
Source file src/k8s.io/kubernetes/pkg/registry/core/service/storage/alloc.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/storage

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package storage
    19  import (
    20  	"fmt"
    21  	"net"
    23  	"k8s.io/apimachinery/pkg/api/errors"
    24  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    25  	"k8s.io/apimachinery/pkg/util/validation/field"
    26  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    27  	"k8s.io/klog/v2"
    28  	apiservice "k8s.io/kubernetes/pkg/api/service"
    29  	api "k8s.io/kubernetes/pkg/apis/core"
    30  	"k8s.io/kubernetes/pkg/apis/core/validation"
    31  	"k8s.io/kubernetes/pkg/features"
    32  	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
    33  	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
    34  	netutils "k8s.io/utils/net"
    35  )
// Allocators encapsulates the various allocators (IPs, ports) used in
// Services.
type Allocators struct {
	// serviceIPAllocatorsByFamily holds the ClusterIP allocator for each IP
	// family; it is consulted whenever a ClusterIP is allocated or released.
	serviceIPAllocatorsByFamily map[api.IPFamily]ipallocator.Interface
	// defaultServiceIPFamily is the family assigned to a Service whose
	// ipFamilies list is empty after defaulting.
	defaultServiceIPFamily      api.IPFamily // --service-cluster-ip-range[0]
	// serviceNodePorts is the allocator behind every NodePort operation,
	// including health-check NodePorts.
	serviceNodePorts            portallocator.Interface
}
// ServiceNodePort pairs a protocol with a port number, identifying a single
// NodePort of a Service.
type ServiceNodePort struct {
	// The IP protocol for this port. Supports "TCP" and "UDP".
	Protocol api.Protocol

	// The port on each node on which this service is exposed.
	// Default is to auto-allocate a port if the ServiceType of this Service requires one.
	NodePort int32
}
    55  // This is a trasitionary function to facilitate service REST flattening.
    56  func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) Allocators {
    57  	return Allocators{
    58  		defaultServiceIPFamily:      defaultFamily,
    59  		serviceIPAllocatorsByFamily: ipAllocs,
    60  		serviceNodePorts:            portAlloc,
    61  	}
    62  }
    64  func (al *Allocators) allocateCreate(service *api.Service, dryRun bool) (transaction, error) {
    65  	result := metaTransaction{}
    66  	success := false
    68  	defer func() {
    69  		if !success {
    70  			result.Revert()
    71  		}
    72  	}()
    74  	// Ensure IP family fields are correctly initialized.  We do it here, since
    75  	// we want this to be visible even when dryRun == true.
    76  	if err := al.initIPFamilyFields(After{service}, Before{nil}); err != nil {
    77  		return nil, err
    78  	}
    80  	// Allocate ClusterIPs
    81  	//TODO(thockin): validation should not pass with empty clusterIP, but it
    82  	//does (and is tested!).  Fixing that all is a big PR and will have to
    83  	//happen later.
    84  	if txn, err := al.txnAllocClusterIPs(service, dryRun); err != nil {
    85  		return nil, err
    86  	} else {
    87  		result = append(result, txn)
    88  	}
    90  	// Allocate ports
    91  	if txn, err := al.txnAllocNodePorts(service, dryRun); err != nil {
    92  		return nil, err
    93  	} else {
    94  		result = append(result, txn)
    95  	}
    97  	success = true
    98  	return result, nil
    99  }
// initIPFamilyFields defaults and pre-validates the IP-family-related fields
// (ipFamilyPolicy, ipFamilies, clusterIPs) of after.Service, mutating that
// object in place.  before.Service is the previous version of the object on
// update and nil on create.
//
// It attempts to default service IP families according to cluster
// configuration while ensuring that provided families are configured on
// cluster.  An Invalid API error is returned when the requested combination
// fails validation; nil is returned otherwise.
func (al *Allocators) initIPFamilyFields(after After, before Before) error {
	oldService, service := before.Service, after.Service

	// ExternalName services are left untouched: nothing can be done here.
	if service.Spec.Type == api.ServiceTypeExternalName {
		return nil
	}

	// We don't want to auto-upgrade (add an IP) or downgrade (remove an IP)
	// PreferDualStack services following a cluster change to/from
	// dual-stackness.
	//
	// That means a PreferDualStack service will only be upgraded/downgraded
	// when:
	// - changing ipFamilyPolicy to "RequireDualStack" or "SingleStack" AND
	// - adding or removing a secondary clusterIP or ipFamily
	if isMatchingPreferDualStackClusterIPFields(after, before) {
		return nil // nothing more to do.
	}

	// If the user didn't specify ipFamilyPolicy, we can infer a default.  We
	// don't want a static default because we want to make sure that we never
	// change between single- and dual-stack modes without explicit direction,
	// as provided by ipFamilyPolicy.  Consider these cases:
	//   * Create (POST): If they didn't specify a policy we can assume it's
	//     always SingleStack.
	//   * Update (PUT): If they didn't specify a policy we need to adopt the
	//     policy from before.  This is better than always assuming SingleStack
	//     because a PUT that changes clusterIPs from 2 to 1 value but doesn't
	//     specify ipFamily would work.
	//   * Update (PATCH): If they didn't specify a policy it will adopt the
	//     policy from before.
	if service.Spec.IPFamilyPolicy == nil {
		if oldService != nil && oldService.Spec.IPFamilyPolicy != nil {
			// Update from an object with policy, use the old policy
			service.Spec.IPFamilyPolicy = oldService.Spec.IPFamilyPolicy
		} else if service.Spec.ClusterIP == api.ClusterIPNone && len(service.Spec.Selector) == 0 {
			// Special-case: headless + selectorless defaults to dual.
			requireDualStack := api.IPFamilyPolicyRequireDualStack
			service.Spec.IPFamilyPolicy = &requireDualStack
		} else {
			// create or update from an object without policy (e.g.
			// ExternalName) to one that needs policy
			singleStack := api.IPFamilyPolicySingleStack
			service.Spec.IPFamilyPolicy = &singleStack
		}
	}
	// Henceforth we can assume ipFamilyPolicy is set.

	// Do some loose pre-validation of the input.  This makes it easier in the
	// rest of allocation code to not have to consider corner cases.
	// TODO(thockin): when we tighten validation (e.g. to require IPs) we will
	// need a "strict" and a "loose" form of this.
	if el := validation.ValidateServiceClusterIPsRelatedFields(service); len(el) != 0 {
		return errors.NewInvalid(api.Kind("Service"), service.Name, el)
	}

	//TODO(thockin): Move this logic to validation?
	// el accumulates field errors from the checks below so that they can be
	// reported together.
	el := make(field.ErrorList, 0)

	// Update-only prep work.
	if oldService != nil {
		if getIPFamilyPolicy(service) == api.IPFamilyPolicySingleStack {
			// As long as ClusterIPs and IPFamilies have not changed, setting
			// the policy to single-stack is clear intent.
			// ClusterIPs[0] is immutable, so it is safe to keep.
			if sameClusterIPs(oldService, service) && len(service.Spec.ClusterIPs) > 1 {
				service.Spec.ClusterIPs = service.Spec.ClusterIPs[0:1]
			}
			if sameIPFamilies(oldService, service) && len(service.Spec.IPFamilies) > 1 {
				service.Spec.IPFamilies = service.Spec.IPFamilies[0:1]
			}
		} else {
			// If the policy is anything but single-stack AND they reduced these
			// fields, it's an error.  They need to specify policy.
			if reducedClusterIPs(After{service}, Before{oldService}) {
				el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
					"must be 'SingleStack' to release the secondary cluster IP"))
			}
			if reducedIPFamilies(After{service}, Before{oldService}) {
				el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
					"must be 'SingleStack' to release the secondary IP family"))
			}
		}
	}

	// Make sure ipFamilyPolicy makes sense for the provided ipFamilies and
	// clusterIPs.  Further checks happen below - after the special cases.
	if getIPFamilyPolicy(service) == api.IPFamilyPolicySingleStack {
		if len(service.Spec.ClusterIPs) == 2 {
			el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
				"must be 'RequireDualStack' or 'PreferDualStack' when multiple cluster IPs are specified"))
		}
		if len(service.Spec.IPFamilies) == 2 {
			el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
				"must be 'RequireDualStack' or 'PreferDualStack' when multiple IP families are specified"))
		}
	}

	// Infer IPFamilies[] from ClusterIPs[].  Further checks happen below,
	// after the special cases.
	for i, ip := range service.Spec.ClusterIPs {
		// A "None" entry carries no family information; stop inferring.
		if ip == api.ClusterIPNone {
			break
		}

		// We previously validated that IPs are well-formed and that if an
		// ipFamilies[] entry exists it matches the IP.
		fam := familyOf(ip)

		// If the corresponding family is not specified, add it.
		if i >= len(service.Spec.IPFamilies) {
			// Families are checked more later, but this is a better error in
			// this specific case (indicating the user-provided IP, rather
			// than the auto-assigned family).
			if _, found := al.serviceIPAllocatorsByFamily[fam]; !found {
				el = append(el, field.Invalid(field.NewPath("spec", "clusterIPs").Index(i), service.Spec.ClusterIPs,
					fmt.Sprintf("%s is not configured on this cluster", fam)))
			} else {
				// OK to infer.
				service.Spec.IPFamilies = append(service.Spec.IPFamilies, fam)
			}
		}
	}

	// If we have validation errors, bail out now so we don't make them worse.
	if len(el) > 0 {
		return errors.NewInvalid(api.Kind("Service"), service.Name, el)
	}

	// Special-case: headless + selectorless.  This has to happen before other
	// checks because it explicitly allows combinations of inputs that would
	// otherwise be errors.
	if service.Spec.ClusterIP == api.ClusterIPNone && len(service.Spec.Selector) == 0 {
		// If IPFamilies was not set by the user, start with the default
		// family.
		if len(service.Spec.IPFamilies) == 0 {
			service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
		}

		// This follows headful services, with one exception: on a
		// single-stack cluster the user is allowed to create headless
		// services that have multiple families (validation allows it).
		if len(service.Spec.IPFamilies) < 2 {
			// IPFamilyPolicy is non-nil here: it was defaulted above.
			if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack {
				// add the alt ipfamily
				if service.Spec.IPFamilies[0] == api.IPv4Protocol {
					service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
				} else {
					service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
				}
			}
		}

		// nothing more needed here
		return nil
	}

	//
	// Everything below this MUST happen *after* the above special cases.
	//

	// Demanding dual-stack on a non dual-stack cluster.
	if getIPFamilyPolicy(service) == api.IPFamilyPolicyRequireDualStack {
		if len(al.serviceIPAllocatorsByFamily) < 2 {
			el = append(el, field.Invalid(field.NewPath("spec", "ipFamilyPolicy"), service.Spec.IPFamilyPolicy,
				"this cluster is not configured for dual-stack services"))
		}
	}

	// If there is a family requested then it has to be configured on cluster.
	for i, ipFamily := range service.Spec.IPFamilies {
		if _, found := al.serviceIPAllocatorsByFamily[ipFamily]; !found {
			el = append(el, field.Invalid(field.NewPath("spec", "ipFamilies").Index(i), ipFamily, "not configured on this cluster"))
		}
	}

	// If we have validation errors, don't bother with the rest.
	if len(el) > 0 {
		return errors.NewInvalid(api.Kind("Service"), service.Name, el)
	}

	// An empty families list gets the cluster default.
	if len(service.Spec.IPFamilies) == 0 {
		service.Spec.IPFamilies = []api.IPFamily{al.defaultServiceIPFamily}
	}

	// If this service is looking for dual-stack and this cluster does have two
	// families, append the missing family.
	if *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicySingleStack &&
		len(service.Spec.IPFamilies) == 1 &&
		len(al.serviceIPAllocatorsByFamily) == 2 {

		if service.Spec.IPFamilies[0] == api.IPv4Protocol {
			service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv6Protocol)
		} else if service.Spec.IPFamilies[0] == api.IPv6Protocol {
			service.Spec.IPFamilies = append(service.Spec.IPFamilies, api.IPv4Protocol)
		}
	}

	return nil
}
   306  func (al *Allocators) txnAllocClusterIPs(service *api.Service, dryRun bool) (transaction, error) {
   307  	// clusterIPs that were allocated may need to be released in case of
   308  	// failure at a higher level.
   309  	allocated, err := al.allocClusterIPs(service, dryRun)
   310  	if err != nil {
   311  		return nil, err
   312  	}
   314  	txn := callbackTransaction{
   315  		revert: func() {
   316  			if dryRun {
   317  				return
   318  			}
   319  			actuallyReleased, err := al.releaseIPs(allocated)
   320  			if err != nil {
   321  				klog.ErrorS(err, "failed to clean up after failed service create",
   322  					"service", klog.KObj(service),
   323  					"shouldRelease", allocated,
   324  					"released", actuallyReleased)
   325  			}
   326  		},
   327  		commit: func() {
   328  			if !dryRun {
   329  				if len(allocated) > 0 {
   330  					klog.InfoS("allocated clusterIPs",
   331  						"service", klog.KObj(service),
   332  						"clusterIPs", allocated)
   333  				}
   334  			}
   335  		},
   336  	}
   337  	return txn, nil
   338  }
   340  // allocates ClusterIPs for a service
   341  func (al *Allocators) allocClusterIPs(service *api.Service, dryRun bool) (map[api.IPFamily]string, error) {
   342  	// external name don't get ClusterIPs
   343  	if service.Spec.Type == api.ServiceTypeExternalName {
   344  		return nil, nil
   345  	}
   347  	// headless don't get ClusterIPs
   348  	if len(service.Spec.ClusterIPs) > 0 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
   349  		return nil, nil
   350  	}
   352  	toAlloc := make(map[api.IPFamily]string)
   353  	// at this stage, the only fact we know is that service has correct ip families
   354  	// assigned to it. It may have partial assigned ClusterIPs (Upgrade to dual stack)
   355  	// may have no ips at all. The below loop is meant to fix this
   356  	// (we also know that this cluster has these families)
   358  	// if there is no slice to work with
   359  	if service.Spec.ClusterIPs == nil {
   360  		service.Spec.ClusterIPs = make([]string, 0, len(service.Spec.IPFamilies))
   361  	}
   363  	for i, ipFamily := range service.Spec.IPFamilies {
   364  		if i > (len(service.Spec.ClusterIPs) - 1) {
   365  			service.Spec.ClusterIPs = append(service.Spec.ClusterIPs, "" /* just a marker */)
   366  		}
   368  		toAlloc[ipFamily] = service.Spec.ClusterIPs[i]
   369  	}
   371  	// allocate
   372  	allocated, err := al.allocIPs(service, toAlloc, dryRun)
   374  	// set if successful
   375  	if err == nil {
   376  		for family, ip := range allocated {
   377  			for i, check := range service.Spec.IPFamilies {
   378  				if family == check {
   379  					service.Spec.ClusterIPs[i] = ip
   380  					// while we technically don't need to do that testing rest does not
   381  					// go through conversion logic but goes through validation *sigh*.
   382  					// so we set ClusterIP here as well
   383  					// because the testing code expects valid (as they are output-ed from conversion)
   384  					// as it patches fields
   385  					if i == 0 {
   386  						service.Spec.ClusterIP = ip
   387  					}
   388  				}
   389  			}
   390  		}
   391  	}
   393  	return allocated, err
   394  }
   396  func (al *Allocators) allocIPs(service *api.Service, toAlloc map[api.IPFamily]string, dryRun bool) (map[api.IPFamily]string, error) {
   397  	allocated := make(map[api.IPFamily]string)
   399  	for family, ip := range toAlloc {
   400  		allocator := al.serviceIPAllocatorsByFamily[family] // should always be there, as we pre validate
   401  		if dryRun {
   402  			allocator = allocator.DryRun()
   403  		}
   404  		if ip == "" {
   405  			var allocatedIP net.IP
   406  			var err error
   407  			if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
   408  				// TODO: simplify this and avoid all this duplicate code
   409  				svcAllocator, ok := allocator.(*ipallocator.MetaAllocator)
   410  				if ok {
   411  					allocatedIP, err = svcAllocator.AllocateNextService(service)
   412  				} else {
   413  					allocatedIP, err = allocator.AllocateNext()
   414  				}
   415  			} else {
   416  				allocatedIP, err = allocator.AllocateNext()
   417  			}
   418  			if err != nil {
   419  				return allocated, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err))
   420  			}
   421  			allocated[family] = allocatedIP.String()
   422  		} else {
   423  			parsedIP := netutils.ParseIPSloppy(ip)
   424  			if parsedIP == nil {
   425  				return allocated, errors.NewInternalError(fmt.Errorf("failed to parse service IP %q", ip))
   426  			}
   427  			var err error
   428  			if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
   429  				// TODO: simplify this and avoid all this duplicate code
   430  				svcAllocator, ok := allocator.(*ipallocator.MetaAllocator)
   431  				if ok {
   432  					err = svcAllocator.AllocateService(service, parsedIP)
   433  				} else {
   434  					err = allocator.Allocate(parsedIP)
   435  				}
   436  			} else {
   437  				err = allocator.Allocate(parsedIP)
   438  			}
   439  			if err != nil {
   440  				el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIPs"), service.Spec.ClusterIPs, fmt.Sprintf("failed to allocate IP %v: %v", ip, err))}
   441  				return allocated, errors.NewInvalid(api.Kind("Service"), service.Name, el)
   442  			}
   443  			allocated[family] = ip
   444  		}
   445  	}
   446  	return allocated, nil
   447  }
   449  // releases clusterIPs per family
   450  func (al *Allocators) releaseIPs(toRelease map[api.IPFamily]string) (map[api.IPFamily]string, error) {
   451  	if toRelease == nil {
   452  		return nil, nil
   453  	}
   455  	released := make(map[api.IPFamily]string)
   456  	for family, ip := range toRelease {
   457  		allocator, ok := al.serviceIPAllocatorsByFamily[family]
   458  		if !ok {
   459  			// Maybe the cluster was previously configured for dual-stack,
   460  			// then switched to single-stack?
   461  			klog.InfoS("Not releasing ClusterIP because related family is not enabled", "clusterIP", ip, "family", family)
   462  			continue
   463  		}
   465  		parsedIP := netutils.ParseIPSloppy(ip)
   466  		if parsedIP == nil {
   467  			return released, errors.NewInternalError(fmt.Errorf("failed to parse service IP %q", ip))
   468  		}
   469  		if err := allocator.Release(parsedIP); err != nil {
   470  			return released, err
   471  		}
   472  		released[family] = ip
   473  	}
   475  	return released, nil
   476  }
   478  func (al *Allocators) txnAllocNodePorts(service *api.Service, dryRun bool) (transaction, error) {
   479  	// The allocator tracks dry-run-ness internally.
   480  	nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
   482  	txn := callbackTransaction{
   483  		commit: func() {
   484  			nodePortOp.Commit()
   485  			// We don't NEED to call Finish() here, but for that package says
   486  			// to, so for future-safety, we will.
   487  			nodePortOp.Finish()
   488  		},
   489  		revert: func() {
   490  			// Weirdly named but this will revert if commit wasn't called
   491  			nodePortOp.Finish()
   492  		},
   493  	}
   495  	// Allocate NodePorts, if needed.
   496  	if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
   497  		if err := initNodePorts(service, nodePortOp); err != nil {
   498  			txn.Revert()
   499  			return nil, err
   500  		}
   501  	}
   503  	// Handle ExternalTraffic related fields during service creation.
   504  	if apiservice.NeedsHealthCheck(service) {
   505  		if err := al.allocHealthCheckNodePort(service, nodePortOp); err != nil {
   506  			txn.Revert()
   507  			return nil, errors.NewInternalError(err)
   508  		}
   509  	}
   511  	return txn, nil
   512  }
   514  func initNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
   515  	svcPortToNodePort := map[int]int{}
   516  	for i := range service.Spec.Ports {
   517  		servicePort := &service.Spec.Ports[i]
   518  		if servicePort.NodePort == 0 && !shouldAllocateNodePorts(service) {
   519  			// Don't allocate new ports, but do respect specific requests.
   520  			continue
   521  		}
   522  		allocatedNodePort := svcPortToNodePort[int(servicePort.Port)]
   523  		if allocatedNodePort == 0 {
   524  			// This will only scan forward in the service.Spec.Ports list because any matches
   525  			// before the current port would have been found in svcPortToNodePort. This is really
   526  			// looking for any user provided values.
   527  			np := findRequestedNodePort(int(servicePort.Port), service.Spec.Ports)
   528  			if np != 0 {
   529  				err := nodePortOp.Allocate(np)
   530  				if err != nil {
   531  					// TODO: when validation becomes versioned, this gets more complicated.
   532  					el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), np, err.Error())}
   533  					return errors.NewInvalid(api.Kind("Service"), service.Name, el)
   534  				}
   535  				servicePort.NodePort = int32(np)
   536  				svcPortToNodePort[int(servicePort.Port)] = np
   537  			} else {
   538  				nodePort, err := nodePortOp.AllocateNext()
   539  				if err != nil {
   540  					// TODO: what error should be returned here?  It's not a
   541  					// field-level validation failure (the field is valid), and it's
   542  					// not really an internal error.
   543  					return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
   544  				}
   545  				servicePort.NodePort = int32(nodePort)
   546  				svcPortToNodePort[int(servicePort.Port)] = nodePort
   547  			}
   548  		} else if int(servicePort.NodePort) != allocatedNodePort {
   549  			// TODO(xiangpengzhao): do we need to allocate a new NodePort in this case?
   550  			// Note: the current implementation is better, because it saves a NodePort.
   551  			if servicePort.NodePort == 0 {
   552  				servicePort.NodePort = int32(allocatedNodePort)
   553  			} else {
   554  				err := nodePortOp.Allocate(int(servicePort.NodePort))
   555  				if err != nil {
   556  					// TODO: when validation becomes versioned, this gets more complicated.
   557  					el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), servicePort.NodePort, err.Error())}
   558  					return errors.NewInvalid(api.Kind("Service"), service.Name, el)
   559  				}
   560  			}
   561  		}
   562  	}
   564  	return nil
   565  }
   567  // allocHealthCheckNodePort allocates health check node port to service.
   568  func (al *Allocators) allocHealthCheckNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
   569  	healthCheckNodePort := service.Spec.HealthCheckNodePort
   570  	if healthCheckNodePort != 0 {
   571  		// If the request has a health check nodePort in mind, attempt to reserve it.
   572  		err := nodePortOp.Allocate(int(healthCheckNodePort))
   573  		if err != nil {
   574  			return fmt.Errorf("failed to allocate requested HealthCheck NodePort %v: %v",
   575  				healthCheckNodePort, err)
   576  		}
   577  	} else {
   578  		// If the request has no health check nodePort specified, allocate any.
   579  		healthCheckNodePort, err := nodePortOp.AllocateNext()
   580  		if err != nil {
   581  			return fmt.Errorf("failed to allocate a HealthCheck NodePort %v: %v", healthCheckNodePort, err)
   582  		}
   583  		service.Spec.HealthCheckNodePort = int32(healthCheckNodePort)
   584  	}
   585  	return nil
   586  }
   588  func (al *Allocators) allocateUpdate(after After, before Before, dryRun bool) (transaction, error) {
   589  	result := metaTransaction{}
   590  	success := false
   592  	defer func() {
   593  		if !success {
   594  			result.Revert()
   595  		}
   596  	}()
   598  	// Ensure IP family fields are correctly initialized.  We do it here, since
   599  	// we want this to be visible even when dryRun == true.
   600  	if err := al.initIPFamilyFields(after, before); err != nil {
   601  		return nil, err
   602  	}
   604  	// Allocate ClusterIPs
   605  	//TODO(thockin): validation should not pass with empty clusterIP, but it
   606  	//does (and is tested!).  Fixing that all is a big PR and will have to
   607  	//happen later.
   608  	if txn, err := al.txnUpdateClusterIPs(after, before, dryRun); err != nil {
   609  		return nil, err
   610  	} else {
   611  		result = append(result, txn)
   612  	}
   614  	// Allocate ports
   615  	if txn, err := al.txnUpdateNodePorts(after, before, dryRun); err != nil {
   616  		return nil, err
   617  	} else {
   618  		result = append(result, txn)
   619  	}
   621  	success = true
   622  	return result, nil
   623  }
   625  func (al *Allocators) txnUpdateClusterIPs(after After, before Before, dryRun bool) (transaction, error) {
   626  	service := after.Service
   628  	allocated, released, err := al.updateClusterIPs(after, before, dryRun)
   629  	if err != nil {
   630  		return nil, err
   631  	}
   633  	// on failure: Any newly allocated IP must be released back
   634  	// on failure: Any previously allocated IP that would have been released,
   635  	//             must *not* be released
   636  	// on success: Any previously allocated IP that should be released, will be
   637  	//             released
   638  	txn := callbackTransaction{
   639  		commit: func() {
   640  			if dryRun {
   641  				return
   642  			}
   643  			if len(allocated) > 0 {
   644  				klog.InfoS("allocated clusterIPs",
   645  					"service", klog.KObj(service),
   646  					"clusterIPs", allocated)
   647  			}
   648  			if actuallyReleased, err := al.releaseIPs(released); err != nil {
   649  				klog.ErrorS(err, "failed to clean up after successful service update",
   650  					"service", klog.KObj(service),
   651  					"shouldRelease", released,
   652  					"released", actuallyReleased)
   653  			}
   654  		},
   655  		revert: func() {
   656  			if dryRun {
   657  				return
   658  			}
   659  			if actuallyReleased, err := al.releaseIPs(allocated); err != nil {
   660  				klog.ErrorS(err, "failed to clean up after failed service update",
   661  					"service", klog.KObj(service),
   662  					"shouldRelease", allocated,
   663  					"released", actuallyReleased)
   664  			}
   665  		},
   666  	}
   667  	return txn, nil
   668  }
   670  // handles type change/upgrade/downgrade change type for an update service
   671  // this func does not perform actual release of clusterIPs. it returns
   672  // a map[family]ip for the caller to release when everything else has
   673  // executed successfully
   674  func (al *Allocators) updateClusterIPs(after After, before Before, dryRun bool) (allocated map[api.IPFamily]string, toRelease map[api.IPFamily]string, err error) {
   675  	oldService, service := before.Service, after.Service
   677  	// We don't want to auto-upgrade (add an IP) or downgrade (remove an IP)
   678  	// PreferDualStack services following a cluster change to/from
   679  	// dual-stackness.
   680  	//
   681  	// That means a PreferDualStack service will only be upgraded/downgraded
   682  	// when:
   683  	// - changing ipFamilyPolicy to "RequireDualStack" or "SingleStack" AND
   684  	// - adding or removing a secondary clusterIP or ipFamily
   685  	if isMatchingPreferDualStackClusterIPFields(after, before) {
   686  		return allocated, toRelease, nil // nothing more to do.
   687  	}
   689  	// use cases:
   690  	// A: service changing types from ExternalName TO ClusterIP types ==> allocate all new
   691  	// B: service changing types from ClusterIP types TO ExternalName ==> release all allocated
   692  	// C: Service upgrading to dual stack  ==> partial allocation
   693  	// D: service downgrading from dual stack ==> partial release
   695  	// CASE A:
   696  	// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
   697  	if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
   698  		allocated, err := al.allocClusterIPs(service, dryRun)
   699  		return allocated, nil, err
   700  	}
   702  	// if headless service then we bail out early (no clusterIPs management needed)
   703  	if len(oldService.Spec.ClusterIPs) > 0 && oldService.Spec.ClusterIPs[0] == api.ClusterIPNone {
   704  		return nil, nil, nil
   705  	}
   707  	// CASE B:
   708  	// Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
   709  	if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
   710  		toRelease = make(map[api.IPFamily]string)
   711  		for i, family := range oldService.Spec.IPFamilies {
   712  			toRelease[family] = oldService.Spec.ClusterIPs[i]
   713  		}
   714  		return nil, toRelease, nil
   715  	}
   717  	upgraded := len(oldService.Spec.IPFamilies) == 1 && len(service.Spec.IPFamilies) == 2
   718  	downgraded := len(oldService.Spec.IPFamilies) == 2 && len(service.Spec.IPFamilies) == 1
   720  	// CASE C:
   721  	if upgraded {
   722  		toAllocate := make(map[api.IPFamily]string)
   723  		// if secondary ip was named, just get it. if not add a marker
   724  		if len(service.Spec.ClusterIPs) < 2 {
   725  			service.Spec.ClusterIPs = append(service.Spec.ClusterIPs, "" /* marker */)
   726  		}
   728  		toAllocate[service.Spec.IPFamilies[1]] = service.Spec.ClusterIPs[1]
   730  		// allocate
   731  		allocated, err := al.allocIPs(service, toAllocate, dryRun)
   732  		// set if successful
   733  		if err == nil {
   734  			service.Spec.ClusterIPs[1] = allocated[service.Spec.IPFamilies[1]]
   735  		}
   737  		return allocated, nil, err
   738  	}
   740  	// CASE D:
   741  	if downgraded {
   742  		toRelease = make(map[api.IPFamily]string)
   743  		toRelease[oldService.Spec.IPFamilies[1]] = oldService.Spec.ClusterIPs[1]
   744  		// note: we don't release clusterIP, this is left to clean up in the action itself
   745  		return nil, toRelease, err
   746  	}
   747  	// it was not an upgrade nor downgrade
   748  	return nil, nil, nil
   749  }
   751  func (al *Allocators) txnUpdateNodePorts(after After, before Before, dryRun bool) (transaction, error) {
   752  	oldService, service := before.Service, after.Service
   754  	// The allocator tracks dry-run-ness internally.
   755  	nodePortOp := portallocator.StartOperation(al.serviceNodePorts, dryRun)
   757  	txn := callbackTransaction{
   758  		commit: func() {
   759  			nodePortOp.Commit()
   760  			// We don't NEED to call Finish() here, but for that package says
   761  			// to, so for future-safety, we will.
   762  			nodePortOp.Finish()
   763  		},
   764  		revert: func() {
   765  			// Weirdly named but this will revert if commit wasn't called
   766  			nodePortOp.Finish()
   767  		},
   768  	}
   770  	// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
   771  	if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
   772  		(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
   773  		al.releaseNodePorts(oldService, nodePortOp)
   774  	}
   776  	// Update service from any type to NodePort or LoadBalancer, should update NodePort.
   777  	if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
   778  		if err := al.updateNodePorts(After{service}, Before{oldService}, nodePortOp); err != nil {
   779  			txn.Revert()
   780  			return nil, err
   781  		}
   782  	}
   784  	// Handle ExternalTraffic related updates.
   785  	success, err := al.updateHealthCheckNodePort(After{service}, Before{oldService}, nodePortOp)
   786  	if !success || err != nil {
   787  		txn.Revert()
   788  		return nil, err
   789  	}
   791  	return txn, nil
   792  }
   794  func (al *Allocators) releaseNodePorts(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
   795  	nodePorts := collectServiceNodePorts(service)
   797  	for _, nodePort := range nodePorts {
   798  		nodePortOp.ReleaseDeferred(nodePort)
   799  	}
   800  }
   802  func (al *Allocators) updateNodePorts(after After, before Before, nodePortOp *portallocator.PortAllocationOperation) error {
   803  	oldService, newService := before.Service, after.Service
   805  	oldNodePortsNumbers := collectServiceNodePorts(oldService)
   806  	newNodePorts := []ServiceNodePort{}
   807  	portAllocated := map[int]bool{}
   809  	for i := range newService.Spec.Ports {
   810  		servicePort := &newService.Spec.Ports[i]
   811  		if servicePort.NodePort == 0 && !shouldAllocateNodePorts(newService) {
   812  			// Don't allocate new ports, but do respect specific requests.
   813  			continue
   814  		}
   815  		nodePort := ServiceNodePort{Protocol: servicePort.Protocol, NodePort: servicePort.NodePort}
   816  		if nodePort.NodePort != 0 {
   817  			if !containsNumber(oldNodePortsNumbers, int(nodePort.NodePort)) && !portAllocated[int(nodePort.NodePort)] {
   818  				err := nodePortOp.Allocate(int(nodePort.NodePort))
   819  				if err != nil {
   820  					el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort.NodePort, err.Error())}
   821  					return errors.NewInvalid(api.Kind("Service"), newService.Name, el)
   822  				}
   823  				portAllocated[int(nodePort.NodePort)] = true
   824  			}
   825  		} else {
   826  			nodePortNumber, err := nodePortOp.AllocateNext()
   827  			if err != nil {
   828  				// TODO: what error should be returned here?  It's not a
   829  				// field-level validation failure (the field is valid), and it's
   830  				// not really an internal error.
   831  				return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
   832  			}
   833  			servicePort.NodePort = int32(nodePortNumber)
   834  			nodePort.NodePort = servicePort.NodePort
   835  		}
   836  		if containsNodePort(newNodePorts, nodePort) {
   837  			return fmt.Errorf("duplicate nodePort: %v", nodePort)
   838  		}
   839  		newNodePorts = append(newNodePorts, nodePort)
   840  	}
   842  	newNodePortsNumbers := collectServiceNodePorts(newService)
   844  	// The comparison loops are O(N^2), but we don't expect N to be huge
   845  	// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
   846  	for _, oldNodePortNumber := range oldNodePortsNumbers {
   847  		if containsNumber(newNodePortsNumbers, oldNodePortNumber) {
   848  			continue
   849  		}
   850  		nodePortOp.ReleaseDeferred(int(oldNodePortNumber))
   851  	}
   853  	return nil
   854  }
   856  // updateHealthCheckNodePort handles HealthCheckNodePort allocation/release
   857  // and adjusts HealthCheckNodePort during service update if needed.
   858  func (al *Allocators) updateHealthCheckNodePort(after After, before Before, nodePortOp *portallocator.PortAllocationOperation) (bool, error) {
   859  	oldService, service := before.Service, after.Service
   861  	neededHealthCheckNodePort := apiservice.NeedsHealthCheck(oldService)
   862  	oldHealthCheckNodePort := oldService.Spec.HealthCheckNodePort
   864  	needsHealthCheckNodePort := apiservice.NeedsHealthCheck(service)
   866  	switch {
   867  	// Case 1: Transition from don't need HealthCheckNodePort to needs HealthCheckNodePort.
   868  	// Allocate a health check node port or attempt to reserve the user-specified one if provided.
   869  	// Insert health check node port into the service's HealthCheckNodePort field if needed.
   870  	case !neededHealthCheckNodePort && needsHealthCheckNodePort:
   871  		if err := al.allocHealthCheckNodePort(service, nodePortOp); err != nil {
   872  			return false, errors.NewInternalError(err)
   873  		}
   875  	// Case 2: Transition from needs HealthCheckNodePort to don't need HealthCheckNodePort.
   876  	// Free the existing healthCheckNodePort and clear the HealthCheckNodePort field.
   877  	case neededHealthCheckNodePort && !needsHealthCheckNodePort:
   878  		nodePortOp.ReleaseDeferred(int(oldHealthCheckNodePort))
   879  	}
   880  	return true, nil
   881  }
   883  func (al *Allocators) releaseAllocatedResources(svc *api.Service) {
   884  	al.releaseClusterIPs(svc)
   886  	for _, nodePort := range collectServiceNodePorts(svc) {
   887  		err := al.serviceNodePorts.Release(nodePort)
   888  		if err != nil {
   889  			// these should be caught by an eventual reconciliation / restart
   890  			utilruntime.HandleError(fmt.Errorf("Error releasing service %s node port %d: %v", svc.Name, nodePort, err))
   891  		}
   892  	}
   894  	if apiservice.NeedsHealthCheck(svc) {
   895  		nodePort := svc.Spec.HealthCheckNodePort
   896  		if nodePort > 0 {
   897  			err := al.serviceNodePorts.Release(int(nodePort))
   898  			if err != nil {
   899  				// these should be caught by an eventual reconciliation / restart
   900  				utilruntime.HandleError(fmt.Errorf("Error releasing service %s health check node port %d: %v", svc.Name, nodePort, err))
   901  			}
   902  		}
   903  	}
   904  }
   906  // releases allocated ClusterIPs for service that is about to be deleted
   907  func (al *Allocators) releaseClusterIPs(service *api.Service) (released map[api.IPFamily]string, err error) {
   908  	// external name don't get ClusterIPs
   909  	if service.Spec.Type == api.ServiceTypeExternalName {
   910  		return nil, nil
   911  	}
   913  	// headless don't get ClusterIPs
   914  	if len(service.Spec.ClusterIPs) > 0 && service.Spec.ClusterIPs[0] == api.ClusterIPNone {
   915  		return nil, nil
   916  	}
   918  	toRelease := make(map[api.IPFamily]string)
   919  	for _, ip := range service.Spec.ClusterIPs {
   920  		if netutils.IsIPv6String(ip) {
   921  			toRelease[api.IPv6Protocol] = ip
   922  		} else {
   923  			toRelease[api.IPv4Protocol] = ip
   924  		}
   925  	}
   926  	return al.releaseIPs(toRelease)
   927  }
   929  func (al *Allocators) Destroy() {
   930  	al.serviceNodePorts.Destroy()
   931  	for _, a := range al.serviceIPAllocatorsByFamily {
   932  		a.Destroy()
   933  	}
   934  }
   936  // This is O(N), but we expect haystack to be small;
   937  // so small that we expect a linear search to be faster
   938  func containsNumber(haystack []int, needle int) bool {
   939  	for _, v := range haystack {
   940  		if v == needle {
   941  			return true
   942  		}
   943  	}
   944  	return false
   945  }
   947  // This is O(N), but we expect serviceNodePorts to be small;
   948  // so small that we expect a linear search to be faster
   949  func containsNodePort(serviceNodePorts []ServiceNodePort, serviceNodePort ServiceNodePort) bool {
   950  	for _, snp := range serviceNodePorts {
   951  		if snp == serviceNodePort {
   952  			return true
   953  		}
   954  	}
   955  	return false
   956  }
   958  // Loop through the service ports list, find one with the same port number and
   959  // NodePort specified, return this NodePort otherwise return 0.
   960  func findRequestedNodePort(port int, servicePorts []api.ServicePort) int {
   961  	for i := range servicePorts {
   962  		servicePort := servicePorts[i]
   963  		if port == int(servicePort.Port) && servicePort.NodePort != 0 {
   964  			return int(servicePort.NodePort)
   965  		}
   966  	}
   967  	return 0
   968  }
   970  func shouldAllocateNodePorts(service *api.Service) bool {
   971  	if service.Spec.Type == api.ServiceTypeNodePort {
   972  		return true
   973  	}
   974  	if service.Spec.Type == api.ServiceTypeLoadBalancer {
   975  		return *service.Spec.AllocateLoadBalancerNodePorts
   976  	}
   977  	return false
   978  }
   980  func collectServiceNodePorts(service *api.Service) []int {
   981  	servicePorts := []int{}
   982  	for i := range service.Spec.Ports {
   983  		servicePort := &service.Spec.Ports[i]
   984  		if servicePort.NodePort != 0 {
   985  			servicePorts = append(servicePorts, int(servicePort.NodePort))
   986  		}
   987  	}
   988  	return servicePorts
   989  }
   991  // tests if two preferred dual-stack service have matching ClusterIPFields
   992  // assumption: old service is a valid, default service (e.g., loaded from store)
   993  func isMatchingPreferDualStackClusterIPFields(after After, before Before) bool {
   994  	oldService, service := before.Service, after.Service
   996  	if oldService == nil {
   997  		return false
   998  	}
  1000  	if service.Spec.IPFamilyPolicy == nil {
  1001  		return false
  1002  	}
  1004  	// if type mutated then it is an update
  1005  	// that needs to run through the entire process.
  1006  	if oldService.Spec.Type != service.Spec.Type {
  1007  		return false
  1008  	}
  1009  	// both must be type that gets an IP assigned
  1010  	if service.Spec.Type != api.ServiceTypeClusterIP &&
  1011  		service.Spec.Type != api.ServiceTypeNodePort &&
  1012  		service.Spec.Type != api.ServiceTypeLoadBalancer {
  1013  		return false
  1014  	}
  1016  	// both must be of IPFamilyPolicy==PreferDualStack
  1017  	if service.Spec.IPFamilyPolicy != nil && *(service.Spec.IPFamilyPolicy) != api.IPFamilyPolicyPreferDualStack {
  1018  		return false
  1019  	}
  1021  	if oldService.Spec.IPFamilyPolicy != nil && *(oldService.Spec.IPFamilyPolicy) != api.IPFamilyPolicyPreferDualStack {
  1022  		return false
  1023  	}
  1025  	if !sameClusterIPs(oldService, service) {
  1026  		return false
  1027  	}
  1029  	if !sameIPFamilies(oldService, service) {
  1030  		return false
  1031  	}
  1033  	// they match on
  1034  	// Policy: preferDualStack
  1035  	// ClusterIPs
  1036  	// IPFamilies
  1037  	return true
  1038  }
  1040  // Helper to avoid nil-checks all over.  Callers of this need to be checking
  1041  // for an exact value.
  1042  func getIPFamilyPolicy(svc *api.Service) api.IPFamilyPolicy {
  1043  	if svc.Spec.IPFamilyPolicy == nil {
  1044  		return "" // callers need to handle this
  1045  	}
  1046  	return *svc.Spec.IPFamilyPolicy
  1047  }
  1049  func sameClusterIPs(lhs, rhs *api.Service) bool {
  1050  	if len(rhs.Spec.ClusterIPs) != len(lhs.Spec.ClusterIPs) {
  1051  		return false
  1052  	}
  1054  	for i, ip := range rhs.Spec.ClusterIPs {
  1055  		if lhs.Spec.ClusterIPs[i] != ip {
  1056  			return false
  1057  		}
  1058  	}
  1060  	return true
  1061  }
  1063  func reducedClusterIPs(after After, before Before) bool {
  1064  	oldSvc, newSvc := before.Service, after.Service
  1066  	if len(newSvc.Spec.ClusterIPs) == 0 { // Not specified
  1067  		return false
  1068  	}
  1069  	return len(newSvc.Spec.ClusterIPs) < len(oldSvc.Spec.ClusterIPs)
  1070  }
  1072  func sameIPFamilies(lhs, rhs *api.Service) bool {
  1073  	if len(rhs.Spec.IPFamilies) != len(lhs.Spec.IPFamilies) {
  1074  		return false
  1075  	}
  1077  	for i, family := range rhs.Spec.IPFamilies {
  1078  		if lhs.Spec.IPFamilies[i] != family {
  1079  			return false
  1080  		}
  1081  	}
  1083  	return true
  1084  }
  1086  func reducedIPFamilies(after After, before Before) bool {
  1087  	oldSvc, newSvc := before.Service, after.Service
  1089  	if len(newSvc.Spec.IPFamilies) == 0 { // Not specified
  1090  		return false
  1091  	}
  1092  	return len(newSvc.Spec.IPFamilies) < len(oldSvc.Spec.IPFamilies)
  1093  }
  1095  // Helper to get the IP family of a given IP.
  1096  func familyOf(ip string) api.IPFamily {
  1097  	if netutils.IsIPv4String(ip) {
  1098  		return api.IPv4Protocol
  1099  	}
  1100  	if netutils.IsIPv6String(ip) {
  1101  		return api.IPv6Protocol
  1102  	}
  1103  	return api.IPFamily("unknown")
  1104  }

View as plain text