...

Source file src/k8s.io/kubernetes/pkg/proxy/endpointslicecache.go

Documentation: k8s.io/kubernetes/pkg/proxy

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package proxy
    18  
    19  import (
    20  	"fmt"
    21  	"reflect"
    22  	"sort"
    23  	"strings"
    24  	"sync"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	discovery "k8s.io/api/discovery/v1"
    28  	"k8s.io/apimachinery/pkg/types"
    29  	"k8s.io/apimachinery/pkg/util/sets"
    30  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    31  	"k8s.io/client-go/tools/events"
    32  	"k8s.io/klog/v2"
    33  	"k8s.io/kubernetes/pkg/features"
    34  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    35  	utilnet "k8s.io/utils/net"
    36  )
    37  
// EndpointSliceCache is used as a cache of EndpointSlice information.
type EndpointSliceCache struct {
	// lock protects trackerByServiceMap.
	lock sync.Mutex

	// trackerByServiceMap is the basis of this cache. It is keyed by the
	// namespaced name of a service; each tracker in turn groups its data by
	// endpoint slice name. Since endpoints can move between slices, we
	// require slice specific caching to prevent endpoints being removed from
	// the cache when they may have just moved to a different slice.
	trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker

	// The fields below are set by NewEndpointSliceCache:
	//   - makeEndpointInfo wraps each BaseEndpointInfo built by addEndpoints;
	//     NewEndpointSliceCache substitutes standardEndpointInfo when nil.
	//   - hostname is compared against an endpoint's NodeName in isLocal; an
	//     empty hostname means no endpoint is reported as local.
	//   - ipFamily selects which endpoints addEndpoints keeps: endpoints
	//     whose first address is of the other IP family are skipped.
	//   - recorder is handed to proxyutil.LogAndEmitIncorrectIPVersionEvent
	//     when such an endpoint is skipped.
	makeEndpointInfo makeEndpointFunc
	hostname         string
	ipFamily         v1.IPFamily
	recorder         events.EventRecorder
}
    56  
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
// by a proxier along with any pending EndpointSlices that have been updated
// in this cache but not yet applied by a proxier.
type endpointSliceTracker struct {
	// applied holds the slice data that checkoutChanges has already moved
	// out of pending.
	applied endpointSliceDataByName
	// pending holds slice data recorded by updatePending; checkoutChanges
	// drains it into applied (or deletes the applied entry when the data is
	// marked Remove).
	pending endpointSliceDataByName
}
    64  
// endpointSliceDataByName groups endpointSliceData by the names of the
// corresponding EndpointSlices. The key is the EndpointSlice name returned
// by endpointSliceCacheKeys.
type endpointSliceDataByName map[string]*endpointSliceData
    68  
// endpointSliceData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
//
// As built by newEndpointSliceData: Ports is a copy of the EndpointSlice
// ports sorted with byPort; Endpoints is sorted with byAddress and is left
// empty when Remove is true; Remove marks the slice for deletion from the
// applied set the next time checkoutChanges runs.
type endpointSliceData struct {
	Ports     []discovery.EndpointPort
	Endpoints []*endpointData
	Remove    bool
}
    76  
