/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package proxy import ( "reflect" "sync" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" ) // ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of // Services, keyed by their namespace and name. type ServiceChangeTracker struct { // lock protects items. lock sync.Mutex // items maps a service to its serviceChange. items map[types.NamespacedName]*serviceChange // makeServiceInfo allows the proxier to inject customized information when // processing services. makeServiceInfo makeServicePortFunc // processServiceMapChange is invoked by the apply function on every change. This // function should not modify the ServicePortMaps, but just use the changes for // any Proxier-specific cleanup. processServiceMapChange processServiceMapChangeFunc ipFamily v1.IPFamily recorder events.EventRecorder } type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort type processServiceMapChangeFunc func(previous, current ServicePortMap) // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // changes are accumulated, i.e. previous is state from before applying the changes, // current is state after applying all of the changes. type serviceChange struct { previous ServicePortMap current ServicePortMap } // NewServiceChangeTracker initializes a ServiceChangeTracker func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ items: make(map[types.NamespacedName]*serviceChange), makeServiceInfo: makeServiceInfo, recorder: recorder, ipFamily: ipFamily, processServiceMapChange: processServiceMapChange, } } // Update updates the ServiceChangeTracker based on the service pair // (where either previous or current, but not both, can be nil). It returns true if sct // contains changes that need to be synced (whether or not those changes were caused by // this update); note that this is different from the return value of // EndpointChangeTracker.EndpointSliceUpdate(). func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { // This is unexpected, we should return false directly. if previous == nil && current == nil { return false } svc := current if svc == nil { svc = previous } metrics.ServiceChangesTotal.Inc() namespacedName := types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name} sct.lock.Lock() defer sct.lock.Unlock() change, exists := sct.items[namespacedName] if !exists { change = &serviceChange{} change.previous = sct.serviceToServiceMap(previous) sct.items[namespacedName] = change } change.current = sct.serviceToServiceMap(current) // if change.previous equal to change.current, it means no change if reflect.DeepEqual(change.previous, change.current) { delete(sct.items, namespacedName) } else { klog.V(4).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current)) } metrics.ServiceChangesPending.Set(float64(len(sct.items))) return len(sct.items) > 0 } // ServicePortMap maps a service to its ServicePort. type ServicePortMap map[ServicePortName]ServicePort // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { // UpdatedServices lists the names of all services added/updated/deleted since the // last Update. UpdatedServices sets.Set[types.NamespacedName] // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs // that had UDP ports. Callers can use this to abort timeout-waits or clear // connection-tracking information. DeletedUDPClusterIPs sets.Set[string] } // HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values // for all Services in sm with non-zero HealthCheckNodePort. func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 { // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to svcPortMap. ports := make(map[types.NamespacedName]uint16) for svcPortName, info := range sm { if info.HealthCheckNodePort() != 0 { ports[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) } } return ports } // serviceToServiceMap translates a single Service object to a ServicePortMap. // // NOTE: service object should NOT be modified. func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) ServicePortMap { if service == nil { return nil } if proxyutil.ShouldSkipService(service) { return nil } clusterIP := proxyutil.GetClusterIPByFamily(sct.ipFamily, service) if clusterIP == "" { return nil } svcPortMap := make(ServicePortMap) svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} baseSvcInfo := newBaseServiceInfo(service, sct.ipFamily, servicePort) if sct.makeServiceInfo != nil { svcPortMap[svcPortName] = sct.makeServiceInfo(servicePort, service, baseSvcInfo) } else { svcPortMap[svcPortName] = baseSvcInfo } } return svcPortMap } // Update updates ServicePortMap base on the given changes, returns information about the // diff since the last Update, triggers processServiceMapChange on every change, and // clears the changes map. func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult { sct.lock.Lock() defer sct.lock.Unlock() result := UpdateServiceMapResult{ UpdatedServices: sets.New[types.NamespacedName](), DeletedUDPClusterIPs: sets.New[string](), } for nn, change := range sct.items { if sct.processServiceMapChange != nil { sct.processServiceMapChange(change.previous, change.current) } result.UpdatedServices.Insert(nn) sm.merge(change.current) // filter out the Update event of current changes from previous changes // before calling unmerge() so that can skip deleting the Update events. change.previous.filter(change.current) sm.unmerge(change.previous, result.DeletedUDPClusterIPs) } // clear changes after applying them to ServicePortMap. sct.items = make(map[types.NamespacedName]*serviceChange) metrics.ServiceChangesPending.Set(0) return result } // merge adds other ServicePortMap's elements to current ServicePortMap. // If collision, other ALWAYS win. Otherwise add the other to current. // In other words, if some elements in current collisions with other, update the current by other. func (sm *ServicePortMap) merge(other ServicePortMap) { for svcPortName, info := range other { _, exists := (*sm)[svcPortName] if !exists { klog.V(4).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info) } else { klog.V(4).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info) } (*sm)[svcPortName] = info } } // filter filters out elements from ServicePortMap base on given ports string sets. func (sm *ServicePortMap) filter(other ServicePortMap) { for svcPortName := range *sm { // skip the delete for Update event. if _, ok := other[svcPortName]; ok { delete(*sm, svcPortName) } } } // unmerge deletes all other ServicePortMap's elements from current ServicePortMap and // updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.Set[string]) { for svcPortName := range other { info, exists := (*sm)[svcPortName] if exists { klog.V(4).InfoS("Removing service port", "portName", svcPortName) if info.Protocol() == v1.ProtocolUDP { deletedUDPClusterIPs.Insert(info.ClusterIP().String()) } delete(*sm, svcPortName) } else { klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName) } } }