
Source file src/k8s.io/kubernetes/pkg/proxy/endpointschangetracker.go

Documentation: k8s.io/kubernetes/pkg/proxy

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package proxy
    19  import (
    20  	"sync"
    21  	"time"
    23  	v1 "k8s.io/api/core/v1"
    24  	discovery "k8s.io/api/discovery/v1"
    25  	"k8s.io/apimachinery/pkg/types"
    26  	"k8s.io/apimachinery/pkg/util/sets"
    27  	"k8s.io/client-go/tools/events"
    28  	"k8s.io/klog/v2"
    29  	"k8s.io/kubernetes/pkg/proxy/metrics"
    30  )
// supportedEndpointSliceAddressTypes is the set of EndpointSlice address types that
// EndpointSliceUpdate accepts. Slices with any other address type are logged at
// verbosity 4 and ignored.
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
	discovery.AddressTypeIPv4,
	discovery.AddressTypeIPv6,
)
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointsChangeTracker struct {
	// lock protects lastChangeTriggerTimes. EndpointSliceUpdate also holds it while
	// it records the pending change in endpointSliceCache.
	lock sync.Mutex

	// processEndpointsMapChange is invoked by the apply function on every change.
	// This function should not modify the EndpointsMaps, but just use the changes for
	// any Proxier-specific cleanup. It may be nil, in which case it is skipped.
	processEndpointsMapChange processEndpointsMapChangeFunc

	// endpointSliceCache holds a simplified version of endpoint slices.
	endpointSliceCache *EndpointSliceCache

	// lastChangeTriggerTimes maps from the Service's NamespacedName to the times of
	// the triggers that caused its EndpointSlice objects to change. Used to calculate
	// the network-programming-latency metric.
	lastChangeTriggerTimes map[types.NamespacedName][]time.Time
	// trackerStartTime is the time when the EndpointsChangeTracker was created, so
	// we can avoid generating network-programming-latency metrics for changes that
	// occurred before that.
	trackerStartTime time.Time
}
// makeEndpointFunc constructs an Endpoint from the given BaseEndpointInfo and
// ServicePortName. NewEndpointsChangeTracker passes it on to NewEndpointSliceCache.
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint

// processEndpointsMapChangeFunc is the callback that EndpointsMap.Update invokes for
// each checked-out change, receiving the endpoints maps from before and after it.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
    64  // NewEndpointsChangeTracker initializes an EndpointsChangeTracker
    65  func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
    66  	return &EndpointsChangeTracker{
    67  		lastChangeTriggerTimes:    make(map[types.NamespacedName][]time.Time),
    68  		trackerStartTime:          time.Now(),
    69  		processEndpointsMapChange: processEndpointsMapChange,
    70  		endpointSliceCache:        NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
    71  	}
    72  }
    74  // EndpointSliceUpdate updates the EndpointsChangeTracker by adding/updating or removing
    75  // endpointSlice (depending on removeSlice). It returns true if this update contained a
    76  // change that needs to be synced; note that this is different from the return value of
    77  // ServiceChangeTracker.Update().
    78  func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
    79  	if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
    80  		klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
    81  		return false
    82  	}
    84  	// This should never happen
    85  	if endpointSlice == nil {
    86  		klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
    87  		return false
    88  	}
    90  	namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
    91  	if err != nil {
    92  		klog.InfoS("Error getting endpoint slice cache keys", "err", err)
    93  		return false
    94  	}
    96  	metrics.EndpointChangesTotal.Inc()
    98  	ect.lock.Lock()
    99  	defer ect.lock.Unlock()
   101  	changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
   103  	if changeNeeded {
   104  		metrics.EndpointChangesPending.Inc()
   105  		// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
   106  		// by-definition coming from the time of last update, which is not what
   107  		// we want to measure. So we simply ignore it in this cases.
   108  		// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
   109  		// when other EndpointSlice for that service still exist.
   110  		if removeSlice {
   111  			delete(ect.lastChangeTriggerTimes, namespacedName)
   112  		} else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
   113  			ect.lastChangeTriggerTimes[namespacedName] =
   114  				append(ect.lastChangeTriggerTimes[namespacedName], t)
   115  		}
   116  	}
   118  	return changeNeeded
   119  }
   121  // checkoutChanges returns a map of pending endpointsChanges and marks them as
   122  // applied.
   123  func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
   124  	metrics.EndpointChangesPending.Set(0)
   126  	return ect.endpointSliceCache.checkoutChanges()
   127  }
   129  // checkoutTriggerTimes applies the locally cached trigger times to a map of
   130  // trigger times that have been passed in and empties the local cache.
   131  func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
   132  	ect.lock.Lock()
   133  	defer ect.lock.Unlock()
   135  	for k, v := range ect.lastChangeTriggerTimes {
   136  		prev, ok := (*lastChangeTriggerTimes)[k]
   137  		if !ok {
   138  			(*lastChangeTriggerTimes)[k] = v
   139  		} else {
   140  			(*lastChangeTriggerTimes)[k] = append(prev, v...)
   141  		}
   142  	}
   143  	ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
   144  }
   146  // getLastChangeTriggerTime returns the time.Time value of the
   147  // EndpointsLastChangeTriggerTime annotation stored in the given endpoints
   148  // object or the "zero" time if the annotation wasn't set or was set
   149  // incorrectly.
   150  func getLastChangeTriggerTime(annotations map[string]string) time.Time {
   151  	// TODO(#81360): ignore case when Endpoint is deleted.
   152  	if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
   153  		// It's possible that the Endpoints object won't have the
   154  		// EndpointsLastChangeTriggerTime annotation set. In that case return
   155  		// the 'zero value', which is ignored in the upstream code.
   156  		return time.Time{}
   157  	}
   158  	val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
   159  	if err != nil {
   160  		klog.ErrorS(err, "Error while parsing EndpointsLastChangeTriggerTimeAnnotation",
   161  			"value", annotations[v1.EndpointsLastChangeTriggerTime])
   162  		// In case of error val = time.Zero, which is ignored in the upstream code.
   163  	}
   164  	return val
   165  }
