...

Source file src/k8s.io/kubernetes/pkg/proxy/servicechangetracker.go

Documentation: k8s.io/kubernetes/pkg/proxy

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package proxy
    18  
    19  import (
    20  	"reflect"
    21  	"sync"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/apimachinery/pkg/util/sets"
    26  	"k8s.io/client-go/tools/events"
    27  	"k8s.io/klog/v2"
    28  	"k8s.io/kubernetes/pkg/proxy/metrics"
    29  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    30  )
    31  
// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of
// Services, keyed by their namespace and name.
//
// Changes are recorded through (*ServiceChangeTracker).Update and are consumed (and
// cleared) by ServicePortMap.Update; both take lock while touching items.
type ServiceChangeTracker struct {
	// lock protects items.
	lock sync.Mutex
	// items maps a service to its serviceChange.
	items map[types.NamespacedName]*serviceChange

	// makeServiceInfo allows the proxier to inject customized information when
	// processing services. When it is nil, the BaseServicePortInfo built for each
	// port is stored as-is.
	makeServiceInfo makeServicePortFunc
	// processServiceMapChange is invoked by ServicePortMap.Update on every pending
	// change (it is skipped when nil). This function should not modify the
	// ServicePortMaps, but just use the changes for any Proxier-specific cleanup.
	processServiceMapChange processServiceMapChangeFunc

	// ipFamily selects which of a Service's cluster IPs is tracked; a Service with no
	// cluster IP for this family yields no ServicePortMap entries.
	ipFamily v1.IPFamily
	// recorder is stored by NewServiceChangeTracker; it is not referenced by any other
	// code in this file.
	recorder events.EventRecorder
}
    51  
// makeServicePortFunc builds the ServicePort to store for one port of a Service. It
// receives the port, the Service that owns it, and the BaseServicePortInfo already
// computed for that port.
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort

// processServiceMapChangeFunc is a callback that receives the previous and current
// ServicePortMap of a single pending service change.
type processServiceMapChangeFunc func(previous, current ServicePortMap)
    54  
// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes.
type serviceChange struct {
	// previous is captured once, when the change is first recorded, and is not
	// overwritten by later updates to the same service.
	previous ServicePortMap
	// current is replaced on every update to the service.
	current ServicePortMap
}
    62  
    63  // NewServiceChangeTracker initializes a ServiceChangeTracker
    64  func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
    65  	return &ServiceChangeTracker{
    66  		items:                   make(map[types.NamespacedName]*serviceChange),
    67  		makeServiceInfo:         makeServiceInfo,
    68  		recorder:                recorder,
    69  		ipFamily:                ipFamily,
    70  		processServiceMapChange: processServiceMapChange,
    71  	}
    72  }
    73  
    74  // Update updates the ServiceChangeTracker based on the <previous, current> service pair
    75  // (where either previous or current, but not both, can be nil). It returns true if sct
    76  // contains changes that need to be synced (whether or not those changes were caused by
    77  // this update); note that this is different from the return value of
    78  // EndpointChangeTracker.EndpointSliceUpdate().
    79  func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
    80  	// This is unexpected, we should return false directly.
    81  	if previous == nil && current == nil {
    82  		return false
    83  	}
    84  
    85  	svc := current
    86  	if svc == nil {
    87  		svc = previous
    88  	}
    89  	metrics.ServiceChangesTotal.Inc()
    90  	namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}
    91  
    92  	sct.lock.Lock()
    93  	defer sct.lock.Unlock()
    94  
    95  	change, exists := sct.items[namespacedName]
    96  	if !exists {
    97  		change = &serviceChange{}
    98  		change.previous = sct.serviceToServiceMap(previous)
    99  		sct.items[namespacedName] = change
   100  	}
   101  	change.current = sct.serviceToServiceMap(current)
   102  	// if change.previous equal to change.current, it means no change
   103  	if reflect.DeepEqual(change.previous, change.current) {
   104  		delete(sct.items, namespacedName)
   105  	} else {
   106  		klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
   107  	}
   108  	metrics.ServiceChangesPending.Set(float64(len(sct.items)))
   109  	return len(sct.items) > 0
   110  }
   111  
// ServicePortMap maps a ServicePortName (a service's namespaced name plus port name and
// protocol) to the ServicePort describing that port.
type ServicePortMap map[ServicePortName]ServicePort
   114  
