...

Source file src/k8s.io/kubernetes/pkg/registry/core/service/storage/storage.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/storage

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"net"
    24  	"net/http"
    25  	"net/url"
    26  	"strconv"
    27  
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	utilnet "k8s.io/apimachinery/pkg/util/net"
    32  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    35  	"k8s.io/apiserver/pkg/registry/generic"
    36  	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
    37  	"k8s.io/apiserver/pkg/registry/rest"
    38  	"k8s.io/apiserver/pkg/util/dryrun"
    39  	"k8s.io/klog/v2"
    40  	api "k8s.io/kubernetes/pkg/apis/core"
    41  	"k8s.io/kubernetes/pkg/printers"
    42  	printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
    43  	printerstorage "k8s.io/kubernetes/pkg/printers/storage"
    44  	svcreg "k8s.io/kubernetes/pkg/registry/core/service"
    45  	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
    46  	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
    47  	netutil "k8s.io/utils/net"
    48  	"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
    49  )
    50  
    51  type EndpointsStorage interface {
    52  	rest.Getter
    53  	rest.GracefulDeleter
    54  }
    55  
    56  type PodStorage interface {
    57  	rest.Getter
    58  }
    59  
    60  type REST struct {
    61  	*genericregistry.Store
    62  	primaryIPFamily   api.IPFamily
    63  	secondaryIPFamily api.IPFamily
    64  	alloc             Allocators
    65  	endpoints         EndpointsStorage
    66  	pods              PodStorage
    67  	proxyTransport    http.RoundTripper
    68  }
    69  
    70  var (
    71  	_ rest.CategoriesProvider     = &REST{}
    72  	_ rest.ShortNamesProvider     = &REST{}
    73  	_ rest.StorageVersionProvider = &REST{}
    74  	_ rest.ResetFieldsStrategy    = &REST{}
    75  	_ rest.Redirector             = &REST{}
    76  )
    77  
    78  // NewREST returns a REST object that will work against services.
    79  func NewREST(
    80  	optsGetter generic.RESTOptionsGetter,
    81  	serviceIPFamily api.IPFamily,
    82  	ipAllocs map[api.IPFamily]ipallocator.Interface,
    83  	portAlloc portallocator.Interface,
    84  	endpoints EndpointsStorage,
    85  	pods PodStorage,
    86  	proxyTransport http.RoundTripper) (*REST, *StatusREST, *svcreg.ProxyREST, error) {
    87  
    88  	store := &genericregistry.Store{
    89  		NewFunc:                   func() runtime.Object { return &api.Service{} },
    90  		NewListFunc:               func() runtime.Object { return &api.ServiceList{} },
    91  		DefaultQualifiedResource:  api.Resource("services"),
    92  		SingularQualifiedResource: api.Resource("service"),
    93  		ReturnDeletedObject:       true,
    94  
    95  		CreateStrategy:      svcreg.Strategy,
    96  		UpdateStrategy:      svcreg.Strategy,
    97  		DeleteStrategy:      svcreg.Strategy,
    98  		ResetFieldsStrategy: svcreg.Strategy,
    99  
   100  		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
   101  	}
   102  	options := &generic.StoreOptions{RESTOptions: optsGetter}
   103  	if err := store.CompleteWithOptions(options); err != nil {
   104  		return nil, nil, nil, err
   105  	}
   106  
   107  	statusStore := *store
   108  	statusStore.UpdateStrategy = svcreg.StatusStrategy
   109  	statusStore.ResetFieldsStrategy = svcreg.StatusStrategy
   110  
   111  	var primaryIPFamily api.IPFamily = serviceIPFamily
   112  	var secondaryIPFamily api.IPFamily = "" // sentinel value
   113  	if len(ipAllocs) > 1 {
   114  		secondaryIPFamily = otherFamily(serviceIPFamily)
   115  	}
   116  	genericStore := &REST{
   117  		Store:             store,
   118  		primaryIPFamily:   primaryIPFamily,
   119  		secondaryIPFamily: secondaryIPFamily,
   120  		alloc:             makeAlloc(serviceIPFamily, ipAllocs, portAlloc),
   121  		endpoints:         endpoints,
   122  		pods:              pods,
   123  		proxyTransport:    proxyTransport,
   124  	}
   125  	store.Decorator = genericStore.defaultOnRead
   126  	store.AfterDelete = genericStore.afterDelete
   127  	store.BeginCreate = genericStore.beginCreate
   128  	store.BeginUpdate = genericStore.beginUpdate
   129  
   130  	// users can patch the status to remove the finalizer,
   131  	// hence statusStore must participate on the AfterDelete
   132  	// hook to release the allocated resources
   133  	statusStore.AfterDelete = genericStore.afterDelete
   134  
   135  	return genericStore, &StatusREST{store: &statusStore}, &svcreg.ProxyREST{Redirector: genericStore, ProxyTransport: proxyTransport}, nil
   136  }
   137  
   138  // otherFamily returns the non-selected IPFamily.  This assumes the input is
   139  // valid.
   140  func otherFamily(fam api.IPFamily) api.IPFamily {
   141  	if fam == api.IPv4Protocol {
   142  		return api.IPv6Protocol
   143  	}
   144  	return api.IPv4Protocol
   145  }
   146  
   147  var (
   148  	_ rest.ShortNamesProvider = &REST{}
   149  	_ rest.CategoriesProvider = &REST{}
   150  )
   151  
   152  // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
   153  func (r *REST) ShortNames() []string {
   154  	return []string{"svc"}
   155  }
   156  
   157  // Categories implements the CategoriesProvider interface. Returns a list of categories a resource is part of.
   158  func (r *REST) Categories() []string {
   159  	return []string{"all"}
   160  }
   161  
   162  // Destroy cleans up everything on shutdown.
   163  func (r *REST) Destroy() {
   164  	r.Store.Destroy()
   165  	r.alloc.Destroy()
   166  }
   167  
   168  // StatusREST implements the REST endpoint for changing the status of a service.
   169  type StatusREST struct {
   170  	store *genericregistry.Store
   171  }
   172  
   173  func (r *StatusREST) New() runtime.Object {
   174  	return &api.Service{}
   175  }
   176  
   177  // Destroy cleans up resources on shutdown.
   178  func (r *StatusREST) Destroy() {
   179  	// Given that underlying store is shared with REST,
   180  	// we don't destroy it here explicitly.
   181  }
   182  
   183  // Get retrieves the object from the storage. It is required to support Patch.
   184  func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
   185  	return r.store.Get(ctx, name, options)
   186  }
   187  
   188  // Update alters the status subset of an object.
   189  func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
   190  	// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
   191  	// subresources should never allow create on update.
   192  	return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
   193  }
   194  
   195  func (r *StatusREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
   196  	return r.store.ConvertToTable(ctx, object, tableOptions)
   197  }
   198  
   199  // GetResetFields implements rest.ResetFieldsStrategy
   200  func (r *StatusREST) GetResetFields() map[fieldpath.APIVersion]*fieldpath.Set {
   201  	return r.store.GetResetFields()
   202  }
   203  
   204  // We have a lot of functions that take a pair of "before" and "after" or
   205  // "oldSvc" and "newSvc" args.  Convention across the codebase is to pass them
   206  // as (new, old), but it's easy to screw up when they are the same type.
   207  //
   208  // These types force us to pay attention.  If the order of the arguments
   209  // matters, please receive them as:
   210  //    func something(after After, before Before) {
   211  //        oldSvc, newSvc := before.Service, after.Service
   212  //
   213  // If the order of arguments DOES NOT matter, please receive them as:
   214  //    func something(lhs, rhs *api.Service) {
   215  
   216  type Before struct {
   217  	*api.Service
   218  }
   219  type After struct {
   220  	*api.Service
   221  }
   222  
   223  // defaultOnRead sets interlinked fields that were not previously set on read.
   224  // We can't do this in the normal defaulting path because that same logic
   225  // applies on Get, Create, and Update, but we need to distinguish between them.
   226  //
   227  // This will be called on both Service and ServiceList types.
   228  func (r *REST) defaultOnRead(obj runtime.Object) {
   229  	switch s := obj.(type) {
   230  	case *api.Service:
   231  		r.defaultOnReadService(s)
   232  	case *api.ServiceList:
   233  		r.defaultOnReadServiceList(s)
   234  	default:
   235  		// This was not an object we can default.  This is not an error, as the
   236  		// caching layer can pass through here, too.
   237  	}
   238  }
   239  
   240  // defaultOnReadServiceList defaults a ServiceList.
   241  func (r *REST) defaultOnReadServiceList(serviceList *api.ServiceList) {
   242  	if serviceList == nil {
   243  		return
   244  	}
   245  
   246  	for i := range serviceList.Items {
   247  		r.defaultOnReadService(&serviceList.Items[i])
   248  	}
   249  }
   250  
   251  // defaultOnReadService defaults a single Service.
   252  func (r *REST) defaultOnReadService(service *api.Service) {
   253  	if service == nil {
   254  		return
   255  	}
   256  
   257  	// We might find Services that were written before ClusterIP became plural.
   258  	// We still want to present a consistent view of them.
   259  	normalizeClusterIPs(After{service}, Before{nil})
   260  
   261  	// Set ipFamilies and ipFamilyPolicy if needed.
   262  	r.defaultOnReadIPFamilies(service)
   263  
   264  	// We unintentionally defaulted internalTrafficPolicy when it's not needed
   265  	// for the ExternalName type. It's too late to change the field in storage,
   266  	// but we can drop the field when read.
   267  	defaultOnReadInternalTrafficPolicy(service)
   268  }
   269  
   270  func defaultOnReadInternalTrafficPolicy(service *api.Service) {
   271  	if service.Spec.Type == api.ServiceTypeExternalName {
   272  		service.Spec.InternalTrafficPolicy = nil
   273  	}
   274  }
   275  
   276  func (r *REST) defaultOnReadIPFamilies(service *api.Service) {
   277  	// ExternalName does not need this.
   278  	if !needsClusterIP(service) {
   279  		return
   280  	}
   281  
   282  	// If IPFamilies is set, we assume IPFamilyPolicy is also set (it should
   283  	// not be possible to have one and not the other), and therefore we don't
   284  	// need further defaulting.  Likewise, if IPFamilies is *not* set, we
   285  	// assume IPFamilyPolicy can't be set either.
   286  	if len(service.Spec.IPFamilies) > 0 {
   287  		return
   288  	}
   289  
   290  	singleStack := api.IPFamilyPolicySingleStack
   291  	requireDualStack := api.IPFamilyPolicyRequireDualStack
   292  
   293  	if service.Spec.ClusterIP == api.ClusterIPNone {
   294  		// Headless.
   295  		if len(service.Spec.Selector) == 0 {
   296  			// Headless + selectorless is a special-case.
   297  			//
   298  			// At this stage we don't know what kind of endpoints (specifically
   299  			// their IPFamilies) the user has assigned to this selectorless
   300  			// service. We assume it has dual-stack and we default it to
   301  			// RequireDualStack on any cluster (single- or dual-stack
   302  			// configured).
   303  			service.Spec.IPFamilyPolicy = &requireDualStack
   304  			service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily, otherFamily(r.primaryIPFamily)}
   305  		} else {
   306  			// Headless + selector - default to single.
   307  			service.Spec.IPFamilyPolicy = &singleStack
   308  			service.Spec.IPFamilies = []api.IPFamily{r.primaryIPFamily}
   309  		}
   310  	} else {
   311  		// Headful: init ipFamilies from clusterIPs.
   312  		service.Spec.IPFamilies = make([]api.IPFamily, len(service.Spec.ClusterIPs))
   313  		for idx, ip := range service.Spec.ClusterIPs {
   314  			if netutil.IsIPv6String(ip) {
   315  				service.Spec.IPFamilies[idx] = api.IPv6Protocol
   316  			} else {
   317  				service.Spec.IPFamilies[idx] = api.IPv4Protocol
   318  			}
   319  		}
   320  		if len(service.Spec.IPFamilies) == 1 {
   321  			service.Spec.IPFamilyPolicy = &singleStack
   322  		} else if len(service.Spec.IPFamilies) == 2 {
   323  			// It shouldn't be possible to get here, but just in case.
   324  			service.Spec.IPFamilyPolicy = &requireDualStack
   325  		}
   326  	}
   327  }
   328  
   329  func (r *REST) afterDelete(obj runtime.Object, options *metav1.DeleteOptions) {
   330  	svc := obj.(*api.Service)
   331  
   332  	// Normally this defaulting is done automatically, but the hook (Decorator)
   333  	// is called at the end of this process, and we want the fully-formed
   334  	// object.
   335  	r.defaultOnReadService(svc)
   336  
   337  	// Only perform the cleanup if this is a non-dryrun deletion
   338  	if !dryrun.IsDryRun(options.DryRun) {
   339  		// It would be better if we had the caller context, but that changes
   340  		// this hook signature.
   341  		ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), svc.Namespace)
   342  		// TODO: This is clumsy.  It was added for fear that the endpoints
   343  		// controller might lag, and we could end up rusing the service name
   344  		// with old endpoints.  We should solve that better and remove this, or
   345  		// else we should do this for EndpointSlice, too.
   346  		_, _, err := r.endpoints.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{})
   347  		if err != nil && !errors.IsNotFound(err) {
   348  			klog.Errorf("delete service endpoints %s/%s failed: %v", svc.Name, svc.Namespace, err)
   349  		}
   350  
   351  		r.alloc.releaseAllocatedResources(svc)
   352  	}
   353  }
   354  
   355  func (r *REST) beginCreate(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (genericregistry.FinishFunc, error) {
   356  	svc := obj.(*api.Service)
   357  
   358  	// Make sure ClusterIP and ClusterIPs are in sync.  This has to happen
   359  	// early, before anyone looks at them.
   360  	normalizeClusterIPs(After{svc}, Before{nil})
   361  
   362  	// Allocate IPs and ports. If we had a transactional store, this would just
   363  	// be part of the larger transaction.  We don't have that, so we have to do
   364  	// it manually. This has to happen here and not in any earlier hooks (e.g.
   365  	// defaulting) because it needs to be aware of flags and be able to access
   366  	// API storage.
   367  	txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun))
   368  	if err != nil {
   369  		return nil, err
   370  	}
   371  
   372  	// Our cleanup callback
   373  	finish := func(_ context.Context, success bool) {
   374  		if success {
   375  			txn.Commit()
   376  		} else {
   377  			txn.Revert()
   378  		}
   379  	}
   380  
   381  	return finish, nil
   382  }
   383  
   384  func (r *REST) beginUpdate(ctx context.Context, obj, oldObj runtime.Object, options *metav1.UpdateOptions) (genericregistry.FinishFunc, error) {
   385  	newSvc := obj.(*api.Service)
   386  	oldSvc := oldObj.(*api.Service)
   387  
   388  	// Make sure the existing object has all fields we expect to be defaulted.
   389  	// This might not be true if the saved object predates these fields (the
   390  	// Decorator hook is not called on 'old' in the update path.
   391  	r.defaultOnReadService(oldSvc)
   392  
   393  	// Fix up allocated values that the client may have not specified (for
   394  	// idempotence).
   395  	patchAllocatedValues(After{newSvc}, Before{oldSvc})
   396  
   397  	// Make sure ClusterIP and ClusterIPs are in sync.  This has to happen
   398  	// early, before anyone looks at them.
   399  	normalizeClusterIPs(After{newSvc}, Before{oldSvc})
   400  
   401  	// Allocate and initialize fields.
   402  	txn, err := r.alloc.allocateUpdate(After{newSvc}, Before{oldSvc}, dryrun.IsDryRun(options.DryRun))
   403  	if err != nil {
   404  		return nil, err
   405  	}
   406  
   407  	// Our cleanup callback
   408  	finish := func(_ context.Context, success bool) {
   409  		if success {
   410  			txn.Commit()
   411  		} else {
   412  			txn.Revert()
   413  		}
   414  	}
   415  
   416  	return finish, nil
   417  }
   418  
   419  // ResourceLocation returns a URL to which one can send traffic for the specified service.
   420  func (r *REST) ResourceLocation(ctx context.Context, id string) (*url.URL, http.RoundTripper, error) {
   421  	// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port".
   422  	svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id)
   423  	if !valid {
   424  		return nil, nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
   425  	}
   426  
   427  	// If a port *number* was specified, find the corresponding service port name
   428  	if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil {
   429  		obj, err := r.Get(ctx, svcName, &metav1.GetOptions{})
   430  		if err != nil {
   431  			return nil, nil, err
   432  		}
   433  		svc := obj.(*api.Service)
   434  		found := false
   435  		for _, svcPort := range svc.Spec.Ports {
   436  			if int64(svcPort.Port) == portNum {
   437  				// use the declared port's name
   438  				portStr = svcPort.Name
   439  				found = true
   440  				break
   441  			}
   442  		}
   443  		if !found {
   444  			return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName))
   445  		}
   446  	}
   447  
   448  	obj, err := r.endpoints.Get(ctx, svcName, &metav1.GetOptions{})
   449  	if err != nil {
   450  		return nil, nil, err
   451  	}
   452  	eps := obj.(*api.Endpoints)
   453  	if len(eps.Subsets) == 0 {
   454  		return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName))
   455  	}
   456  	// Pick a random Subset to start searching from.
   457  	ssSeed := rand.Intn(len(eps.Subsets))
   458  	// Find a Subset that has the port.
   459  	for ssi := 0; ssi < len(eps.Subsets); ssi++ {
   460  		ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
   461  		if len(ss.Addresses) == 0 {
   462  			continue
   463  		}
   464  		for i := range ss.Ports {
   465  			if ss.Ports[i].Name == portStr {
   466  				addrSeed := rand.Intn(len(ss.Addresses))
   467  				// This is a little wonky, but it's expensive to test for the presence of a Pod
   468  				// So we repeatedly try at random and validate it, this means that for an invalid
   469  				// service with a lot of endpoints we're going to potentially make a lot of calls,
   470  				// but in the expected case we'll only make one.
   471  				for try := 0; try < len(ss.Addresses); try++ {
   472  					addr := ss.Addresses[(addrSeed+try)%len(ss.Addresses)]
   473  					// We only proxy to addresses that are actually pods.
   474  					if err := isValidAddress(ctx, &addr, r.pods); err != nil {
   475  						utilruntime.HandleError(fmt.Errorf("Address %v isn't valid (%v)", addr, err))
   476  						continue
   477  					}
   478  					ip := addr.IP
   479  					port := int(ss.Ports[i].Port)
   480  					return &url.URL{
   481  						Scheme: svcScheme,
   482  						Host:   net.JoinHostPort(ip, strconv.Itoa(port)),
   483  					}, r.proxyTransport, nil
   484  				}
   485  				utilruntime.HandleError(fmt.Errorf("Failed to find a valid address, skipping subset: %v", ss))
   486  			}
   487  		}
   488  	}
   489  	return nil, nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
   490  }
   491  
   492  func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error {
   493  	if addr.TargetRef == nil {
   494  		return fmt.Errorf("Address has no target ref, skipping: %v", addr)
   495  	}
   496  	if genericapirequest.NamespaceValue(ctx) != addr.TargetRef.Namespace {
   497  		return fmt.Errorf("Address namespace doesn't match context namespace")
   498  	}
   499  	obj, err := pods.Get(ctx, addr.TargetRef.Name, &metav1.GetOptions{})
   500  	if err != nil {
   501  		return err
   502  	}
   503  	pod, ok := obj.(*api.Pod)
   504  	if !ok {
   505  		return fmt.Errorf("failed to cast to pod: %v", obj)
   506  	}
   507  	if pod == nil {
   508  		return fmt.Errorf("pod is missing, skipping (%s/%s)", addr.TargetRef.Namespace, addr.TargetRef.Name)
   509  	}
   510  	for _, podIP := range pod.Status.PodIPs {
   511  		if podIP.IP == addr.IP {
   512  			return nil
   513  		}
   514  	}
   515  	return fmt.Errorf("pod ip(s) doesn't match endpoint ip, skipping: %v vs %s (%s/%s)", pod.Status.PodIPs, addr.IP, addr.TargetRef.Namespace, addr.TargetRef.Name)
   516  }
   517  
   518  // normalizeClusterIPs adjust clusterIPs based on ClusterIP.  This must not
   519  // consider any other fields.
   520  func normalizeClusterIPs(after After, before Before) {
   521  	oldSvc, newSvc := before.Service, after.Service
   522  
   523  	// In all cases here, we don't need to over-think the inputs.  Validation
   524  	// will be called on the new object soon enough.  All this needs to do is
   525  	// try to divine what user meant with these linked fields. The below
   526  	// is verbosely written for clarity.
   527  
   528  	// **** IMPORTANT *****
   529  	// as a governing rule. User must (either)
   530  	// -- Use singular only (old client)
   531  	// -- singular and plural fields (new clients)
   532  
   533  	if oldSvc == nil {
   534  		// This was a create operation.
   535  		// User specified singular and not plural (e.g. an old client), so init
   536  		// plural for them.
   537  		if len(newSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
   538  			newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
   539  			return
   540  		}
   541  
   542  		// we don't init singular based on plural because
   543  		// new client must use both fields
   544  
   545  		// Either both were not specified (will be allocated) or both were
   546  		// specified (will be validated).
   547  		return
   548  	}
   549  
   550  	// This was an update operation
   551  
   552  	// ClusterIPs were cleared by an old client which was trying to patch
   553  	// some field and didn't provide ClusterIPs
   554  	if len(oldSvc.Spec.ClusterIPs) > 0 && len(newSvc.Spec.ClusterIPs) == 0 {
   555  		// if ClusterIP is the same, then it is an old client trying to
   556  		// patch service and didn't provide ClusterIPs
   557  		if oldSvc.Spec.ClusterIP == newSvc.Spec.ClusterIP {
   558  			newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
   559  		}
   560  	}
   561  
   562  	// clusterIP is not the same
   563  	if oldSvc.Spec.ClusterIP != newSvc.Spec.ClusterIP {
   564  		// this is a client trying to clear it
   565  		if len(oldSvc.Spec.ClusterIP) > 0 && len(newSvc.Spec.ClusterIP) == 0 {
   566  			// if clusterIPs are the same, then clear on their behalf
   567  			if sameClusterIPs(oldSvc, newSvc) {
   568  				newSvc.Spec.ClusterIPs = nil
   569  			}
   570  
   571  			// if they provided nil, then we are fine (handled by patching case above)
   572  			// if they changed it then validation will catch it
   573  		} else {
   574  			// ClusterIP has changed but not cleared *and* ClusterIPs are the same
   575  			// then we set ClusterIPs based on ClusterIP
   576  			if sameClusterIPs(oldSvc, newSvc) {
   577  				newSvc.Spec.ClusterIPs = []string{newSvc.Spec.ClusterIP}
   578  			}
   579  		}
   580  	}
   581  }
   582  
   583  // patchAllocatedValues allows clients to avoid a read-modify-write cycle while
   584  // preserving values that we allocated on their behalf.  For example, they
   585  // might create a Service without specifying the ClusterIP, in which case we
   586  // allocate one.  If they resubmit that same YAML, we want it to succeed.
   587  func patchAllocatedValues(after After, before Before) {
   588  	oldSvc, newSvc := before.Service, after.Service
   589  
   590  	if needsClusterIP(oldSvc) && needsClusterIP(newSvc) {
   591  		if newSvc.Spec.ClusterIP == "" {
   592  			newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
   593  		}
   594  		if len(newSvc.Spec.ClusterIPs) == 0 && len(oldSvc.Spec.ClusterIPs) > 0 {
   595  			newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
   596  		}
   597  	}
   598  
   599  	if needsNodePort(oldSvc) && needsNodePort(newSvc) {
   600  		nodePortsUsed := func(svc *api.Service) sets.Int32 {
   601  			used := sets.NewInt32()
   602  			for _, p := range svc.Spec.Ports {
   603  				if p.NodePort != 0 {
   604  					used.Insert(p.NodePort)
   605  				}
   606  			}
   607  			return used
   608  		}
   609  
   610  		// Build a set of all the ports in oldSvc that are also in newSvc.  We know
   611  		// we can't patch these values.
   612  		used := nodePortsUsed(oldSvc).Intersection(nodePortsUsed(newSvc))
   613  
   614  		// Map NodePorts by name.  The user may have changed other properties
   615  		// of the port, but we won't see that here.
   616  		np := map[string]int32{}
   617  		for i := range oldSvc.Spec.Ports {
   618  			p := &oldSvc.Spec.Ports[i]
   619  			np[p.Name] = p.NodePort
   620  		}
   621  
   622  		// If newSvc is missing values, try to patch them in when we know them and
   623  		// they haven't been used for another port.
   624  
   625  		for i := range newSvc.Spec.Ports {
   626  			p := &newSvc.Spec.Ports[i]
   627  			if p.NodePort == 0 {
   628  				oldVal := np[p.Name]
   629  				if !used.Has(oldVal) {
   630  					p.NodePort = oldVal
   631  				}
   632  			}
   633  		}
   634  	}
   635  
   636  	if needsHCNodePort(oldSvc) && needsHCNodePort(newSvc) {
   637  		if newSvc.Spec.HealthCheckNodePort == 0 {
   638  			newSvc.Spec.HealthCheckNodePort = oldSvc.Spec.HealthCheckNodePort
   639  		}
   640  	}
   641  }
   642  
   643  func needsClusterIP(svc *api.Service) bool {
   644  	if svc.Spec.Type == api.ServiceTypeExternalName {
   645  		return false
   646  	}
   647  	return true
   648  }
   649  
   650  func needsNodePort(svc *api.Service) bool {
   651  	if svc.Spec.Type == api.ServiceTypeNodePort {
   652  		return true
   653  	}
   654  	if svc.Spec.Type == api.ServiceTypeLoadBalancer &&
   655  		(svc.Spec.AllocateLoadBalancerNodePorts == nil || *svc.Spec.AllocateLoadBalancerNodePorts) {
   656  		return true
   657  	}
   658  	return false
   659  }
   660  
   661  func needsHCNodePort(svc *api.Service) bool {
   662  	if svc.Spec.Type != api.ServiceTypeLoadBalancer {
   663  		return false
   664  	}
   665  	if svc.Spec.ExternalTrafficPolicy != api.ServiceExternalTrafficPolicyLocal {
   666  		return false
   667  	}
   668  	return true
   669  }
   670  

View as plain text