// endpointData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints.
type endpointData struct {
	// ZoneHints is only populated by newEndpointSliceData when the
	// TopologyAwareHints feature gate is enabled and the endpoint carries at
	// least one ForZones hint; otherwise it stays nil.
	Addresses []string
	NodeName  *string
	Zone      *string
	ZoneHints sets.Set[string]

	// Condition flags as derived by newEndpointSliceData: Ready and Serving
	// are true when the corresponding condition is unset or true;
	// Terminating is true only when the condition is set and true.
	Ready       bool
	Serving     bool
	Terminating bool
}
    90  
    91  // NewEndpointSliceCache initializes an EndpointSliceCache.
    92  func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
    93  	if makeEndpointInfo == nil {
    94  		makeEndpointInfo = standardEndpointInfo
    95  	}
    96  	return &EndpointSliceCache{
    97  		trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
    98  		hostname:            hostname,
    99  		ipFamily:            ipFamily,
   100  		makeEndpointInfo:    makeEndpointInfo,
   101  		recorder:            recorder,
   102  	}
   103  }
   104  
   105  // newEndpointSliceTracker initializes an endpointSliceTracker.
   106  func newEndpointSliceTracker() *endpointSliceTracker {
   107  	return &endpointSliceTracker{
   108  		applied: endpointSliceDataByName{},
   109  		pending: endpointSliceDataByName{},
   110  	}
   111  }
   112  
   113  // newEndpointSliceData generates endpointSliceData from an EndpointSlice.
   114  func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData {
   115  	esData := &endpointSliceData{
   116  		Ports:     make([]discovery.EndpointPort, len(endpointSlice.Ports)),
   117  		Endpoints: []*endpointData{},
   118  		Remove:    remove,
   119  	}
   120  
   121  	// copy here to avoid mutating shared EndpointSlice object.
   122  	copy(esData.Ports, endpointSlice.Ports)
   123  	sort.Sort(byPort(esData.Ports))
   124  
   125  	if !remove {
   126  		for _, endpoint := range endpointSlice.Endpoints {
   127  			epData := &endpointData{
   128  				Addresses: endpoint.Addresses,
   129  				Zone:      endpoint.Zone,
   130  				NodeName:  endpoint.NodeName,
   131  
   132  				// conditions
   133  				Ready:       endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready,
   134  				Serving:     endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving,
   135  				Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating,
   136  			}
   137  
   138  			if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
   139  				if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
   140  					epData.ZoneHints = sets.New[string]()
   141  					for _, zone := range endpoint.Hints.ForZones {
   142  						epData.ZoneHints.Insert(zone.Name)
   143  					}
   144  				}
   145  			}
   146  
   147  			esData.Endpoints = append(esData.Endpoints, epData)
   148  		}
   149  
   150  		sort.Sort(byAddress(esData.Endpoints))
   151  	}
   152  
   153  	return esData
   154  }
   155  
   156  // standardEndpointInfo is the default makeEndpointFunc.
   157  func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
   158  	return ep
   159  }
   160  
   161  // updatePending updates a pending slice in the cache.
   162  func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
   163  	serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
   164  	if err != nil {
   165  		klog.ErrorS(err, "Error getting endpoint slice cache keys")
   166  		return false
   167  	}
   168  
   169  	esData := newEndpointSliceData(endpointSlice, remove)
   170  
   171  	cache.lock.Lock()
   172  	defer cache.lock.Unlock()
   173  
   174  	if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
   175  		cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
   176  	}
   177  
   178  	changed := cache.esDataChanged(serviceKey, sliceKey, esData)
   179  
   180  	if changed {
   181  		cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esData
   182  	}
   183  
   184  	return changed
   185  }
   186  
   187  // checkoutChanges returns a map of all endpointsChanges that are
   188  // pending and then marks them as applied.
   189  func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*endpointsChange {
   190  	changes := make(map[types.NamespacedName]*endpointsChange)
   191  
   192  	cache.lock.Lock()
   193  	defer cache.lock.Unlock()
   194  
   195  	for serviceNN, esTracker := range cache.trackerByServiceMap {
   196  		if len(esTracker.pending) == 0 {
   197  			continue
   198  		}
   199  
   200  		change := &endpointsChange{}
   201  
   202  		change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
   203  
   204  		for name, sliceData := range esTracker.pending {
   205  			if sliceData.Remove {
   206  				delete(esTracker.applied, name)
   207  			} else {
   208  				esTracker.applied[name] = sliceData
   209  			}
   210  
   211  			delete(esTracker.pending, name)
   212  		}
   213  
   214  		change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
   215  		changes[serviceNN] = change
   216  	}
   217  
   218  	return changes
   219  }
   220  