// UpdateServiceMapResult is the updated results after applying service changes.
// It is returned by ServicePortMap.Update.
type UpdateServiceMapResult struct {
	// UpdatedServices lists the names of all services added/updated/deleted since the
	// last Update, i.e. every service that had a pending change when Update ran.
	UpdatedServices sets.Set[types.NamespacedName]

	// DeletedUDPClusterIPs holds the cluster IPs of UDP service ports that were
	// removed from the ServicePortMap by the update. Callers can use this to abort
	// timeout-waits or clear connection-tracking information.
	DeletedUDPClusterIPs sets.Set[string]
}
   126  
   127  // HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
   128  // for all Services in sm with non-zero HealthCheckNodePort.
   129  func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
   130  	// TODO: If this will appear to be computationally expensive, consider
   131  	// computing this incrementally similarly to svcPortMap.
   132  	ports := make(map[types.NamespacedName]uint16)
   133  	for svcPortName, info := range sm {
   134  		if info.HealthCheckNodePort() != 0 {
   135  			ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
   136  		}
   137  	}
   138  	return ports
   139  }
   140  
   141  // serviceToServiceMap translates a single Service object to a ServicePortMap.
   142  //
   143  // NOTE: service object should NOT be modified.
   144  func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap {
   145  	if service == nil {
   146  		return nil
   147  	}
   148  
   149  	if proxyutil.ShouldSkipService(service) {
   150  		return nil
   151  	}
   152  
   153  	clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service)
   154  	if clusterIP == "" {
   155  		return nil
   156  	}
   157  
   158  	svcPortMap := make(ServicePortMap)
   159  	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
   160  	for i := range service.Spec.Ports {
   161  		servicePort := &service.Spec.Ports[i]
   162  		svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol}
   163  		baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort)
   164  		if sct.makeServiceInfo != nil {
   165  			svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo)
   166  		} else {
   167  			svcPortMap[svcPortName] = baseSvcInfo
   168  		}
   169  	}
   170  	return svcPortMap
   171  }
   172  
   173  // Update updates ServicePortMap base on the given changes, returns information about the
   174  // diff since the last Update, triggers processServiceMapChange on every change, and
   175  // clears the changes map.
   176  func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
   177  	sct.lock.Lock()
   178  	defer sct.lock.Unlock()
   179  
   180  	result := UpdateServiceMapResult{
   181  		UpdatedServices:      sets.New[types.NamespacedName](),
   182  		DeletedUDPClusterIPs: sets.New[string](),
   183  	}
   184  
   185  	for nn, change := range sct.items {
   186  		if sct.processServiceMapChange != nil {
   187  			sct.processServiceMapChange(change.previous, change.current)
   188  		}
   189  		result.UpdatedServices.Insert(nn)
   190  
   191  		sm.merge(change.current)
   192  		// filter out the Update event of current changes from previous changes
   193  		// before calling unmerge() so that can skip deleting the Update events.
   194  		change.previous.filter(change.current)
   195  		sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
   196  	}
   197  	// clear changes after applying them to ServicePortMap.
   198  	sct.items = make(map[types.NamespacedName]*serviceChange)
   199  	metrics.ServiceChangesPending.Set(0)
   200  
   201  	return result
   202  }
   203  
   204  // merge adds other ServicePortMap's elements to current ServicePortMap.
   205  // If collision, other ALWAYS win. Otherwise add the other to current.
   206  // In other words, if some elements in current collisions with other, update the current by other.
   207  func (sm *ServicePortMap) merge(other ServicePortMap) {
   208  	for svcPortName, info := range other {
   209  		_, exists := (*sm)[svcPortName]
   210  		if !exists {
   211  			klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
   212  		} else {
   213  			klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
   214  		}
   215  		(*sm)[svcPortName] = info
   216  	}
   217  }
   218  
   219  // filter filters out elements from ServicePortMap base on given ports string sets.
   220  func (sm *ServicePortMap) filter(other ServicePortMap) {
   221  	for svcPortName := range *sm {
   222  		// skip the delete for Update event.
   223  		if _, ok := other[svcPortName]; ok {
   224  			delete(*sm, svcPortName)
   225  		}
   226  	}
   227  }
   228  
   229  // unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
   230  // updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
   231  func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) {
   232  	for svcPortName := range other {
   233  		info, exists := (*sm)[svcPortName]
   234  		if exists {
   235  			klog.V(4).InfoS("Removing service port", "portName", svcPortName)
   236  			if info.Protocol() == v1.ProtocolUDP {
   237  				deletedUDPClusterIPs.Insert(info.ClusterIP().String())
   238  			}
   239  			delete(*sm, svcPortName)
   240  		} else {
   241  			klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
   242  		}
   243  	}
   244  }
   245  

View as plain text