...

Source file src/k8s.io/kubernetes/pkg/proxy/topology.go

Documentation: k8s.io/kubernetes/pkg/proxy

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package proxy
    18  
    19  import (
    20  	v1 "k8s.io/api/core/v1"
    21  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    22  	"k8s.io/klog/v2"
    23  	"k8s.io/kubernetes/pkg/features"
    24  )
    25  
    26  // CategorizeEndpoints returns:
    27  //
    28  //   - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
    29  //     relevant). This will be nil if the service does not ever use Cluster traffic policy.
    30  //
    31  //   - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if
    32  //     relevant). This will be nil if the service does not ever use Local traffic policy.
    33  //
    34  //   - The combined list of all endpoints reachable from this node (which is the union of the
    35  //     previous two lists, but in the case where it is identical to one or the other, we avoid
    36  //     allocating a separate list).
    37  //
    38  //   - An indication of whether the service has any endpoints reachable from anywhere in the
    39  //     cluster. (This may be true even if allReachableEndpoints is empty.)
    40  func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
    41  	var useTopology, useServingTerminatingEndpoints bool
    42  
    43  	if svcInfo.UsesClusterEndpoints() {
    44  		useTopology = canUseTopology(endpoints, svcInfo, nodeLabels)
    45  		clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
    46  			if !ep.IsReady() {
    47  				return false
    48  			}
    49  			if useTopology && !availableForTopology(ep, nodeLabels) {
    50  				return false
    51  			}
    52  			return true
    53  		})
    54  
    55  		// if there are 0 cluster-wide endpoints, we can try to fallback to any terminating endpoints that are ready.
    56  		// When falling back to terminating endpoints, we do NOT consider topology aware routing since this is a best
    57  		// effort attempt to avoid dropping connections.
    58  		if len(clusterEndpoints) == 0 {
    59  			clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
    60  				if ep.IsServing() && ep.IsTerminating() {
    61  					return true
    62  				}
    63  
    64  				return false
    65  			})
    66  		}
    67  
    68  		// If there are any Ready endpoints anywhere in the cluster, we are
    69  		// guaranteed to get one in clusterEndpoints.
    70  		if len(clusterEndpoints) > 0 {
    71  			hasAnyEndpoints = true
    72  		}
    73  	}
    74  
    75  	if !svcInfo.UsesLocalEndpoints() {
    76  		allReachableEndpoints = clusterEndpoints
    77  		return
    78  	}
    79  
    80  	// Pre-scan the endpoints, to figure out which type of endpoint Local
    81  	// traffic policy will use, and also to see if there are any usable
    82  	// endpoints anywhere in the cluster.
    83  	var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
    84  	for _, ep := range endpoints {
    85  		if ep.IsReady() {
    86  			hasAnyEndpoints = true
    87  			if ep.IsLocal() {
    88  				hasLocalReadyEndpoints = true
    89  			}
    90  		} else if ep.IsServing() && ep.IsTerminating() {
    91  			hasAnyEndpoints = true
    92  			if ep.IsLocal() {
    93  				hasLocalServingTerminatingEndpoints = true
    94  			}
    95  		}
    96  	}
    97  
    98  	if hasLocalReadyEndpoints {
    99  		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
   100  			return ep.IsLocal() && ep.IsReady()
   101  		})
   102  	} else if hasLocalServingTerminatingEndpoints {
   103  		useServingTerminatingEndpoints = true
   104  		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
   105  			return ep.IsLocal() && ep.IsServing() && ep.IsTerminating()
   106  		})
   107  	}
   108  
   109  	if !svcInfo.UsesClusterEndpoints() {
   110  		allReachableEndpoints = localEndpoints
   111  		return
   112  	}
   113  
   114  	if !useTopology && !useServingTerminatingEndpoints {
   115  		// !useServingTerminatingEndpoints means that localEndpoints contains only
   116  		// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
   117  		// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
   118  		allReachableEndpoints = clusterEndpoints
   119  		return
   120  	}
   121  
   122  	// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
   123  	// localEndpoints may contain terminating or topologically-unavailable local endpoints
   124  	// that aren't in clusterEndpoints. So we have to merge the two lists.
   125  	endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
   126  	for _, ep := range clusterEndpoints {
   127  		endpointsMap[ep.String()] = ep
   128  	}
   129  	for _, ep := range localEndpoints {
   130  		endpointsMap[ep.String()] = ep
   131  	}
   132  	allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
   133  	for _, ep := range endpointsMap {
   134  		allReachableEndpoints = append(allReachableEndpoints, ep)
   135  	}
   136  
   137  	return
   138  }
   139  
   140  // canUseTopology returns true if topology aware routing is enabled and properly
   141  // configured in this cluster. That is, it checks that:
   142  //   - The TopologyAwareHints or ServiceTrafficDistribution feature is enabled.
   143  //   - If ServiceTrafficDistribution feature gate is not enabled, then the
   144  //     hintsAnnotation should represent an enabled value.
   145  //   - The node's labels include "topology.kubernetes.io/zone".
   146  //   - All of the endpoints for this Service have a topology hint.
   147  //   - At least one endpoint for this Service is hinted for this node's zone.
   148  func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
   149  	if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) && !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
   150  		return false
   151  	}
   152  
   153  	// Ignore value of hintsAnnotation if the ServiceTrafficDistribution feature
   154  	// gate is enabled.
   155  	if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
   156  		// If the hintsAnnotation has a disabled value, we do not consider hints for route programming.
   157  		hintsAnnotation := svcInfo.HintsAnnotation()
   158  		if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
   159  			return false
   160  		}
   161  	}
   162  
   163  	zone, ok := nodeLabels[v1.LabelTopologyZone]
   164  	if !ok || zone == "" {
   165  		klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
   166  		return false
   167  	}
   168  
   169  	hasEndpointForZone := false
   170  	for _, endpoint := range endpoints {
   171  		if !endpoint.IsReady() {
   172  			continue
   173  		}
   174  		if endpoint.ZoneHints().Len() == 0 {
   175  			klog.V(2).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint)
   176  			return false
   177  		}
   178  
   179  		if endpoint.ZoneHints().Has(zone) {
   180  			hasEndpointForZone = true
   181  		}
   182  	}
   183  
   184  	if !hasEndpointForZone {
   185  		klog.V(2).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
   186  		return false
   187  	}
   188  
   189  	return true
   190  }
   191  
   192  // availableForTopology checks if this endpoint is available for use on this node, given
   193  // topology constraints. (It assumes that canUseTopology() returned true.)
   194  func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
   195  	zone := nodeLabels[v1.LabelTopologyZone]
   196  	return endpoint.ZoneHints().Has(zone)
   197  }
   198  
   199  // filterEndpoints filters endpoints according to predicate
   200  func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
   201  	filteredEndpoints := make([]Endpoint, 0, len(endpoints))
   202  
   203  	for _, ep := range endpoints {
   204  		if predicate(ep) {
   205  			filteredEndpoints = append(filteredEndpoints, ep)
   206  		}
   207  	}
   208  
   209  	return filteredEndpoints
   210  }
   211  

View as plain text