// endpointsChange contains all changes to endpoints that happened since proxy
// rules were synced.  For a single object, changes are accumulated, i.e.
// previous is state from before applying the changes, current is state after
// applying the changes.
type endpointsChange struct {
	// previous is the endpoints state before the accumulated changes.
	previous EndpointsMap
	// current is the endpoints state after the accumulated changes.
	current  EndpointsMap
}
// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
// It is returned by EndpointsMap.Update.
type UpdateEndpointsMapResult struct {
	// UpdatedServices lists the names of all services with added/updated/deleted
	// endpoints since the last Update.
	UpdatedServices sets.Set[types.NamespacedName]

	// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
	// Existing conntrack NAT entries pointing to these endpoints must be deleted to
	// ensure that no further traffic for the Service gets delivered to them.
	DeletedUDPEndpoints []ServiceEndpoint

	// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
	// non-0 endpoints. Existing conntrack entries caching the fact that these
	// services are black holes must be deleted to ensure that traffic can immediately
	// begin flowing to the new endpoints.
	NewlyActiveUDPServices []ServicePortName

	// LastChangeTriggerTimes lists the trigger times for all endpoints objects that
	// changed, keyed by service name. It's used to export the network programming
	// latency.
	// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
	LastChangeTriggerTimes map[types.NamespacedName][]time.Time
}
// EndpointsMap maps a service port name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint
   202  // Update updates em based on the changes in ect, returns information about the diff since
   203  // the last Update, triggers processEndpointsMapChange on every change, and clears the
   204  // changes map.
   205  func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
   206  	result := UpdateEndpointsMapResult{
   207  		UpdatedServices:        sets.New[types.NamespacedName](),
   208  		DeletedUDPEndpoints:    make([]ServiceEndpoint, 0),
   209  		NewlyActiveUDPServices: make([]ServicePortName, 0),
   210  		LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
   211  	}
   212  	if ect == nil {
   213  		return result
   214  	}
   216  	changes := ect.checkoutChanges()
   217  	for nn, change := range changes {
   218  		if ect.processEndpointsMapChange != nil {
   219  			ect.processEndpointsMapChange(change.previous, change.current)
   220  		}
   221  		result.UpdatedServices.Insert(nn)
   223  		em.unmerge(change.previous)
   224  		em.merge(change.current)
   225  		detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
   226  	}
   227  	ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)
   229  	return result
   230  }
   232  // Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
   233  func (em EndpointsMap) merge(other EndpointsMap) {
   234  	for svcPortName := range other {
   235  		em[svcPortName] = other[svcPortName]
   236  	}
   237  }
   239  // Unmerge removes the <service, endpoints> pairs from the current EndpointsMap which are contained in the EndpointsMap passed in.
   240  func (em EndpointsMap) unmerge(other EndpointsMap) {
   241  	for svcPortName := range other {
   242  		delete(em, svcPortName)
   243  	}
   244  }
   246  // getLocalEndpointIPs returns endpoints IPs if given endpoint is local - local means the endpoint is running in same host as kube-proxy.
   247  func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.Set[string] {
   248  	localIPs := make(map[types.NamespacedName]sets.Set[string])
   249  	for svcPortName, epList := range em {
   250  		for _, ep := range epList {
   251  			// Only add ready endpoints for health checking. Terminating endpoints may still serve traffic
   252  			// but the health check signal should fail if there are only terminating endpoints on a node.
   253  			if !ep.IsReady() {
   254  				continue
   255  			}
   257  			if ep.IsLocal() {
   258  				nsn := svcPortName.NamespacedName
   259  				if localIPs[nsn] == nil {
   260  					localIPs[nsn] = sets.New[string]()
   261  				}
   262  				localIPs[nsn].Insert(ep.IP())
   263  			}
   264  		}
   265  	}
   266  	return localIPs
   267  }
   269  // LocalReadyEndpoints returns a map of Service names to the number of local ready
   270  // endpoints for that service.
   271  func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
   272  	// TODO: If this will appear to be computationally expensive, consider
   273  	// computing this incrementally similarly to endpointsMap.
   275  	// (Note that we need to call getLocalEndpointIPs first to squash the data by IP,
   276  	// because the EndpointsMap is sorted by IP+port, not just IP, and we want to
   277  	// consider a Service pointing to and to have 1 endpoint,
   278  	// not 2.)
   280  	eps := make(map[types.NamespacedName]int)
   281  	localIPs := em.getLocalReadyEndpointIPs()
   282  	for nsn, ips := range localIPs {
   283  		eps[nsn] = len(ips)
   284  	}
   285  	return eps
   286  }
   288  // detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
   289  // (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
   290  func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
   291  	// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
   292  	// are no longer sending to newEndpointsMap. The proxier should make sure that
   293  	// conntrack does not accidentally route any new connections to them.
   294  	for svcPortName, epList := range oldEndpointsMap {
   295  		if svcPortName.Protocol != v1.ProtocolUDP {
   296  			continue
   297  		}
   299  		for _, ep := range epList {
   300  			// If the old endpoint wasn't Serving then there can't be stale
   301  			// conntrack entries since there was no traffic sent to it.
   302  			if !ep.IsServing() {
   303  				continue
   304  			}
   306  			deleted := true
   307  			// Check if the endpoint has changed, including if it went from
   308  			// serving to not serving. If it did change stale entries for the old
   309  			// endpoint have to be cleared.
   310  			for i := range newEndpointsMap[svcPortName] {
   311  				if newEndpointsMap[svcPortName][i].String() == ep.String() {
   312  					deleted = false
   313  					break
   314  				}
   315  			}
   316  			if deleted {
   317  				klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
   318  				*deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
   319  			}
   320  		}
   321  	}
   323  	// Detect services that have gone from 0 to non-0 ready endpoints. If there were
   324  	// previously 0 endpoints, but someone tried to connect to it, then a conntrack
   325  	// entry may have been created blackholing traffic to that IP, which should be
   326  	// deleted now.
   327  	for svcPortName, epList := range newEndpointsMap {
   328  		if svcPortName.Protocol != v1.ProtocolUDP {
   329  			continue
   330  		}
   332  		epServing := 0
   333  		for _, ep := range epList {
   334  			if ep.IsServing() {
   335  				epServing++
   336  			}
   337  		}
   339  		oldEpServing := 0
   340  		for _, ep := range oldEndpointsMap[svcPortName] {
   341  			if ep.IsServing() {
   342  				oldEpServing++
   343  			}
   344  		}
   346  		if epServing > 0 && oldEpServing == 0 {
   347  			*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
   348  		}
   349  	}
   350  }

View as plain text