...

Source file src/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go

Documentation: k8s.io/kubernetes/pkg/proxy/winkernel

     1  //go:build windows
     2  // +build windows
     3  
     4  /*
     5  Copyright 2017 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package winkernel
    21  
    22  import (
    23  	"fmt"
    24  	"net"
    25  	"os"
    26  	"strconv"
    27  	"strings"
    28  	"sync"
    29  	"sync/atomic"
    30  	"time"
    31  
    32  	"github.com/Microsoft/hcsshim"
    33  	"github.com/Microsoft/hcsshim/hcn"
    34  	v1 "k8s.io/api/core/v1"
    35  	discovery "k8s.io/api/discovery/v1"
    36  	"k8s.io/apimachinery/pkg/util/intstr"
    37  	apiutil "k8s.io/apimachinery/pkg/util/net"
    38  	"k8s.io/apimachinery/pkg/util/sets"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    41  	"k8s.io/client-go/tools/events"
    42  	"k8s.io/klog/v2"
    43  	kubefeatures "k8s.io/kubernetes/pkg/features"
    44  	"k8s.io/kubernetes/pkg/proxy"
    45  	"k8s.io/kubernetes/pkg/proxy/apis/config"
    46  	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
    47  	"k8s.io/kubernetes/pkg/proxy/healthcheck"
    48  	"k8s.io/kubernetes/pkg/proxy/metaproxier"
    49  	"k8s.io/kubernetes/pkg/proxy/metrics"
    50  	proxyutil "k8s.io/kubernetes/pkg/proxy/util"
    51  	"k8s.io/kubernetes/pkg/util/async"
    52  	netutils "k8s.io/utils/net"
    53  )
    54  
    55  // KernelCompatTester tests whether the required kernel capabilities are
    56  // present to run the windows kernel proxier.
    57  type KernelCompatTester interface {
    58  	IsCompatible() error
    59  }
    60  
    61  // CanUseWinKernelProxier returns true if we should use the Kernel Proxier
    62  // instead of the "classic" userspace Proxier.  This is determined by checking
    63  // the windows kernel version and for the existence of kernel features.
    64  func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
    65  	// Check that the kernel supports what we need.
    66  	if err := kcompat.IsCompatible(); err != nil {
    67  		return false, err
    68  	}
    69  	return true, nil
    70  }
    71  
    72  type WindowsKernelCompatTester struct{}
    73  
    74  // IsCompatible returns true if winkernel can support this mode of proxy
    75  func (lkct WindowsKernelCompatTester) IsCompatible() error {
    76  	_, err := hcsshim.HNSListPolicyListRequest()
    77  	if err != nil {
    78  		return fmt.Errorf("Windows kernel is not compatible for Kernel mode")
    79  	}
    80  	return nil
    81  }
    82  
    83  type externalIPInfo struct {
    84  	ip    string
    85  	hnsID string
    86  }
    87  
    88  type loadBalancerIngressInfo struct {
    89  	ip               string
    90  	hnsID            string
    91  	healthCheckHnsID string
    92  }
    93  
    94  type loadBalancerInfo struct {
    95  	hnsID string
    96  }
    97  
    98  type loadBalancerIdentifier struct {
    99  	protocol      uint16
   100  	internalPort  uint16
   101  	externalPort  uint16
   102  	vip           string
   103  	endpointsHash [20]byte
   104  }
   105  
   106  type loadBalancerFlags struct {
   107  	isILB           bool
   108  	isDSR           bool
   109  	isVipExternalIP bool
   110  	localRoutedVIP  bool
   111  	useMUX          bool
   112  	preserveDIP     bool
   113  	sessionAffinity bool
   114  	isIPv6          bool
   115  }
   116  
   117  // internal struct for string service information
   118  type serviceInfo struct {
   119  	*proxy.BaseServicePortInfo
   120  	targetPort             int
   121  	externalIPs            []*externalIPInfo
   122  	loadBalancerIngressIPs []*loadBalancerIngressInfo
   123  	hnsID                  string
   124  	nodePorthnsID          string
   125  	policyApplied          bool
   126  	remoteEndpoint         *endpointInfo
   127  	hns                    HostNetworkService
   128  	preserveDIP            bool
   129  	localTrafficDSR        bool
   130  	internalTrafficLocal   bool
   131  	winProxyOptimization   bool
   132  }
   133  
   134  type hnsNetworkInfo struct {
   135  	name          string
   136  	id            string
   137  	networkType   string
   138  	remoteSubnets []*remoteSubnetInfo
   139  }
   140  
   141  type remoteSubnetInfo struct {
   142  	destinationPrefix string
   143  	isolationID       uint16
   144  	providerAddress   string
   145  	drMacAddress      string
   146  }
   147  
   148  const (
   149  	NETWORK_TYPE_OVERLAY = "overlay"
   150  	// MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale loadbalancers which cleanedup in single syncproxyrules.
   151  	// If there are more stale loadbalancers to clean, it will go to next iteration of syncproxyrules.
   152  	MAX_COUNT_STALE_LOADBALANCERS = 20
   153  )
   154  
   155  func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) {
   156  	var h HostNetworkService
   157  	supportedFeatures := hcnImpl.GetSupportedFeatures()
   158  	if supportedFeatures.Api.V2 {
   159  		h = hns{
   160  			hcn: hcnImpl,
   161  		}
   162  	} else {
   163  		panic("Windows HNS Api V2 required. This version of windows does not support API V2")
   164  	}
   165  	return h, supportedFeatures
   166  }
   167  
   168  // logFormattedEndpoints will log all endpoints and its states which are taking part in endpointmap change.
   169  // This mostly for debugging purpose and verbosity is set to 5.
   170  func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) {
   171  	if klog.V(logLevel).Enabled() {
   172  		var epInfo string
   173  		for _, v := range eps {
   174  			epInfo = epInfo + fmt.Sprintf("\n  %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.IsLocal())
   175  		}
   176  		klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo)
   177  	}
   178  }
   179  
   180  // This will cleanup stale load balancers which are pending delete
   181  // in last iteration. This function will act like a self healing of stale
   182  // loadbalancer entries.
   183  func (proxier *Proxier) cleanupStaleLoadbalancers() {
   184  	i := 0
   185  	countStaleLB := len(proxier.mapStaleLoadbalancers)
   186  	if countStaleLB == 0 {
   187  		return
   188  	}
   189  	klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LB Count", countStaleLB)
   190  	for lbID := range proxier.mapStaleLoadbalancers {
   191  		i++
   192  		if err := proxier.hns.deleteLoadBalancer(lbID); err == nil {
   193  			delete(proxier.mapStaleLoadbalancers, lbID)
   194  		}
   195  		if i == MAX_COUNT_STALE_LOADBALANCERS {
   196  			// The remaining stale loadbalancers will be cleaned up in next iteration
   197  			break
   198  		}
   199  	}
   200  	countStaleLB = len(proxier.mapStaleLoadbalancers)
   201  	if countStaleLB > 0 {
   202  		klog.V(3).InfoS("Stale loadbalancers still remaining", "LB Count", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers)
   203  	}
   204  }
   205  
   206  func getNetworkName(hnsNetworkName string) (string, error) {
   207  	if len(hnsNetworkName) == 0 {
   208  		klog.V(3).InfoS("Flag --network-name not set, checking environment variable")
   209  		hnsNetworkName = os.Getenv("KUBE_NETWORK")
   210  		if len(hnsNetworkName) == 0 {
   211  			return "", fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
   212  		}
   213  	}
   214  	return hnsNetworkName, nil
   215  }
   216  
   217  func getNetworkInfo(hns HostNetworkService, hnsNetworkName string) (*hnsNetworkInfo, error) {
   218  	hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
   219  	for err != nil {
   220  		klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
   221  		time.Sleep(1 * time.Second)
   222  		hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
   223  	}
   224  	return hnsNetworkInfo, err
   225  }
   226  
   227  func isOverlay(hnsNetworkInfo *hnsNetworkInfo) bool {
   228  	return strings.EqualFold(hnsNetworkInfo.networkType, NETWORK_TYPE_OVERLAY)
   229  }
   230  
   231  // StackCompatTester tests whether the required kernel and network are dualstack capable
   232  type StackCompatTester interface {
   233  	DualStackCompatible(networkName string) bool
   234  }
   235  
   236  type DualStackCompatTester struct{}
   237  
   238  func (t DualStackCompatTester) DualStackCompatible(networkName string) bool {
   239  	hcnImpl := newHcnImpl()
   240  	// First tag of hcsshim that has a proper check for dual stack support is v0.8.22 due to a bug.
   241  	if err := hcnImpl.Ipv6DualStackSupported(); err != nil {
   242  		// Hcn *can* fail the query to grab the version of hcn itself (which this call will do internally before parsing
   243  		// to see if dual stack is supported), but the only time this can happen, at least that can be discerned, is if the host
   244  		// is pre-1803 and hcn didn't exist. hcsshim should truthfully return a known error if this happened that we can
   245  		// check against, and the case where 'err != this known error' would be the 'this feature isn't supported' case, as is being
   246  		// used here. For now, seeming as how nothing before ws2019 (1809) is listed as supported for k8s we can pretty much assume
   247  		// any error here isn't because the query failed, it's just that dualstack simply isn't supported on the host. With all
   248  		// that in mind, just log as info and not error to let the user know we're falling back.
   249  		klog.InfoS("This version of Windows does not support dual-stack, falling back to single-stack", "err", err.Error())
   250  		return false
   251  	}
   252  
   253  	// check if network is using overlay
   254  	hns, _ := newHostNetworkService(hcnImpl)
   255  	networkName, err := getNetworkName(networkName)
   256  	if err != nil {
   257  		klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
   258  		return false
   259  	}
   260  	networkInfo, err := getNetworkInfo(hns, networkName)
   261  	if err != nil {
   262  		klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
   263  		return false
   264  	}
   265  
   266  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) && isOverlay(networkInfo) {
   267  		// Overlay (VXLAN) networks on Windows do not support dual-stack networking today
   268  		klog.InfoS("Winoverlay does not support dual-stack, falling back to single-stack")
   269  		return false
   270  	}
   271  
   272  	return true
   273  }
   274  
   275  // internal struct for endpoints information
   276  type endpointInfo struct {
   277  	ip              string
   278  	port            uint16
   279  	isLocal         bool
   280  	macAddress      string
   281  	hnsID           string
   282  	refCount        *uint16
   283  	providerAddress string
   284  	hns             HostNetworkService
   285  
   286  	// conditions
   287  	ready       bool
   288  	serving     bool
   289  	terminating bool
   290  }
   291  
   292  // String is part of proxy.Endpoint interface.
   293  func (info *endpointInfo) String() string {
   294  	return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port)))
   295  }
   296  
   297  // IsLocal is part of proxy.Endpoint interface.
   298  func (info *endpointInfo) IsLocal() bool {
   299  	return info.isLocal
   300  }
   301  
   302  // IsReady returns true if an endpoint is ready and not terminating.
   303  func (info *endpointInfo) IsReady() bool {
   304  	return info.ready
   305  }
   306  
   307  // IsServing returns true if an endpoint is ready, regardless of it's terminating state.
   308  func (info *endpointInfo) IsServing() bool {
   309  	return info.serving
   310  }
   311  
   312  // IsTerminating returns true if an endpoint is terminating.
   313  func (info *endpointInfo) IsTerminating() bool {
   314  	return info.terminating
   315  }
   316  
   317  // ZoneHints returns the zone hints for the endpoint.
   318  func (info *endpointInfo) ZoneHints() sets.Set[string] {
   319  	return sets.Set[string]{}
   320  }
   321  
   322  // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
   323  func (info *endpointInfo) IP() string {
   324  	return info.ip
   325  }
   326  
   327  // Port returns just the Port part of the endpoint.
   328  func (info *endpointInfo) Port() int {
   329  	return int(info.port)
   330  }
   331  
   332  // Uses mac prefix and IPv4 address to return a mac address
   333  // This ensures mac addresses are unique for proper load balancing
   334  // There is a possibility of MAC collisions but this Mac address is used for remote endpoints only
   335  // and not sent on the wire.
   336  func conjureMac(macPrefix string, ip net.IP) string {
   337  	if ip4 := ip.To4(); ip4 != nil {
   338  		a, b, c, d := ip4[0], ip4[1], ip4[2], ip4[3]
   339  		return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
   340  	} else if ip6 := ip.To16(); ip6 != nil {
   341  		a, b, c, d := ip6[15], ip6[14], ip6[13], ip6[12]
   342  		return fmt.Sprintf("%v-%02x-%02x-%02x-%02x", macPrefix, a, b, c, d)
   343  	}
   344  	return "02-11-22-33-44-55"
   345  }
   346  
   347  func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
   348  	// This will optimize remote endpoint and loadbalancer deletion based on the annotation
   349  	var svcPortMap = make(map[proxy.ServicePortName]bool)
   350  	var logLevel klog.Level = 5
   351  	for svcPortName, eps := range oldEndpointsMap {
   352  		logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
   353  		svcPortMap[svcPortName] = true
   354  		proxier.onEndpointsMapChange(&svcPortName, false)
   355  	}
   356  
   357  	for svcPortName, eps := range newEndpointsMap {
   358  		logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
   359  		// redundantCleanup true means cleanup is called second time on the same svcPort
   360  		redundantCleanup := svcPortMap[svcPortName]
   361  		proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
   362  	}
   363  }
   364  
   365  func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) {
   366  
   367  	svc, exists := proxier.svcPortMap[*svcPortName]
   368  
   369  	if exists {
   370  		svcInfo, ok := svc.(*serviceInfo)
   371  
   372  		if !ok {
   373  			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
   374  			return
   375  		}
   376  
   377  		if svcInfo.winProxyOptimization && redundantCleanup {
   378  			// This is a second cleanup call.
   379  			// Second cleanup on the same svcPort will be ignored if the
   380  			// winProxyOptimization is Enabled
   381  			return
   382  		}
   383  
   384  		klog.V(3).InfoS("Endpoints are modified. Service is stale", "servicePortName", svcPortName)
   385  		svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true)
   386  	} else {
   387  		// If no service exists, just cleanup the remote endpoints
   388  		klog.V(3).InfoS("Endpoints are orphaned, cleaning up")
   389  		// Cleanup Endpoints references
   390  		epInfos, exists := proxier.endpointsMap[*svcPortName]
   391  
   392  		if exists {
   393  			// Cleanup Endpoints references
   394  			for _, ep := range epInfos {
   395  				epInfo, ok := ep.(*endpointInfo)
   396  
   397  				if ok {
   398  					epInfo.Cleanup()
   399  				}
   400  
   401  			}
   402  		}
   403  	}
   404  }
   405  
   406  func (proxier *Proxier) serviceMapChange(previous, current proxy.ServicePortMap) {
   407  	for svcPortName := range current {
   408  		proxier.onServiceMapChange(&svcPortName)
   409  	}
   410  
   411  	for svcPortName := range previous {
   412  		if _, ok := current[svcPortName]; ok {
   413  			continue
   414  		}
   415  		proxier.onServiceMapChange(&svcPortName)
   416  	}
   417  }
   418  
   419  func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
   420  
   421  	svc, exists := proxier.svcPortMap[*svcPortName]
   422  
   423  	if exists {
   424  		svcInfo, ok := svc.(*serviceInfo)
   425  
   426  		if !ok {
   427  			klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
   428  			return
   429  		}
   430  
   431  		klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
   432  		svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false)
   433  	}
   434  }
   435  
   436  // returns a new proxy.Endpoint which abstracts a endpointInfo
   437  func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
   438  
   439  	info := &endpointInfo{
   440  		ip:         baseInfo.IP(),
   441  		port:       uint16(baseInfo.Port()),
   442  		isLocal:    baseInfo.IsLocal(),
   443  		macAddress: conjureMac("02-11", netutils.ParseIPSloppy(baseInfo.IP())),
   444  		refCount:   new(uint16),
   445  		hnsID:      "",
   446  		hns:        proxier.hns,
   447  
   448  		ready:       baseInfo.IsReady(),
   449  		serving:     baseInfo.IsServing(),
   450  		terminating: baseInfo.IsTerminating(),
   451  	}
   452  
   453  	return info
   454  }
   455  
   456  func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) {
   457  	hnsEndpoint := &endpointInfo{
   458  		ip:              ip,
   459  		isLocal:         true,
   460  		macAddress:      mac,
   461  		providerAddress: providerAddress,
   462  
   463  		ready:       true,
   464  		serving:     true,
   465  		terminating: false,
   466  	}
   467  	ep, err := hns.createEndpoint(hnsEndpoint, network)
   468  	return ep, err
   469  }
   470  
   471  func (ep *endpointInfo) DecrementRefCount() {
   472  	klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep)
   473  	if !ep.IsLocal() && ep.refCount != nil && *ep.refCount > 0 {
   474  		*ep.refCount--
   475  	}
   476  }
   477  
   478  func (ep *endpointInfo) Cleanup() {
   479  	klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep)
   480  	if !ep.IsLocal() && ep.refCount != nil {
   481  		*ep.refCount--
   482  
   483  		// Remove the remote hns endpoint, if no service is referring it
   484  		// Never delete a Local Endpoint. Local Endpoints are already created by other entities.
   485  		// Remove only remote endpoints created by this service
   486  		if *ep.refCount <= 0 && !ep.IsLocal() {
   487  			klog.V(4).InfoS("Removing endpoints, since no one is referencing it", "endpoint", ep)
   488  			err := ep.hns.deleteEndpoint(ep.hnsID)
   489  			if err == nil {
   490  				ep.hnsID = ""
   491  			} else {
   492  				klog.ErrorS(err, "Endpoint deletion failed", "ip", ep.IP())
   493  			}
   494  		}
   495  
   496  		ep.refCount = nil
   497  	}
   498  }
   499  
   500  func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
   501  	refCount, exists := refCountMap[hnsID]
   502  	if !exists {
   503  		refCountMap[hnsID] = new(uint16)
   504  		refCount = refCountMap[hnsID]
   505  	}
   506  	return refCount
   507  }
   508  
   509  // returns a new proxy.ServicePort which abstracts a serviceInfo
   510  func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
   511  	info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo}
   512  	preserveDIP := service.Annotations["preserve-destination"] == "true"
   513  	// Annotation introduced to enable optimized loadbalancing
   514  	winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED")
   515  	localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal
   516  	var internalTrafficLocal bool
   517  	if service.Spec.InternalTrafficPolicy != nil {
   518  		internalTrafficLocal = *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
   519  	}
   520  	hcnImpl := proxier.hcn
   521  	err := hcnImpl.DsrSupported()
   522  	if err != nil {
   523  		preserveDIP = false
   524  		localTrafficDSR = false
   525  	}
   526  	// targetPort is zero if it is specified as a name in port.TargetPort.
   527  	// Its real value would be got later from endpoints.
   528  	targetPort := 0
   529  	if port.TargetPort.Type == intstr.Int {
   530  		targetPort = port.TargetPort.IntValue()
   531  	}
   532  
   533  	info.preserveDIP = preserveDIP
   534  	info.targetPort = targetPort
   535  	info.hns = proxier.hns
   536  	info.localTrafficDSR = localTrafficDSR
   537  	info.internalTrafficLocal = internalTrafficLocal
   538  	info.winProxyOptimization = winProxyOptimization
   539  	klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "internalTrafficLocal", internalTrafficLocal, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization)
   540  
   541  	for _, eip := range service.Spec.ExternalIPs {
   542  		info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
   543  	}
   544  
   545  	for _, ingress := range service.Status.LoadBalancer.Ingress {
   546  		if netutils.ParseIPSloppy(ingress.IP) != nil {
   547  			info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
   548  		}
   549  	}
   550  	return info
   551  }
   552  
   553  func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string {
   554  	var providerAddress string
   555  	for _, rs := range network.remoteSubnets {
   556  		_, ipNet, err := netutils.ParseCIDRSloppy(rs.destinationPrefix)
   557  		if err != nil {
   558  			klog.ErrorS(err, "Failed to parse CIDR")
   559  		}
   560  		if ipNet.Contains(netutils.ParseIPSloppy(ip)) {
   561  			providerAddress = rs.providerAddress
   562  		}
   563  		if ip == rs.providerAddress {
   564  			providerAddress = rs.providerAddress
   565  		}
   566  	}
   567  
   568  	return providerAddress
   569  }
   570  
   571  type endPointsReferenceCountMap map[string]*uint16
   572  
   573  // Proxier is an hns based proxy for connections between a localhost:lport
   574  // and services that provide the actual backends.
   575  type Proxier struct {
   576  	// ipFamily defines the IP family which this proxier is tracking.
   577  	ipFamily v1.IPFamily
   578  	// TODO(imroc): implement node handler for winkernel proxier.
   579  	proxyconfig.NoopNodeHandler
   580  
   581  	// endpointsChanges and serviceChanges contains all changes to endpoints and
   582  	// services that happened since policies were synced. For a single object,
   583  	// changes are accumulated, i.e. previous is state from before all of them,
   584  	// current is state after applying all of those.
   585  	endpointsChanges  *proxy.EndpointsChangeTracker
   586  	serviceChanges    *proxy.ServiceChangeTracker
   587  	endPointsRefCount endPointsReferenceCountMap
   588  	mu                sync.Mutex // protects the following fields
   589  	svcPortMap        proxy.ServicePortMap
   590  	endpointsMap      proxy.EndpointsMap
   591  	// endpointSlicesSynced and servicesSynced are set to true when corresponding
   592  	// objects are synced after startup. This is used to avoid updating hns policies
   593  	// with some partial data after kube-proxy restart.
   594  	endpointSlicesSynced bool
   595  	servicesSynced       bool
   596  	initialized          int32
   597  	syncRunner           *async.BoundedFrequencyRunner // governs calls to syncProxyRules
   598  	// These are effectively const and do not need the mutex to be held.
   599  	hostname string
   600  	nodeIP   net.IP
   601  	recorder events.EventRecorder
   602  
   603  	serviceHealthServer healthcheck.ServiceHealthServer
   604  	healthzServer       *healthcheck.ProxierHealthServer
   605  
   606  	hns               HostNetworkService
   607  	hcn               HcnService
   608  	network           hnsNetworkInfo
   609  	sourceVip         string
   610  	hostMac           string
   611  	isDSR             bool
   612  	supportedFeatures hcn.SupportedFeatures
   613  	healthzPort       int
   614  
   615  	forwardHealthCheckVip bool
   616  	rootHnsEndpointName   string
   617  	mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration
   618  }
   619  
   620  type localPort struct {
   621  	desc     string
   622  	ip       string
   623  	port     int
   624  	protocol string
   625  }
   626  
   627  func (lp *localPort) String() string {
   628  	return fmt.Sprintf("%q (%s:%d/%s)", lp.desc, lp.ip, lp.port, lp.protocol)
   629  }
   630  
   631  func Enum(p v1.Protocol) uint16 {
   632  	if p == v1.ProtocolTCP {
   633  		return 6
   634  	}
   635  	if p == v1.ProtocolUDP {
   636  		return 17
   637  	}
   638  	if p == v1.ProtocolSCTP {
   639  		return 132
   640  	}
   641  	return 0
   642  }
   643  
   644  type closeable interface {
   645  	Close() error
   646  }
   647  
   648  // Proxier implements proxy.Provider
   649  var _ proxy.Provider = &Proxier{}
   650  
   651  // NewProxier returns a new Proxier
   652  func NewProxier(
   653  	ipFamily v1.IPFamily,
   654  	syncPeriod time.Duration,
   655  	minSyncPeriod time.Duration,
   656  	hostname string,
   657  	nodeIP net.IP,
   658  	recorder events.EventRecorder,
   659  	healthzServer *healthcheck.ProxierHealthServer,
   660  	healthzBindAddress string,
   661  	config config.KubeProxyWinkernelConfiguration,
   662  ) (*Proxier, error) {
   663  	if nodeIP == nil {
   664  		klog.InfoS("Invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
   665  		nodeIP = netutils.ParseIPSloppy("127.0.0.1")
   666  	}
   667  
   668  	// windows listens to all node addresses
   669  	nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil, nil)
   670  	serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
   671  
   672  	var healthzPort int
   673  	if len(healthzBindAddress) > 0 {
   674  		_, port, _ := net.SplitHostPort(healthzBindAddress)
   675  		healthzPort, _ = strconv.Atoi(port)
   676  	}
   677  
   678  	hcnImpl := newHcnImpl()
   679  	hns, supportedFeatures := newHostNetworkService(hcnImpl)
   680  	hnsNetworkName, err := getNetworkName(config.NetworkName)
   681  	if err != nil {
   682  		return nil, err
   683  	}
   684  
   685  	klog.V(3).InfoS("Cleaning up old HNS policy lists")
   686  	hcnImpl.DeleteAllHnsLoadBalancerPolicy()
   687  
   688  	// Get HNS network information
   689  	hnsNetworkInfo, err := getNetworkInfo(hns, hnsNetworkName)
   690  	if err != nil {
   691  		return nil, err
   692  	}
   693  
   694  	// Network could have been detected before Remote Subnet Routes are applied or ManagementIP is updated
   695  	// Sleep and update the network to include new information
   696  	if isOverlay(hnsNetworkInfo) {
   697  		time.Sleep(10 * time.Second)
   698  		hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
   699  		if err != nil {
   700  			return nil, fmt.Errorf("could not find HNS network %s", hnsNetworkName)
   701  		}
   702  	}
   703  
   704  	klog.V(1).InfoS("Hns Network loaded", "hnsNetworkInfo", hnsNetworkInfo)
   705  	isDSR := config.EnableDSR
   706  	if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
   707  		return nil, fmt.Errorf("WinDSR feature gate not enabled")
   708  	}
   709  
   710  	err = hcnImpl.DsrSupported()
   711  	if isDSR && err != nil {
   712  		return nil, err
   713  	}
   714  
   715  	var sourceVip string
   716  	var hostMac string
   717  	if isOverlay(hnsNetworkInfo) {
   718  		if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) {
   719  			return nil, fmt.Errorf("WinOverlay feature gate not enabled")
   720  		}
   721  		err = hcn.RemoteSubnetSupported()
   722  		if err != nil {
   723  			return nil, err
   724  		}
   725  		sourceVip = config.SourceVip
   726  		if len(sourceVip) == 0 {
   727  			return nil, fmt.Errorf("source-vip flag not set")
   728  		}
   729  
   730  		if nodeIP.IsUnspecified() {
   731  			// attempt to get the correct ip address
   732  			klog.V(2).InfoS("Node ip was unspecified, attempting to find node ip")
   733  			nodeIP, err = apiutil.ResolveBindAddress(nodeIP)
   734  			if err != nil {
   735  				klog.InfoS("Failed to find an ip. You may need set the --bind-address flag", "err", err)
   736  			}
   737  		}
   738  
   739  		interfaces, _ := net.Interfaces() //TODO create interfaces
   740  		for _, inter := range interfaces {
   741  			addresses, _ := inter.Addrs()
   742  			for _, addr := range addresses {
   743  				addrIP, _, _ := netutils.ParseCIDRSloppy(addr.String())
   744  				if addrIP.String() == nodeIP.String() {
   745  					klog.V(2).InfoS("Record Host MAC address", "addr", inter.HardwareAddr)
   746  					hostMac = inter.HardwareAddr.String()
   747  				}
   748  			}
   749  		}
   750  		if len(hostMac) == 0 {
   751  			return nil, fmt.Errorf("could not find host mac address for %s", nodeIP)
   752  		}
   753  	}
   754  
   755  	proxier := &Proxier{
   756  		ipFamily:              ipFamily,
   757  		endPointsRefCount:     make(endPointsReferenceCountMap),
   758  		svcPortMap:            make(proxy.ServicePortMap),
   759  		endpointsMap:          make(proxy.EndpointsMap),
   760  		hostname:              hostname,
   761  		nodeIP:                nodeIP,
   762  		recorder:              recorder,
   763  		serviceHealthServer:   serviceHealthServer,
   764  		healthzServer:         healthzServer,
   765  		hns:                   hns,
   766  		hcn:                   hcnImpl,
   767  		network:               *hnsNetworkInfo,
   768  		sourceVip:             sourceVip,
   769  		hostMac:               hostMac,
   770  		isDSR:                 isDSR,
   771  		supportedFeatures:     supportedFeatures,
   772  		healthzPort:           healthzPort,
   773  		rootHnsEndpointName:   config.RootHnsEndpointName,
   774  		forwardHealthCheckVip: config.ForwardHealthCheckVip,
   775  		mapStaleLoadbalancers: make(map[string]bool),
   776  	}
   777  
   778  	serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
   779  	endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
   780  	proxier.endpointsChanges = endPointChangeTracker
   781  	proxier.serviceChanges = serviceChanges
   782  
   783  	burstSyncs := 2
   784  	klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
   785  	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
   786  	return proxier, nil
   787  }
   788  
   789  func NewDualStackProxier(
   790  	syncPeriod time.Duration,
   791  	minSyncPeriod time.Duration,
   792  	hostname string,
   793  	nodeIPs map[v1.IPFamily]net.IP,
   794  	recorder events.EventRecorder,
   795  	healthzServer *healthcheck.ProxierHealthServer,
   796  	healthzBindAddress string,
   797  	config config.KubeProxyWinkernelConfiguration,
   798  ) (proxy.Provider, error) {
   799  
   800  	// Create an ipv4 instance of the single-stack proxier
   801  	ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
   802  		hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
   803  		healthzBindAddress, config)
   804  
   805  	if err != nil {
   806  		return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv4Protocol])
   807  	}
   808  
   809  	ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
   810  		hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
   811  		healthzBindAddress, config)
   812  	if err != nil {
   813  		return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv6Protocol])
   814  	}
   815  
   816  	// Return a meta-proxier that dispatch calls between the two
   817  	// single-stack proxier instances
   818  	return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
   819  }
   820  
   821  // CleanupLeftovers removes all hns rules created by the Proxier
   822  // It returns true if an error was encountered. Errors are logged.
   823  func CleanupLeftovers() (encounteredError bool) {
   824  	// Delete all Hns Load Balancer Policies
   825  	newHcnImpl().DeleteAllHnsLoadBalancerPolicy()
   826  	// TODO
   827  	// Delete all Hns Remote endpoints
   828  
   829  	return encounteredError
   830  }
   831  
   832  func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) {
   833  	klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
   834  	// if it's an endpoint change and winProxyOptimization annotation enable, skip lb deletion and remoteEndpoint deletion
   835  	winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization
   836  	if winProxyOptimization {
   837  		klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange)
   838  	} else {
   839  		// Skip the svcInfo.policyApplied check to remove all the policies
   840  		svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers)
   841  	}
   842  	// Cleanup Endpoints references
   843  	for _, ep := range endpoints {
   844  		epInfo, ok := ep.(*endpointInfo)
   845  		if ok {
   846  			if winProxyOptimization {
   847  				epInfo.DecrementRefCount()
   848  			} else {
   849  				epInfo.Cleanup()
   850  			}
   851  		}
   852  	}
   853  	if svcInfo.remoteEndpoint != nil {
   854  		svcInfo.remoteEndpoint.Cleanup()
   855  	}
   856  
   857  	svcInfo.policyApplied = false
   858  }
   859  
   860  func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) {
   861  	// Remove the Hns Policy corresponding to this service
   862  	hns := svcInfo.hns
   863  	if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
   864  		mapStaleLoadbalancer[svcInfo.hnsID] = true
   865  		klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
   866  	} else {
   867  		// On successful delete, remove hnsId
   868  		svcInfo.hnsID = ""
   869  	}
   870  
   871  	if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
   872  		mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true
   873  		klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
   874  	} else {
   875  		// On successful delete, remove hnsId
   876  		svcInfo.nodePorthnsID = ""
   877  	}
   878  
   879  	for _, externalIP := range svcInfo.externalIPs {
   880  		mapStaleLoadbalancer[externalIP.hnsID] = true
   881  		if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
   882  			klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
   883  		} else {
   884  			// On successful delete, remove hnsId
   885  			externalIP.hnsID = ""
   886  		}
   887  	}
   888  	for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
   889  		klog.V(3).InfoS("Loadbalancer Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP)
   890  		if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
   891  			mapStaleLoadbalancer[lbIngressIP.hnsID] = true
   892  			klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
   893  		} else {
   894  			// On successful delete, remove hnsId
   895  			lbIngressIP.hnsID = ""
   896  		}
   897  
   898  		if lbIngressIP.healthCheckHnsID != "" {
   899  			if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
   900  				mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true
   901  				klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
   902  			} else {
   903  				// On successful delete, remove hnsId
   904  				lbIngressIP.healthCheckHnsID = ""
   905  			}
   906  		}
   907  	}
   908  }
   909  
   910  // Sync is called to synchronize the proxier state to hns as soon as possible.
   911  func (proxier *Proxier) Sync() {
   912  	if proxier.healthzServer != nil {
   913  		proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
   914  	}
   915  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   916  	proxier.syncRunner.Run()
   917  }
   918  
   919  // SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
   920  func (proxier *Proxier) SyncLoop() {
   921  	// Update healthz timestamp at beginning in case Sync() never succeeds.
   922  	if proxier.healthzServer != nil {
   923  		proxier.healthzServer.Updated(proxier.ipFamily)
   924  	}
   925  	// synthesize "last change queued" time as the informers are syncing.
   926  	metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
   927  	proxier.syncRunner.Loop(wait.NeverStop)
   928  }
   929  
   930  func (proxier *Proxier) setInitialized(value bool) {
   931  	var initialized int32
   932  	if value {
   933  		initialized = 1
   934  	}
   935  	atomic.StoreInt32(&proxier.initialized, initialized)
   936  }
   937  
   938  func (proxier *Proxier) isInitialized() bool {
   939  	return atomic.LoadInt32(&proxier.initialized) > 0
   940  }
   941  
   942  // OnServiceAdd is called whenever creation of new service object
   943  // is observed.
   944  func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
   945  	proxier.OnServiceUpdate(nil, service)
   946  }
   947  
   948  // OnServiceUpdate is called whenever modification of an existing
   949  // service object is observed.
   950  func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
   951  	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
   952  		proxier.Sync()
   953  	}
   954  }
   955  
   956  // OnServiceDelete is called whenever deletion of an existing service
   957  // object is observed.
   958  func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
   959  	proxier.OnServiceUpdate(service, nil)
   960  }
   961  
   962  // OnServiceSynced is called once all the initial event handlers were
   963  // called and the state is fully propagated to local cache.
   964  func (proxier *Proxier) OnServiceSynced() {
   965  	proxier.mu.Lock()
   966  	proxier.servicesSynced = true
   967  	proxier.setInitialized(proxier.endpointSlicesSynced)
   968  	proxier.mu.Unlock()
   969  
   970  	// Sync unconditionally - this is called once per lifetime.
   971  	proxier.syncProxyRules()
   972  }
   973  
   974  // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
   975  // is observed.
   976  func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
   977  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   978  		proxier.Sync()
   979  	}
   980  }
   981  
   982  // OnEndpointSliceUpdate is called whenever modification of an existing endpoint
   983  // slice object is observed.
   984  func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
   985  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
   986  		proxier.Sync()
   987  	}
   988  }
   989  
   990  // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
   991  // object is observed.
   992  func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
   993  	if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
   994  		proxier.Sync()
   995  	}
   996  }
   997  
   998  // OnEndpointSlicesSynced is called once all the initial event handlers were
   999  // called and the state is fully propagated to local cache.
  1000  func (proxier *Proxier) OnEndpointSlicesSynced() {
  1001  	proxier.mu.Lock()
  1002  	proxier.endpointSlicesSynced = true
  1003  	proxier.setInitialized(proxier.servicesSynced)
  1004  	proxier.mu.Unlock()
  1005  
  1006  	// Sync unconditionally - this is called once per lifetime.
  1007  	proxier.syncProxyRules()
  1008  }
  1009  
  1010  // OnServiceCIDRsChanged is called whenever a change is observed
  1011  // in any of the ServiceCIDRs, and provides complete list of service cidrs.
  1012  func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
  1013  
  1014  func (proxier *Proxier) cleanupAllPolicies() {
  1015  	for svcName, svc := range proxier.svcPortMap {
  1016  		svcInfo, ok := svc.(*serviceInfo)
  1017  		if !ok {
  1018  			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
  1019  			continue
  1020  		}
  1021  		svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false)
  1022  	}
  1023  }
  1024  
  1025  func isNetworkNotFoundError(err error) bool {
  1026  	if err == nil {
  1027  		return false
  1028  	}
  1029  	if _, ok := err.(hcn.NetworkNotFoundError); ok {
  1030  		return true
  1031  	}
  1032  	if _, ok := err.(hcsshim.NetworkNotFoundError); ok {
  1033  		return true
  1034  	}
  1035  	return false
  1036  }
  1037  
  1038  // isAllEndpointsTerminating function will return true if all the endpoints are terminating.
  1039  // If atleast one is not terminating, then return false
  1040  func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
  1041  	for _, epInfo := range proxier.endpointsMap[svcName] {
  1042  		ep, ok := epInfo.(*endpointInfo)
  1043  		if !ok {
  1044  			continue
  1045  		}
  1046  		if isLocalTrafficDSR && !ep.IsLocal() {
  1047  			// KEP-1669: Ignore remote endpoints when the ExternalTrafficPolicy is Local (DSR Mode)
  1048  			continue
  1049  		}
  1050  		// If Readiness Probe fails and pod is not under delete, then
  1051  		// the state of the endpoint will be - Ready:False, Serving:False, Terminating:False
  1052  		if !ep.IsReady() && !ep.IsTerminating() {
  1053  			// Ready:false, Terminating:False, ignore
  1054  			continue
  1055  		}
  1056  		if !ep.IsTerminating() {
  1057  			return false
  1058  		}
  1059  	}
  1060  	return true
  1061  }
  1062  
  1063  // isAllEndpointsNonServing function will return true if all the endpoints are non serving.
  1064  // If atleast one is serving, then return false
  1065  func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
  1066  	for _, epInfo := range proxier.endpointsMap[svcName] {
  1067  		ep, ok := epInfo.(*endpointInfo)
  1068  		if !ok {
  1069  			continue
  1070  		}
  1071  		if isLocalTrafficDSR && !ep.IsLocal() {
  1072  			continue
  1073  		}
  1074  		if ep.IsServing() {
  1075  			return false
  1076  		}
  1077  	}
  1078  	return true
  1079  }
  1080  
  1081  // updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details
  1082  func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) {
  1083  	// store newly created endpoints in queriedEndpoints
  1084  	queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
  1085  	queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
  1086  }
  1087  
// This is where all of the hns save/restore calls happen.
// It acquires proxier.mu itself, so callers must not hold the lock.
  1090  func (proxier *Proxier) syncProxyRules() {
  1091  	proxier.mu.Lock()
  1092  	defer proxier.mu.Unlock()
  1093  
  1094  	// don't sync rules till we've received services and endpoints
  1095  	if !proxier.isInitialized() {
  1096  		klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
  1097  		return
  1098  	}
  1099  
  1100  	// Keep track of how long syncs take.
  1101  	start := time.Now()
  1102  	defer func() {
  1103  		metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
  1104  		klog.V(4).InfoS("Syncing proxy rules complete", "elapsed", time.Since(start))
  1105  	}()
  1106  
  1107  	hnsNetworkName := proxier.network.name
  1108  	hns := proxier.hns
  1109  
  1110  	var gatewayHnsendpoint *endpointInfo
  1111  	if proxier.forwardHealthCheckVip {
  1112  		gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
  1113  	}
  1114  
  1115  	prevNetworkID := proxier.network.id
  1116  	updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
  1117  	if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
  1118  		klog.InfoS("The HNS network is not present or has changed since the last sync, please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
  1119  		proxier.cleanupAllPolicies()
  1120  		if updatedNetwork != nil {
  1121  			proxier.network = *updatedNetwork
  1122  		}
  1123  		return
  1124  	}
  1125  
  1126  	// We assume that if this was called, we really want to sync them,
  1127  	// even if nothing changed in the meantime. In other words, callers are
  1128  	// responsible for detecting no-op changes and not calling this function.
  1129  	serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
  1130  	endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
  1131  
  1132  	deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
  1133  	// merge stale services gathered from EndpointsMap.Update
  1134  	for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
  1135  		if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
  1136  			klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
  1137  			deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
  1138  		}
  1139  	}
  1140  	// Query HNS for endpoints and load balancers
  1141  	queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
  1142  	if err != nil {
  1143  		klog.ErrorS(err, "Querying HNS for endpoints failed")
  1144  		return
  1145  	}
  1146  	if queriedEndpoints == nil {
  1147  		klog.V(4).InfoS("No existing endpoints found in HNS")
  1148  		queriedEndpoints = make(map[string]*(endpointInfo))
  1149  	}
  1150  	queriedLoadBalancers, err := hns.getAllLoadBalancers()
  1151  	if queriedLoadBalancers == nil {
  1152  		klog.V(4).InfoS("No existing load balancers found in HNS")
  1153  		queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
  1154  	}
  1155  	if err != nil {
  1156  		klog.ErrorS(err, "Querying HNS for load balancers failed")
  1157  		return
  1158  	}
  1159  	if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
  1160  		if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
  1161  			_, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
  1162  			if err != nil {
  1163  				klog.ErrorS(err, "Source Vip endpoint creation failed")
  1164  				return
  1165  			}
  1166  		}
  1167  	}
  1168  
  1169  	klog.V(3).InfoS("Syncing Policies")
  1170  
  1171  	// Program HNS by adding corresponding policies for each service.
  1172  	for svcName, svc := range proxier.svcPortMap {
  1173  		svcInfo, ok := svc.(*serviceInfo)
  1174  		if !ok {
  1175  			klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
  1176  			continue
  1177  		}
  1178  
  1179  		if svcInfo.policyApplied {
  1180  			klog.V(4).InfoS("Policy already applied", "serviceInfo", svcInfo)
  1181  			continue
  1182  		}
  1183  
  1184  		if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
  1185  			serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
  1186  			if serviceVipEndpoint == nil {
  1187  				klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
  1188  				hnsEndpoint := &endpointInfo{
  1189  					ip:              svcInfo.ClusterIP().String(),
  1190  					isLocal:         false,
  1191  					macAddress:      proxier.hostMac,
  1192  					providerAddress: proxier.nodeIP.String(),
  1193  				}
  1194  
  1195  				newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1196  				if err != nil {
  1197  					klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
  1198  					continue
  1199  				}
  1200  
  1201  				newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
  1202  				*newHnsEndpoint.refCount++
  1203  				svcInfo.remoteEndpoint = newHnsEndpoint
  1204  				updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
  1205  			}
  1206  		}
  1207  
  1208  		var hnsEndpoints []endpointInfo
  1209  		var hnsLocalEndpoints []endpointInfo
  1210  		klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
  1211  		// Create Remote endpoints for every endpoint, corresponding to the service
  1212  		containsPublicIP := false
  1213  		containsNodeIP := false
  1214  		var allEndpointsTerminating, allEndpointsNonServing bool
  1215  		someEndpointsServing := true
  1216  
  1217  		if len(svcInfo.loadBalancerIngressIPs) > 0 {
  1218  			// Check should be done only if comes under the feature gate or enabled
  1219  			// The check should be done only if Spec.Type == Loadbalancer.
  1220  			allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
  1221  			allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
  1222  			someEndpointsServing = !allEndpointsNonServing
  1223  			klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
  1224  		} else {
  1225  			klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
  1226  		}
  1227  
  1228  		for _, epInfo := range proxier.endpointsMap[svcName] {
  1229  			ep, ok := epInfo.(*endpointInfo)
  1230  			if !ok {
  1231  				klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
  1232  				continue
  1233  			}
  1234  
  1235  			if svcInfo.internalTrafficLocal && svcInfo.localTrafficDSR && !ep.IsLocal() {
  1236  				// No need to use or create remote endpoint when internal and external traffic policy is remote
  1237  				klog.V(3).InfoS("Skipping the endpoint. Both internalTraffic and external traffic policies are local", "EpIP", ep.ip, " EpPort", ep.port)
  1238  				continue
  1239  			}
  1240  
  1241  			if someEndpointsServing {
  1242  
  1243  				if !allEndpointsTerminating && !ep.IsReady() {
  1244  					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or all not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
  1245  					continue
  1246  				}
  1247  				if !ep.IsServing() {
  1248  					klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing())
  1249  					continue
  1250  				}
  1251  
  1252  			}
  1253  
  1254  			var newHnsEndpoint *endpointInfo
  1255  			hnsNetworkName := proxier.network.name
  1256  			var err error
  1257  
  1258  			// targetPort is zero if it is specified as a name in port.TargetPort, so the real port should be got from endpoints.
  1259  			// Note that hcsshim.AddLoadBalancer() doesn't support endpoints with different ports, so only port from first endpoint is used.
  1260  			// TODO(feiskyer): add support of different endpoint ports after hcsshim.AddLoadBalancer() add that.
  1261  			if svcInfo.targetPort == 0 {
  1262  				svcInfo.targetPort = int(ep.port)
  1263  			}
  1264  			// There is a bug in Windows Server 2019 that can cause two endpoints to be created with the same IP address, so we need to check using endpoint ID first.
  1265  			// TODO: Remove lookup by endpoint ID, and use the IP address only, so we don't need to maintain multiple keys for lookup.
  1266  			if len(ep.hnsID) > 0 {
  1267  				newHnsEndpoint = queriedEndpoints[ep.hnsID]
  1268  			}
  1269  
  1270  			if newHnsEndpoint == nil {
  1271  				// First check if an endpoint resource exists for this IP, on the current host
  1272  				// A Local endpoint could exist here already
  1273  				// A remote endpoint was already created and proxy was restarted
  1274  				newHnsEndpoint = queriedEndpoints[ep.IP()]
  1275  			}
  1276  
  1277  			if newHnsEndpoint == nil {
  1278  				if ep.IsLocal() {
  1279  					klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
  1280  					continue
  1281  				}
  1282  
  1283  				if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
  1284  					klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
  1285  					networkName := proxier.network.name
  1286  					updatedNetwork, err := hns.getNetworkByName(networkName)
  1287  					if err != nil {
  1288  						klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
  1289  						proxier.cleanupAllPolicies()
  1290  						return
  1291  					}
  1292  					proxier.network = *updatedNetwork
  1293  					providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
  1294  					if len(providerAddress) == 0 {
  1295  						klog.InfoS("Could not find provider address, assuming it is a public IP", "IP", ep.IP())
  1296  						providerAddress = proxier.nodeIP.String()
  1297  					}
  1298  
  1299  					hnsEndpoint := &endpointInfo{
  1300  						ip:              ep.ip,
  1301  						isLocal:         false,
  1302  						macAddress:      conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
  1303  						providerAddress: providerAddress,
  1304  					}
  1305  
  1306  					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1307  					if err != nil {
  1308  						klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
  1309  						continue
  1310  					}
  1311  					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
  1312  				} else {
  1313  
  1314  					hnsEndpoint := &endpointInfo{
  1315  						ip:         ep.ip,
  1316  						isLocal:    false,
  1317  						macAddress: ep.macAddress,
  1318  					}
  1319  
  1320  					newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
  1321  					if err != nil {
  1322  						klog.ErrorS(err, "Remote endpoint creation failed")
  1323  						continue
  1324  					}
  1325  					updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
  1326  				}
  1327  			}
  1328  			// For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as
  1329  			// a) Source VIP configured on kube-proxy (or)
  1330  			// b) Node IP of the current node
  1331  			//
  1332  			// For L2Bridge network the Source VIP is always the NodeIP of the current node and the same
  1333  			// would be configured on kube-proxy as SourceVIP
  1334  			//
  1335  			// The logic for choosing the SourceVIP in Overlay networks is based on the backend endpoints:
  1336  			// a) Endpoints are any IP's outside the cluster ==> Choose NodeIP as the SourceVIP
  1337  			// b) Endpoints are IP addresses of a remote node => Choose NodeIP as the SourceVIP
  1338  			// c) Everything else (Local POD's, Remote POD's, Node IP of current node) ==> Choose the configured SourceVIP
  1339  			if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) && !ep.IsLocal() {
  1340  				providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
  1341  
  1342  				isNodeIP := (ep.IP() == providerAddress)
  1343  				isPublicIP := (len(providerAddress) == 0)
  1344  				klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)
  1345  
  1346  				containsNodeIP = containsNodeIP || isNodeIP
  1347  				containsPublicIP = containsPublicIP || isPublicIP
  1348  			}
  1349  
  1350  			// Save the hnsId for reference
  1351  			klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)
  1352  
  1353  			hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
  1354  			if newHnsEndpoint.IsLocal() {
  1355  				hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
  1356  			} else {
  1357  				// We only share the refCounts for remote endpoints
  1358  				ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
  1359  				*ep.refCount++
  1360  			}
  1361  
  1362  			ep.hnsID = newHnsEndpoint.hnsID
  1363  
  1364  			klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
  1365  		}
  1366  
  1367  		klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)
  1368  
  1369  		if len(svcInfo.hnsID) > 0 {
  1370  			// This should not happen
  1371  			klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
  1372  		}
  1373  
  1374  		// In ETP:Cluster, if all endpoints are under termination,
  1375  		// it will have serving and terminating, else only ready and serving
  1376  		if len(hnsEndpoints) == 0 {
  1377  			if svcInfo.winProxyOptimization {
  1378  				// Deleting loadbalancers when there are no endpoints to serve.
  1379  				klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName)
  1380  				svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
  1381  			}
  1382  			klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
  1383  			continue
  1384  		}
  1385  
  1386  		klog.V(4).InfoS("Trying to apply Policies for service", "serviceInfo", svcInfo)
  1387  		var hnsLoadBalancer *loadBalancerInfo
  1388  		var sourceVip = proxier.sourceVip
  1389  		if containsPublicIP || containsNodeIP {
  1390  			sourceVip = proxier.nodeIP.String()
  1391  		}
  1392  
  1393  		sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
  1394  		if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
  1395  			klog.InfoS("Session Affinity is not supported on this version of Windows")
  1396  		}
  1397  
  1398  		endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
  1399  		proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
  1400  
  1401  		// clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer.
  1402  		clusterIPEndpoints := hnsEndpoints
  1403  		if svcInfo.internalTrafficLocal {
  1404  			// Take local endpoints for clusterip loadbalancer when internal traffic policy is local.
  1405  			clusterIPEndpoints = hnsLocalEndpoints
  1406  		}
  1407  
  1408  		if len(clusterIPEndpoints) > 0 {
  1409  
  1410  			// If all endpoints are terminating, then no need to create Cluster IP LoadBalancer
  1411  			// Cluster IP LoadBalancer creation
  1412  			hnsLoadBalancer, err := hns.getLoadBalancer(
  1413  				clusterIPEndpoints,
  1414  				loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
  1415  				sourceVip,
  1416  				svcInfo.ClusterIP().String(),
  1417  				Enum(svcInfo.Protocol()),
  1418  				uint16(svcInfo.targetPort),
  1419  				uint16(svcInfo.Port()),
  1420  				queriedLoadBalancers,
  1421  			)
  1422  			if err != nil {
  1423  				klog.ErrorS(err, "Policy creation failed")
  1424  				continue
  1425  			}
  1426  
  1427  			svcInfo.hnsID = hnsLoadBalancer.hnsID
  1428  			klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
  1429  
  1430  		} else {
  1431  			klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
  1432  		}
  1433  
  1434  		// If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints
  1435  		if svcInfo.NodePort() > 0 {
  1436  			// If the preserve-destination service annotation is present, we will disable routing mesh for NodePort.
  1437  			// This means that health services can use Node Port without falsely getting results from a different node.
  1438  			nodePortEndpoints := hnsEndpoints
  1439  			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
  1440  				nodePortEndpoints = hnsLocalEndpoints
  1441  			}
  1442  
  1443  			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
  1444  
  1445  			if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
  1446  				// If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer
  1447  				hnsLoadBalancer, err := hns.getLoadBalancer(
  1448  					nodePortEndpoints,
  1449  					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
  1450  					sourceVip,
  1451  					"",
  1452  					Enum(svcInfo.Protocol()),
  1453  					uint16(svcInfo.targetPort),
  1454  					uint16(svcInfo.NodePort()),
  1455  					queriedLoadBalancers,
  1456  				)
  1457  				if err != nil {
  1458  					klog.ErrorS(err, "Policy creation failed")
  1459  					continue
  1460  				}
  1461  
  1462  				svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
  1463  				klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
  1464  			} else {
  1465  				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
  1466  			}
  1467  		}
  1468  
  1469  		// Create a Load Balancer Policy for each external IP
  1470  		for _, externalIP := range svcInfo.externalIPs {
  1471  			// Disable routing mesh if ExternalTrafficPolicy is set to local
  1472  			externalIPEndpoints := hnsEndpoints
  1473  			if svcInfo.localTrafficDSR {
  1474  				externalIPEndpoints = hnsLocalEndpoints
  1475  			}
  1476  
  1477  			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
  1478  
  1479  			if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
  1480  				// If all endpoints are in terminating stage, then no need to External IP LoadBalancer
  1481  				// Try loading existing policies, if already available
  1482  				hnsLoadBalancer, err = hns.getLoadBalancer(
  1483  					externalIPEndpoints,
  1484  					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
  1485  					sourceVip,
  1486  					externalIP.ip,
  1487  					Enum(svcInfo.Protocol()),
  1488  					uint16(svcInfo.targetPort),
  1489  					uint16(svcInfo.Port()),
  1490  					queriedLoadBalancers,
  1491  				)
  1492  				if err != nil {
  1493  					klog.ErrorS(err, "Policy creation failed")
  1494  					continue
  1495  				}
  1496  				externalIP.hnsID = hnsLoadBalancer.hnsID
  1497  				klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
  1498  			} else {
  1499  				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
  1500  			}
  1501  		}
  1502  		// Create a Load Balancer Policy for each loadbalancer ingress
  1503  		for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
  1504  			// Try loading existing policies, if already available
  1505  			lbIngressEndpoints := hnsEndpoints
  1506  			if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
  1507  				lbIngressEndpoints = hnsLocalEndpoints
  1508  			}
  1509  
  1510  			proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
  1511  
  1512  			if len(lbIngressEndpoints) > 0 {
  1513  				hnsLoadBalancer, err := hns.getLoadBalancer(
  1514  					lbIngressEndpoints,
  1515  					loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
  1516  					sourceVip,
  1517  					lbIngressIP.ip,
  1518  					Enum(svcInfo.Protocol()),
  1519  					uint16(svcInfo.targetPort),
  1520  					uint16(svcInfo.Port()),
  1521  					queriedLoadBalancers,
  1522  				)
  1523  				if err != nil {
  1524  					klog.ErrorS(err, "Policy creation failed")
  1525  					continue
  1526  				}
  1527  				lbIngressIP.hnsID = hnsLoadBalancer.hnsID
  1528  				klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
  1529  			} else {
  1530  				klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
  1531  			}
  1532  
  1533  			if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
  1534  				// Avoid creating health check loadbalancer if all the endpoints are terminating
  1535  				nodeport := proxier.healthzPort
  1536  				if svcInfo.HealthCheckNodePort() != 0 {
  1537  					nodeport = svcInfo.HealthCheckNodePort()
  1538  				}
  1539  
  1540  				proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
  1541  
  1542  				hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
  1543  					[]endpointInfo{*gatewayHnsendpoint},
  1544  					loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
  1545  					sourceVip,
  1546  					lbIngressIP.ip,
  1547  					Enum(svcInfo.Protocol()),
  1548  					uint16(nodeport),
  1549  					uint16(nodeport),
  1550  					queriedLoadBalancers,
  1551  				)
  1552  				if err != nil {
  1553  					klog.ErrorS(err, "Policy creation failed")
  1554  					continue
  1555  				}
  1556  				lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
  1557  				klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
  1558  			} else {
  1559  				klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
  1560  			}
  1561  		}
  1562  		svcInfo.policyApplied = true
  1563  		klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)
  1564  	}
  1565  
  1566  	if proxier.healthzServer != nil {
  1567  		proxier.healthzServer.Updated(proxier.ipFamily)
  1568  	}
  1569  	metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
  1570  
  1571  	// Update service healthchecks.  The endpoints list might include services that are
  1572  	// not "OnlyLocal", but the services list will not, and the serviceHealthServer
  1573  	// will just drop those endpoints.
  1574  	if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
  1575  		klog.ErrorS(err, "Error syncing healthcheck services")
  1576  	}
  1577  	if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
  1578  		klog.ErrorS(err, "Error syncing healthcheck endpoints")
  1579  	}
  1580  
  1581  	// Finish housekeeping.
  1582  	// TODO: these could be made more consistent.
  1583  	for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
  1584  		// TODO : Check if this is required to cleanup stale services here
  1585  		klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
  1586  	}
  1587  
  1588  	// remove stale endpoint refcount entries
  1589  	for hnsID, referenceCount := range proxier.endPointsRefCount {
  1590  		if *referenceCount <= 0 {
  1591  			klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
  1592  			proxier.hns.deleteEndpoint(hnsID)
  1593  			delete(proxier.endPointsRefCount, hnsID)
  1594  		}
  1595  	}
  1596  	// This will cleanup stale load balancers which are pending delete
  1597  	// in last iteration
  1598  	proxier.cleanupStaleLoadbalancers()
  1599  }
  1600  
  1601  // deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not.
  1602  // If it is needed, the function will delete the existing loadbalancer and return true, else false.
  1603  func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
  1604  
  1605  	if !winProxyOptimization || *lbHnsID == "" {
  1606  		// Loadbalancer delete not needed
  1607  		return false
  1608  	}
  1609  
  1610  	lbID, lbIdErr := findLoadBalancerID(
  1611  		endpoints,
  1612  		sourceVip,
  1613  		protocol,
  1614  		intPort,
  1615  		extPort,
  1616  	)
  1617  
  1618  	if lbIdErr != nil {
  1619  		return proxier.deleteLoadBalancer(hns, lbHnsID)
  1620  	}
  1621  
  1622  	if _, ok := queriedLoadBalancers[lbID]; ok {
  1623  		// The existing loadbalancer in the system is same as what we try to delete and recreate. So we skip deleting.
  1624  		return false
  1625  	}
  1626  
  1627  	return proxier.deleteLoadBalancer(hns, lbHnsID)
  1628  }
  1629  
  1630  func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
  1631  	klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
  1632  	if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
  1633  		// This will be cleanup by cleanupStaleLoadbalancer fnction.
  1634  		proxier.mapStaleLoadbalancers[*lbHnsID] = true
  1635  	}
  1636  	*lbHnsID = ""
  1637  	return true
  1638  }
  1639  

View as plain text