// spToEndpointMap groups Endpoint objects by ServicePortName and then by
// endpoint string (as returned by Endpoint.String()).
type spToEndpointMap map[ServicePortName]map[string]Endpoint
   224  
   225  // getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices.
   226  func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) EndpointsMap {
   227  	endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceDataByName)
   228  	return endpointsMapFromEndpointInfo(endpointInfoBySP)
   229  }
   230  
   231  // endpointInfoByServicePort groups endpoint info by service port name and address.
   232  func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) spToEndpointMap {
   233  	endpointInfoBySP := spToEndpointMap{}
   234  
   235  	for _, sliceData := range sliceDataByName {
   236  		for _, port := range sliceData.Ports {
   237  			if port.Name == nil {
   238  				klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
   239  				continue
   240  			}
   241  			// TODO: handle nil ports to mean "all"
   242  			if port.Port == nil || *port.Port == int32(0) {
   243  				klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", *port.Name)
   244  				continue
   245  			}
   246  
   247  			svcPortName := ServicePortName{
   248  				NamespacedName: serviceNN,
   249  				Port:           *port.Name,
   250  				Protocol:       *port.Protocol,
   251  			}
   252  
   253  			endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints)
   254  		}
   255  	}
   256  
   257  	return endpointInfoBySP
   258  }
   259  
   260  // addEndpoints adds endpointInfo for each unique endpoint.
   261  func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint {
   262  	if endpointSet == nil {
   263  		endpointSet = map[string]Endpoint{}
   264  	}
   265  
   266  	// iterate through endpoints to add them to endpointSet.
   267  	for _, endpoint := range endpoints {
   268  		if len(endpoint.Addresses) == 0 {
   269  			klog.ErrorS(nil, "Ignoring invalid endpoint port with empty address", "endpoint", endpoint)
   270  			continue
   271  		}
   272  
   273  		// Filter out the incorrect IP version case. Any endpoint port that
   274  		// contains incorrect IP version will be ignored.
   275  		if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
   276  			// Emit event on the corresponding service which had a different IP
   277  			// version than the endpoint.
   278  			proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
   279  			continue
   280  		}
   281  
   282  		isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
   283  
   284  		endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal,
   285  			endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints)
   286  
   287  		// This logic ensures we're deduplicating potential overlapping endpoints
   288  		// isLocal should not vary between matching endpoints, but if it does, we
   289  		// favor a true value here if it exists.
   290  		if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
   291  			endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo, svcPortName)
   292  		}
   293  	}
   294  
   295  	return endpointSet
   296  }
   297  
   298  func (cache *EndpointSliceCache) isLocal(hostname string) bool {
   299  	return len(cache.hostname) > 0 && hostname == cache.hostname
   300  }
   301  
   302  // esDataChanged returns true if the esData parameter should be set as a new
   303  // pending value in the cache.
   304  func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, sliceKey string, esData *endpointSliceData) bool {
   305  	if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
   306  		appliedData, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
   307  		pendingData, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
   308  
   309  		// If there's already a pending value, return whether or not this would
   310  		// change that.
   311  		if pendingOk {
   312  			return !reflect.DeepEqual(esData, pendingData)
   313  		}
   314  
   315  		// If there's already an applied value, return whether or not this would
   316  		// change that.
   317  		if appliedOk {
   318  			return !reflect.DeepEqual(esData, appliedData)
   319  		}
   320  	}
   321  
   322  	// If this is marked for removal and does not exist in the cache, no changes
   323  	// are necessary.
   324  	if esData.Remove {
   325  		return false
   326  	}
   327  
   328  	// If not in the cache, and not marked for removal, it should be added.
   329  	return true
   330  }
   331  
   332  // endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
   333  // has been grouped by service port and IP.
   334  func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
   335  	endpointsMap := EndpointsMap{}
   336  
   337  	// transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
   338  	for svcPortName, endpointSet := range endpointInfoBySP {
   339  		if len(endpointSet) > 0 {
   340  			endpointsMap[svcPortName] = []Endpoint{}
   341  			for _, endpointInfo := range endpointSet {
   342  				endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
   343  
   344  			}
   345  			// Ensure endpoints are always returned in the same order to simplify diffing.
   346  			sort.Sort(byEndpoint(endpointsMap[svcPortName]))
   347  
   348  			klog.V(3).InfoS("Setting endpoints for service port name", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
   349  		}
   350  	}
   351  
   352  	return endpointsMap
   353  }
   354  
   355  // formatEndpointsList returns a string list converted from an endpoints list.
   356  func formatEndpointsList(endpoints []Endpoint) []string {
   357  	var formattedList []string
   358  	for _, ep := range endpoints {
   359  		formattedList = append(formattedList, ep.String())
   360  	}
   361  	return formattedList
   362  }
   363  
   364  // endpointSliceCacheKeys returns cache keys used for a given EndpointSlice.
   365  func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) {
   366  	var err error
   367  	serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
   368  	if !ok || serviceName == "" {
   369  		err = fmt.Errorf("no %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name)
   370  	} else if endpointSlice.Namespace == "" || endpointSlice.Name == "" {
   371  		err = fmt.Errorf("expected EndpointSlice name and namespace to be set: %v", endpointSlice)
   372  	}
   373  	return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
   374  }
   375  
   376  // byAddress helps sort endpointData
   377  type byAddress []*endpointData
   378  
   379  func (e byAddress) Len() int {
   380  	return len(e)
   381  }
   382  func (e byAddress) Swap(i, j int) {
   383  	e[i], e[j] = e[j], e[i]
   384  }
   385  func (e byAddress) Less(i, j int) bool {
   386  	return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
   387  }
   388  
   389  // byEndpoint helps sort endpoints by endpoint string.
   390  type byEndpoint []Endpoint
   391  
   392  func (e byEndpoint) Len() int {
   393  	return len(e)
   394  }
   395  func (e byEndpoint) Swap(i, j int) {
   396  	e[i], e[j] = e[j], e[i]
   397  }
   398  func (e byEndpoint) Less(i, j int) bool {
   399  	return e[i].String() < e[j].String()
   400  }
   401  
   402  // byPort helps sort EndpointSlice ports by port number
   403  type byPort []discovery.EndpointPort
   404  
   405  func (p byPort) Len() int {
   406  	return len(p)
   407  }
   408  func (p byPort) Swap(i, j int) {
   409  	p[i], p[j] = p[j], p[i]
   410  }
   411  func (p byPort) Less(i, j int) bool {
   412  	return *p[i].Port < *p[j].Port
   413  }
   414  

View as plain text