...

Source file src/k8s.io/kubernetes/pkg/controller/endpointslicemirroring/reconciler.go

Documentation: k8s.io/kubernetes/pkg/controller/endpointslicemirroring

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package endpointslicemirroring
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	corev1 "k8s.io/api/core/v1"
    24  	discovery "k8s.io/api/discovery/v1"
    25  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	clientset "k8s.io/client-go/kubernetes"
    30  	"k8s.io/client-go/tools/record"
    31  	endpointsliceutil "k8s.io/endpointslice/util"
    32  	"k8s.io/klog/v2"
    33  	endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
    34  	"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
    35  )
    36  
// reconciler is responsible for transforming current EndpointSlice state into
// desired state. It mirrors the addresses of an Endpoints resource into
// EndpointSlices by issuing create, update, and delete calls through client.
type reconciler struct {
	// client is the Kubernetes API client used to create, update, and delete
	// EndpointSlices.
	client clientset.Interface

	// endpointSliceTracker tracks the list of EndpointSlices and associated
	// resource versions expected for each Endpoints resource. It can help
	// determine if a cached EndpointSlice is out of date. The reconciler
	// informs it of every slice it creates or updates and of every deletion
	// it issues from finalize.
	endpointSliceTracker *endpointsliceutil.EndpointSliceTracker

	// eventRecorder allows reconciler to record an event on the Endpoints
	// resource if it finds an invalid IP address or if addresses had to be
	// skipped because a subset exceeded maxEndpointsPerSubset.
	eventRecorder record.EventRecorder

	// maxEndpointsPerSubset references the maximum number of endpoints that
	// should be added to an EndpointSlice for an EndpointSubset. This allows
	// for a simple 1:1 mapping between EndpointSubset and EndpointSlice.
	maxEndpointsPerSubset int32

	// metricsCache tracks values for total numbers of desired endpoints as well
	// as the efficiency of EndpointSlice endpoints distribution. It is updated
	// on every reconcile and cleaned up in deleteEndpoints.
	metricsCache *metrics.Cache
}
    60  
    61  // reconcile takes an Endpoints resource and ensures that corresponding
    62  // EndpointSlices exist. It creates, updates, or deletes EndpointSlices to
    63  // ensure the desired set of addresses are represented by EndpointSlices.
    64  func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error {
    65  	// Calculate desired state.
    66  	d := newDesiredCalc()
    67  
    68  	numInvalidAddresses := 0
    69  	addressesSkipped := 0
    70  
    71  	// canonicalize the Endpoints subsets before processing them
    72  	subsets := endpointsv1.RepackSubsets(endpoints.Subsets)
    73  	for _, subset := range subsets {
    74  		multiKey := d.initPorts(subset.Ports)
    75  
    76  		totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses)
    77  		totalAddressesAdded := 0
    78  
    79  		for _, address := range subset.Addresses {
    80  			// Break if we've reached the max number of addresses to mirror
    81  			// per EndpointSubset. This allows for a simple 1:1 mapping between
    82  			// EndpointSubset and EndpointSlice.
    83  			if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
    84  				break
    85  			}
    86  			if ok := d.addAddress(address, multiKey, true); ok {
    87  				totalAddressesAdded++
    88  			} else {
    89  				numInvalidAddresses++
    90  				logger.Info("Address in Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice", "endpoints", klog.KObj(endpoints), "IP", address.IP)
    91  			}
    92  		}
    93  
    94  		for _, address := range subset.NotReadyAddresses {
    95  			// Break if we've reached the max number of addresses to mirror
    96  			// per EndpointSubset. This allows for a simple 1:1 mapping between
    97  			// EndpointSubset and EndpointSlice.
    98  			if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
    99  				break
   100  			}
   101  			if ok := d.addAddress(address, multiKey, false); ok {
   102  				totalAddressesAdded++
   103  			} else {
   104  				numInvalidAddresses++
   105  				logger.Info("Address in Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice", "endpoints", klog.KObj(endpoints), "IP", address.IP)
   106  			}
   107  		}
   108  
   109  		addressesSkipped += totalAddresses - totalAddressesAdded
   110  	}
   111  
   112  	// This metric includes addresses skipped for being invalid or exceeding
   113  	// MaxEndpointsPerSubset.
   114  	metrics.AddressesSkippedPerSync.WithLabelValues().Observe(float64(addressesSkipped))
   115  
   116  	// Record an event on the Endpoints resource if we skipped mirroring for any
   117  	// invalid IP addresses.
   118  	if numInvalidAddresses > 0 {
   119  		r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, InvalidIPAddress,
   120  			"Skipped %d invalid IP addresses when mirroring to EndpointSlices", numInvalidAddresses)
   121  	}
   122  
   123  	// Record a separate event if we skipped mirroring due to the number of
   124  	// addresses exceeding MaxEndpointsPerSubset.
   125  	if addressesSkipped > numInvalidAddresses {
   126  		logger.Info("Addresses in Endpoints were skipped due to exceeding MaxEndpointsPerSubset", "skippedAddresses", addressesSkipped, "endpoints", klog.KObj(endpoints))
   127  		r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, TooManyAddressesToMirror,
   128  			"A max of %d addresses can be mirrored to EndpointSlices per Endpoints subset. %d addresses were skipped", r.maxEndpointsPerSubset, addressesSkipped)
   129  	}
   130  
   131  	// Build data structures for existing state.
   132  	existingSlicesByKey := endpointSlicesByKey(existingSlices)
   133  
   134  	// Determine changes necessary for each group of slices by port map.
   135  	epMetrics := metrics.NewEndpointPortCache()
   136  	totals := totalsByAction{}
   137  	slices := slicesByAction{}
   138  
   139  	for portKey, desiredEndpoints := range d.endpointsByKey {
   140  		numEndpoints := len(desiredEndpoints)
   141  		pmSlices, pmTotals := r.reconcileByPortMapping(
   142  			endpoints, existingSlicesByKey[portKey], desiredEndpoints, d.portsByKey[portKey], portKey.addressType())
   143  
   144  		slices.append(pmSlices)
   145  		totals.add(pmTotals)
   146  
   147  		epMetrics.Set(endpointsliceutil.PortMapKey(portKey), metrics.EfficiencyInfo{
   148  			Endpoints: numEndpoints,
   149  			Slices:    len(existingSlicesByKey[portKey]) + len(pmSlices.toCreate) - len(pmSlices.toDelete),
   150  		})
   151  	}
   152  
   153  	// If there are unique sets of ports that are no longer desired, mark
   154  	// the corresponding endpoint slices for deletion.
   155  	for portKey, existingSlices := range existingSlicesByKey {
   156  		if _, ok := d.endpointsByKey[portKey]; !ok {
   157  			for _, existingSlice := range existingSlices {
   158  				slices.toDelete = append(slices.toDelete, existingSlice)
   159  			}
   160  		}
   161  	}
   162  
   163  	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totals.added))
   164  	metrics.EndpointsUpdatedPerSync.WithLabelValues().Observe(float64(totals.updated))
   165  	metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totals.removed))
   166  
   167  	endpointsNN := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
   168  	r.metricsCache.UpdateEndpointPortCache(endpointsNN, epMetrics)
   169  
   170  	return r.finalize(endpoints, slices)
   171  }
   172  
   173  // reconcileByPortMapping compares the endpoints found in existing slices with
   174  // the list of desired endpoints and returns lists of slices to create, update,
   175  // and delete.
   176  func (r *reconciler) reconcileByPortMapping(
   177  	endpoints *corev1.Endpoints,
   178  	existingSlices []*discovery.EndpointSlice,
   179  	desiredSet endpointsliceutil.EndpointSet,
   180  	endpointPorts []discovery.EndpointPort,
   181  	addressType discovery.AddressType,
   182  ) (slicesByAction, totalsByAction) {
   183  	slices := slicesByAction{}
   184  	totals := totalsByAction{}
   185  
   186  	// If no endpoints are desired, mark existing slices for deletion and
   187  	// return.
   188  	if desiredSet.Len() == 0 {
   189  		slices.toDelete = existingSlices
   190  		for _, epSlice := range existingSlices {
   191  			totals.removed += len(epSlice.Endpoints)
   192  		}
   193  		return slices, totals
   194  	}
   195  
   196  	if len(existingSlices) == 0 {
   197  		// if no existing slices, all desired endpoints will be added.
   198  		totals.added = desiredSet.Len()
   199  	} else {
   200  		// if >0 existing slices, mark all but 1 for deletion.
   201  		slices.toDelete = existingSlices[1:]
   202  
   203  		// generated slices must mirror all endpoints annotations but EndpointsLastChangeTriggerTime and LastAppliedConfigAnnotation
   204  		compareAnnotations := cloneAndRemoveKeys(endpoints.Annotations, corev1.EndpointsLastChangeTriggerTime, corev1.LastAppliedConfigAnnotation)
   205  		compareLabels := cloneAndRemoveKeys(existingSlices[0].Labels, discovery.LabelManagedBy, discovery.LabelServiceName)
   206  		// Return early if first slice matches desired endpoints, labels and annotations
   207  		totals = totalChanges(existingSlices[0], desiredSet)
   208  		if totals.added == 0 && totals.updated == 0 && totals.removed == 0 &&
   209  			apiequality.Semantic.DeepEqual(endpoints.Labels, compareLabels) &&
   210  			apiequality.Semantic.DeepEqual(compareAnnotations, existingSlices[0].Annotations) {
   211  			return slices, totals
   212  		}
   213  	}
   214  
   215  	// generate a new slice with the desired endpoints.
   216  	var sliceName string
   217  	if len(existingSlices) > 0 {
   218  		sliceName = existingSlices[0].Name
   219  	}
   220  	newSlice := newEndpointSlice(endpoints, endpointPorts, addressType, sliceName)
   221  	for desiredSet.Len() > 0 && len(newSlice.Endpoints) < int(r.maxEndpointsPerSubset) {
   222  		endpoint, _ := desiredSet.PopAny()
   223  		newSlice.Endpoints = append(newSlice.Endpoints, *endpoint)
   224  	}
   225  
   226  	if newSlice.Name != "" {
   227  		slices.toUpdate = []*discovery.EndpointSlice{newSlice}
   228  	} else { // Slices to be created set GenerateName instead of Name.
   229  		slices.toCreate = []*discovery.EndpointSlice{newSlice}
   230  	}
   231  
   232  	return slices, totals
   233  }
   234  
   235  // finalize creates, updates, and deletes slices as specified
   236  func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction) error {
   237  	// If there are slices to create and delete, recycle the slices marked for
   238  	// deletion by replacing creates with updates of slices that would otherwise
   239  	// be deleted.
   240  	recycleSlices(&slices)
   241  
   242  	epsClient := r.client.DiscoveryV1().EndpointSlices(endpoints.Namespace)
   243  
   244  	// Don't create more EndpointSlices if corresponding Endpoints resource is
   245  	// being deleted.
   246  	if endpoints.DeletionTimestamp == nil {
   247  		for _, endpointSlice := range slices.toCreate {
   248  			createdSlice, err := epsClient.Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
   249  			if err != nil {
   250  				// If the namespace is terminating, creates will continue to fail. Simply drop the item.
   251  				if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
   252  					return nil
   253  				}
   254  				return fmt.Errorf("failed to create EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)
   255  			}
   256  			r.endpointSliceTracker.Update(createdSlice)
   257  			metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
   258  		}
   259  	}
   260  
   261  	for _, endpointSlice := range slices.toUpdate {
   262  		updatedSlice, err := epsClient.Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
   263  		if err != nil {
   264  			return fmt.Errorf("failed to update %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
   265  		}
   266  		r.endpointSliceTracker.Update(updatedSlice)
   267  		metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
   268  	}
   269  
   270  	for _, endpointSlice := range slices.toDelete {
   271  		err := epsClient.Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
   272  		if err != nil {
   273  			return fmt.Errorf("failed to delete %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
   274  		}
   275  		r.endpointSliceTracker.ExpectDeletion(endpointSlice)
   276  		metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
   277  	}
   278  
   279  	return nil
   280  }
   281  
   282  // deleteEndpoints deletes any associated EndpointSlices and cleans up any
   283  // Endpoints references from the metricsCache.
   284  func (r *reconciler) deleteEndpoints(namespace, name string, endpointSlices []*discovery.EndpointSlice) error {
   285  	r.metricsCache.DeleteEndpoints(types.NamespacedName{Namespace: namespace, Name: name})
   286  	var errs []error
   287  	for _, endpointSlice := range endpointSlices {
   288  		err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
   289  		if err != nil {
   290  			errs = append(errs, err)
   291  		}
   292  	}
   293  	if len(errs) > 0 {
   294  		return fmt.Errorf("error(s) deleting %d/%d EndpointSlices for %s/%s Endpoints, including: %s", len(errs), len(endpointSlices), namespace, name, errs[0])
   295  	}
   296  	return nil
   297  }
   298  
   299  // endpointSlicesByKey returns a map that groups EndpointSlices by unique
   300  // addrTypePortMapKey values.
   301  func endpointSlicesByKey(existingSlices []*discovery.EndpointSlice) map[addrTypePortMapKey][]*discovery.EndpointSlice {
   302  	slicesByKey := map[addrTypePortMapKey][]*discovery.EndpointSlice{}
   303  	for _, existingSlice := range existingSlices {
   304  		epKey := newAddrTypePortMapKey(existingSlice.Ports, existingSlice.AddressType)
   305  		slicesByKey[epKey] = append(slicesByKey[epKey], existingSlice)
   306  	}
   307  	return slicesByKey
   308  }
   309  
   310  // totalChanges returns the total changes that will be required for an
   311  // EndpointSlice to match a desired set of endpoints.
   312  func totalChanges(existingSlice *discovery.EndpointSlice, desiredSet endpointsliceutil.EndpointSet) totalsByAction {
   313  	totals := totalsByAction{}
   314  	existingMatches := 0
   315  
   316  	for _, endpoint := range existingSlice.Endpoints {
   317  		got := desiredSet.Get(&endpoint)
   318  		if got == nil {
   319  			// If not desired, increment number of endpoints to be deleted.
   320  			totals.removed++
   321  		} else {
   322  			existingMatches++
   323  
   324  			// If existing version of endpoint doesn't match desired version
   325  			// increment number of endpoints to be updated.
   326  			if !endpointsliceutil.EndpointsEqualBeyondHash(got, &endpoint) {
   327  				totals.updated++
   328  			}
   329  		}
   330  	}
   331  
   332  	// Any desired endpoints that have not been found in the existing slice will
   333  	// be added.
   334  	totals.added = desiredSet.Len() - existingMatches
   335  	return totals
   336  }
   337  

View as plain text