1
2
3
4
19
20 package winkernel
21
22 import (
23 "fmt"
24 "net"
25 "os"
26 "strconv"
27 "strings"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "github.com/Microsoft/hcsshim"
33 "github.com/Microsoft/hcsshim/hcn"
34 v1 "k8s.io/api/core/v1"
35 discovery "k8s.io/api/discovery/v1"
36 "k8s.io/apimachinery/pkg/util/intstr"
37 apiutil "k8s.io/apimachinery/pkg/util/net"
38 "k8s.io/apimachinery/pkg/util/sets"
39 "k8s.io/apimachinery/pkg/util/wait"
40 utilfeature "k8s.io/apiserver/pkg/util/feature"
41 "k8s.io/client-go/tools/events"
42 "k8s.io/klog/v2"
43 kubefeatures "k8s.io/kubernetes/pkg/features"
44 "k8s.io/kubernetes/pkg/proxy"
45 "k8s.io/kubernetes/pkg/proxy/apis/config"
46 proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
47 "k8s.io/kubernetes/pkg/proxy/healthcheck"
48 "k8s.io/kubernetes/pkg/proxy/metaproxier"
49 "k8s.io/kubernetes/pkg/proxy/metrics"
50 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
51 "k8s.io/kubernetes/pkg/util/async"
52 netutils "k8s.io/utils/net"
53 )
54
55
56
// KernelCompatTester abstracts the probe that decides whether the running
// kernel can be driven by this proxier.
type KernelCompatTester interface {
	// IsCompatible returns nil when the kernel is usable and a non-nil
	// error describing the problem otherwise.
	IsCompatible() error
}
60
61
62
63
64 func CanUseWinKernelProxier(kcompat KernelCompatTester) (bool, error) {
65
66 if err := kcompat.IsCompatible(); err != nil {
67 return false, err
68 }
69 return true, nil
70 }
71
72 type WindowsKernelCompatTester struct{}
73
74
75 func (lkct WindowsKernelCompatTester) IsCompatible() error {
76 _, err := hcsshim.HNSListPolicyListRequest()
77 if err != nil {
78 return fmt.Errorf("Windows kernel is not compatible for Kernel mode")
79 }
80 return nil
81 }
82
// externalIPInfo pairs one external IP of a service with the ID of the HNS
// load balancer tracked for it (empty when none is recorded).
type externalIPInfo struct {
	ip, hnsID string
}
87
// loadBalancerIngressInfo describes one load-balancer ingress IP of a
// service together with the IDs of the HNS load balancers tracked for it:
// the regular one and, when non-empty, a separate health-check one.
type loadBalancerIngressInfo struct {
	ip, hnsID, healthCheckHnsID string
}
93
// loadBalancerInfo holds the HNS identifier of a load balancer.
type loadBalancerInfo struct {
	// hnsID is the identifier of the load balancer in HNS.
	hnsID string
}
97
// loadBalancerIdentifier is a comparable key describing a load balancer by
// protocol, ports, VIP and a 20-byte hash of its endpoints. It is used as a
// map key for load balancers queried from HNS.
type loadBalancerIdentifier struct {
	protocol, internalPort, externalPort uint16
	vip                                  string
	endpointsHash                        [20]byte
}
105
// loadBalancerFlags is the set of boolean options attached to a load
// balancer. NOTE(review): the exact HNS meaning of each flag is not visible
// in this part of the file — confirm against the code that consumes them.
type loadBalancerFlags struct {
	isILB, isDSR                    bool
	isVipExternalIP, localRoutedVIP bool
	useMUX, preserveDIP             bool
	sessionAffinity, isIPv6         bool
}
116
117
118 type serviceInfo struct {
119 *proxy.BaseServicePortInfo
120 targetPort int
121 externalIPs []*externalIPInfo
122 loadBalancerIngressIPs []*loadBalancerIngressInfo
123 hnsID string
124 nodePorthnsID string
125 policyApplied bool
126 remoteEndpoint *endpointInfo
127 hns HostNetworkService
128 preserveDIP bool
129 localTrafficDSR bool
130 internalTrafficLocal bool
131 winProxyOptimization bool
132 }
133
134 type hnsNetworkInfo struct {
135 name string
136 id string
137 networkType string
138 remoteSubnets []*remoteSubnetInfo
139 }
140
// remoteSubnetInfo describes one remote subnet of an HNS network.
type remoteSubnetInfo struct {
	// destinationPrefix is the subnet in CIDR notation.
	destinationPrefix string
	isolationID       uint16
	// providerAddress is the address reported for IPs that fall inside
	// destinationPrefix.
	providerAddress string
	drMacAddress    string
}
147
const (
	// NETWORK_TYPE_OVERLAY is the HNS network type string of overlay
	// networks; comparisons against it are case-insensitive.
	NETWORK_TYPE_OVERLAY = "overlay"

	// MAX_COUNT_STALE_LOADBALANCERS caps how many stale load balancers a
	// single cleanup pass attempts to delete.
	MAX_COUNT_STALE_LOADBALANCERS = 20
)
154
155 func newHostNetworkService(hcnImpl HcnService) (HostNetworkService, hcn.SupportedFeatures) {
156 var h HostNetworkService
157 supportedFeatures := hcnImpl.GetSupportedFeatures()
158 if supportedFeatures.Api.V2 {
159 h = hns{
160 hcn: hcnImpl,
161 }
162 } else {
163 panic("Windows HNS Api V2 required. This version of windows does not support API V2")
164 }
165 return h, supportedFeatures
166 }
167
168
169
170 func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) {
171 if klog.V(logLevel).Enabled() {
172 var epInfo string
173 for _, v := range eps {
174 epInfo = epInfo + fmt.Sprintf("\n %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.IsLocal())
175 }
176 klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo)
177 }
178 }
179
180
181
182
183 func (proxier *Proxier) cleanupStaleLoadbalancers() {
184 i := 0
185 countStaleLB := len(proxier.mapStaleLoadbalancers)
186 if countStaleLB == 0 {
187 return
188 }
189 klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LB Count", countStaleLB)
190 for lbID := range proxier.mapStaleLoadbalancers {
191 i++
192 if err := proxier.hns.deleteLoadBalancer(lbID); err == nil {
193 delete(proxier.mapStaleLoadbalancers, lbID)
194 }
195 if i == MAX_COUNT_STALE_LOADBALANCERS {
196
197 break
198 }
199 }
200 countStaleLB = len(proxier.mapStaleLoadbalancers)
201 if countStaleLB > 0 {
202 klog.V(3).InfoS("Stale loadbalancers still remaining", "LB Count", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers)
203 }
204 }
205
206 func getNetworkName(hnsNetworkName string) (string, error) {
207 if len(hnsNetworkName) == 0 {
208 klog.V(3).InfoS("Flag --network-name not set, checking environment variable")
209 hnsNetworkName = os.Getenv("KUBE_NETWORK")
210 if len(hnsNetworkName) == 0 {
211 return "", fmt.Errorf("Environment variable KUBE_NETWORK and network-flag not initialized")
212 }
213 }
214 return hnsNetworkName, nil
215 }
216
217 func getNetworkInfo(hns HostNetworkService, hnsNetworkName string) (*hnsNetworkInfo, error) {
218 hnsNetworkInfo, err := hns.getNetworkByName(hnsNetworkName)
219 for err != nil {
220 klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
221 time.Sleep(1 * time.Second)
222 hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
223 }
224 return hnsNetworkInfo, err
225 }
226
227 func isOverlay(hnsNetworkInfo *hnsNetworkInfo) bool {
228 return strings.EqualFold(hnsNetworkInfo.networkType, NETWORK_TYPE_OVERLAY)
229 }
230
231
// StackCompatTester abstracts the check for dual-stack support.
type StackCompatTester interface {
	// DualStackCompatible reports whether dual-stack can be used with the
	// given network name.
	DualStackCompatible(networkName string) bool
}
235
236 type DualStackCompatTester struct{}
237
238 func (t DualStackCompatTester) DualStackCompatible(networkName string) bool {
239 hcnImpl := newHcnImpl()
240
241 if err := hcnImpl.Ipv6DualStackSupported(); err != nil {
242
243
244
245
246
247
248
249 klog.InfoS("This version of Windows does not support dual-stack, falling back to single-stack", "err", err.Error())
250 return false
251 }
252
253
254 hns, _ := newHostNetworkService(hcnImpl)
255 networkName, err := getNetworkName(networkName)
256 if err != nil {
257 klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
258 return false
259 }
260 networkInfo, err := getNetworkInfo(hns, networkName)
261 if err != nil {
262 klog.ErrorS(err, "Unable to determine dual-stack status, falling back to single-stack")
263 return false
264 }
265
266 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) && isOverlay(networkInfo) {
267
268 klog.InfoS("Winoverlay does not support dual-stack, falling back to single-stack")
269 return false
270 }
271
272 return true
273 }
274
275
276 type endpointInfo struct {
277 ip string
278 port uint16
279 isLocal bool
280 macAddress string
281 hnsID string
282 refCount *uint16
283 providerAddress string
284 hns HostNetworkService
285
286
287 ready bool
288 serving bool
289 terminating bool
290 }
291
292
293 func (info *endpointInfo) String() string {
294 return net.JoinHostPort(info.ip, strconv.Itoa(int(info.port)))
295 }
296
297
298 func (info *endpointInfo) IsLocal() bool {
299 return info.isLocal
300 }
301
302
303 func (info *endpointInfo) IsReady() bool {
304 return info.ready
305 }
306
307
308 func (info *endpointInfo) IsServing() bool {
309 return info.serving
310 }
311
312
313 func (info *endpointInfo) IsTerminating() bool {
314 return info.terminating
315 }
316
317
318 func (info *endpointInfo) ZoneHints() sets.Set[string] {
319 return sets.Set[string]{}
320 }
321
322
323 func (info *endpointInfo) IP() string {
324 return info.ip
325 }
326
327
328 func (info *endpointInfo) Port() int {
329 return int(info.port)
330 }
331
332
333
334
335
// conjureMac derives a MAC address string from macPrefix and ip. For IPv4
// the four address bytes are appended in order; for IPv6 the last four
// bytes are appended in reverse order (byte 15 first). When ip is neither a
// valid IPv4 nor IPv6 address a fixed fallback MAC is returned.
func conjureMac(macPrefix string, ip net.IP) string {
	const layout = "%v-%02x-%02x-%02x-%02x"

	if v4 := ip.To4(); v4 != nil {
		return fmt.Sprintf(layout, macPrefix, v4[0], v4[1], v4[2], v4[3])
	}
	if v6 := ip.To16(); v6 != nil {
		return fmt.Sprintf(layout, macPrefix, v6[15], v6[14], v6[13], v6[12])
	}
	return "02-11-22-33-44-55"
}
346
347 func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) {
348
349 var svcPortMap = make(map[proxy.ServicePortName]bool)
350 var logLevel klog.Level = 5
351 for svcPortName, eps := range oldEndpointsMap {
352 logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps)
353 svcPortMap[svcPortName] = true
354 proxier.onEndpointsMapChange(&svcPortName, false)
355 }
356
357 for svcPortName, eps := range newEndpointsMap {
358 logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps)
359
360 redundantCleanup := svcPortMap[svcPortName]
361 proxier.onEndpointsMapChange(&svcPortName, redundantCleanup)
362 }
363 }
364
365 func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) {
366
367 svc, exists := proxier.svcPortMap[*svcPortName]
368
369 if exists {
370 svcInfo, ok := svc.(*serviceInfo)
371
372 if !ok {
373 klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
374 return
375 }
376
377 if svcInfo.winProxyOptimization && redundantCleanup {
378
379
380
381 return
382 }
383
384 klog.V(3).InfoS("Endpoints are modified. Service is stale", "servicePortName", svcPortName)
385 svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true)
386 } else {
387
388 klog.V(3).InfoS("Endpoints are orphaned, cleaning up")
389
390 epInfos, exists := proxier.endpointsMap[*svcPortName]
391
392 if exists {
393
394 for _, ep := range epInfos {
395 epInfo, ok := ep.(*endpointInfo)
396
397 if ok {
398 epInfo.Cleanup()
399 }
400
401 }
402 }
403 }
404 }
405
406 func (proxier *Proxier) serviceMapChange(previous, current proxy.ServicePortMap) {
407 for svcPortName := range current {
408 proxier.onServiceMapChange(&svcPortName)
409 }
410
411 for svcPortName := range previous {
412 if _, ok := current[svcPortName]; ok {
413 continue
414 }
415 proxier.onServiceMapChange(&svcPortName)
416 }
417 }
418
419 func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
420
421 svc, exists := proxier.svcPortMap[*svcPortName]
422
423 if exists {
424 svcInfo, ok := svc.(*serviceInfo)
425
426 if !ok {
427 klog.ErrorS(nil, "Failed to cast serviceInfo", "servicePortName", svcPortName)
428 return
429 }
430
431 klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol())
432 svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false)
433 }
434 }
435
436
437 func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
438
439 info := &endpointInfo{
440 ip: baseInfo.IP(),
441 port: uint16(baseInfo.Port()),
442 isLocal: baseInfo.IsLocal(),
443 macAddress: conjureMac("02-11", netutils.ParseIPSloppy(baseInfo.IP())),
444 refCount: new(uint16),
445 hnsID: "",
446 hns: proxier.hns,
447
448 ready: baseInfo.IsReady(),
449 serving: baseInfo.IsServing(),
450 terminating: baseInfo.IsTerminating(),
451 }
452
453 return info
454 }
455
456 func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, providerAddress string) (*endpointInfo, error) {
457 hnsEndpoint := &endpointInfo{
458 ip: ip,
459 isLocal: true,
460 macAddress: mac,
461 providerAddress: providerAddress,
462
463 ready: true,
464 serving: true,
465 terminating: false,
466 }
467 ep, err := hns.createEndpoint(hnsEndpoint, network)
468 return ep, err
469 }
470
471 func (ep *endpointInfo) DecrementRefCount() {
472 klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointInfo", ep)
473 if !ep.IsLocal() && ep.refCount != nil && *ep.refCount > 0 {
474 *ep.refCount--
475 }
476 }
477
478 func (ep *endpointInfo) Cleanup() {
479 klog.V(3).InfoS("Endpoint cleanup", "endpointInfo", ep)
480 if !ep.IsLocal() && ep.refCount != nil {
481 *ep.refCount--
482
483
484
485
486 if *ep.refCount <= 0 && !ep.IsLocal() {
487 klog.V(4).InfoS("Removing endpoints, since no one is referencing it", "endpoint", ep)
488 err := ep.hns.deleteEndpoint(ep.hnsID)
489 if err == nil {
490 ep.hnsID = ""
491 } else {
492 klog.ErrorS(err, "Endpoint deletion failed", "ip", ep.IP())
493 }
494 }
495
496 ep.refCount = nil
497 }
498 }
499
500 func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 {
501 refCount, exists := refCountMap[hnsID]
502 if !exists {
503 refCountMap[hnsID] = new(uint16)
504 refCount = refCountMap[hnsID]
505 }
506 return refCount
507 }
508
509
510 func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort {
511 info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo}
512 preserveDIP := service.Annotations["preserve-destination"] == "true"
513
514 winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED")
515 localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyLocal
516 var internalTrafficLocal bool
517 if service.Spec.InternalTrafficPolicy != nil {
518 internalTrafficLocal = *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal
519 }
520 hcnImpl := proxier.hcn
521 err := hcnImpl.DsrSupported()
522 if err != nil {
523 preserveDIP = false
524 localTrafficDSR = false
525 }
526
527
528 targetPort := 0
529 if port.TargetPort.Type == intstr.Int {
530 targetPort = port.TargetPort.IntValue()
531 }
532
533 info.preserveDIP = preserveDIP
534 info.targetPort = targetPort
535 info.hns = proxier.hns
536 info.localTrafficDSR = localTrafficDSR
537 info.internalTrafficLocal = internalTrafficLocal
538 info.winProxyOptimization = winProxyOptimization
539 klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "internalTrafficLocal", internalTrafficLocal, "preserveDIP", preserveDIP, "winProxyOptimization", winProxyOptimization)
540
541 for _, eip := range service.Spec.ExternalIPs {
542 info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip})
543 }
544
545 for _, ingress := range service.Status.LoadBalancer.Ingress {
546 if netutils.ParseIPSloppy(ingress.IP) != nil {
547 info.loadBalancerIngressIPs = append(info.loadBalancerIngressIPs, &loadBalancerIngressInfo{ip: ingress.IP})
548 }
549 }
550 return info
551 }
552
553 func (network hnsNetworkInfo) findRemoteSubnetProviderAddress(ip string) string {
554 var providerAddress string
555 for _, rs := range network.remoteSubnets {
556 _, ipNet, err := netutils.ParseCIDRSloppy(rs.destinationPrefix)
557 if err != nil {
558 klog.ErrorS(err, "Failed to parse CIDR")
559 }
560 if ipNet.Contains(netutils.ParseIPSloppy(ip)) {
561 providerAddress = rs.providerAddress
562 }
563 if ip == rs.providerAddress {
564 providerAddress = rs.providerAddress
565 }
566 }
567
568 return providerAddress
569 }
570
// endPointsReferenceCountMap maps an HNS endpoint ID to its reference
// counter.
type endPointsReferenceCountMap map[string]*uint16
572
573
574
575 type Proxier struct {
576
577 ipFamily v1.IPFamily
578
579 proxyconfig.NoopNodeHandler
580
581
582
583
584
585 endpointsChanges *proxy.EndpointsChangeTracker
586 serviceChanges *proxy.ServiceChangeTracker
587 endPointsRefCount endPointsReferenceCountMap
588 mu sync.Mutex
589 svcPortMap proxy.ServicePortMap
590 endpointsMap proxy.EndpointsMap
591
592
593
594 endpointSlicesSynced bool
595 servicesSynced bool
596 initialized int32
597 syncRunner *async.BoundedFrequencyRunner
598
599 hostname string
600 nodeIP net.IP
601 recorder events.EventRecorder
602
603 serviceHealthServer healthcheck.ServiceHealthServer
604 healthzServer *healthcheck.ProxierHealthServer
605
606 hns HostNetworkService
607 hcn HcnService
608 network hnsNetworkInfo
609 sourceVip string
610 hostMac string
611 isDSR bool
612 supportedFeatures hcn.SupportedFeatures
613 healthzPort int
614
615 forwardHealthCheckVip bool
616 rootHnsEndpointName string
617 mapStaleLoadbalancers map[string]bool
618 }
619
// localPort describes a port on the local host: a free-form description,
// the IP, the port number and the protocol.
type localPort struct {
	desc, ip string
	port     int
	protocol string
}

// String renders the port as `"desc" (ip:port/protocol)`, with the
// description quoted.
func (lp *localPort) String() string {
	const layout = "%q (%s:%d/%s)"
	return fmt.Sprintf(layout, lp.desc, lp.ip, lp.port, lp.protocol)
}
630
631 func Enum(p v1.Protocol) uint16 {
632 if p == v1.ProtocolTCP {
633 return 6
634 }
635 if p == v1.ProtocolUDP {
636 return 17
637 }
638 if p == v1.ProtocolSCTP {
639 return 132
640 }
641 return 0
642 }
643
// closeable is anything that can be closed.
type closeable interface {
	// Close releases the underlying resource and reports any failure.
	Close() error
}
647
648
649 var _ proxy.Provider = &Proxier{}
650
651
652 func NewProxier(
653 ipFamily v1.IPFamily,
654 syncPeriod time.Duration,
655 minSyncPeriod time.Duration,
656 hostname string,
657 nodeIP net.IP,
658 recorder events.EventRecorder,
659 healthzServer *healthcheck.ProxierHealthServer,
660 healthzBindAddress string,
661 config config.KubeProxyWinkernelConfiguration,
662 ) (*Proxier, error) {
663 if nodeIP == nil {
664 klog.InfoS("Invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
665 nodeIP = netutils.ParseIPSloppy("127.0.0.1")
666 }
667
668
669 nodePortAddresses := proxyutil.NewNodePortAddresses(ipFamily, nil, nil)
670 serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder, nodePortAddresses, healthzServer)
671
672 var healthzPort int
673 if len(healthzBindAddress) > 0 {
674 _, port, _ := net.SplitHostPort(healthzBindAddress)
675 healthzPort, _ = strconv.Atoi(port)
676 }
677
678 hcnImpl := newHcnImpl()
679 hns, supportedFeatures := newHostNetworkService(hcnImpl)
680 hnsNetworkName, err := getNetworkName(config.NetworkName)
681 if err != nil {
682 return nil, err
683 }
684
685 klog.V(3).InfoS("Cleaning up old HNS policy lists")
686 hcnImpl.DeleteAllHnsLoadBalancerPolicy()
687
688
689 hnsNetworkInfo, err := getNetworkInfo(hns, hnsNetworkName)
690 if err != nil {
691 return nil, err
692 }
693
694
695
696 if isOverlay(hnsNetworkInfo) {
697 time.Sleep(10 * time.Second)
698 hnsNetworkInfo, err = hns.getNetworkByName(hnsNetworkName)
699 if err != nil {
700 return nil, fmt.Errorf("could not find HNS network %s", hnsNetworkName)
701 }
702 }
703
704 klog.V(1).InfoS("Hns Network loaded", "hnsNetworkInfo", hnsNetworkInfo)
705 isDSR := config.EnableDSR
706 if isDSR && !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinDSR) {
707 return nil, fmt.Errorf("WinDSR feature gate not enabled")
708 }
709
710 err = hcnImpl.DsrSupported()
711 if isDSR && err != nil {
712 return nil, err
713 }
714
715 var sourceVip string
716 var hostMac string
717 if isOverlay(hnsNetworkInfo) {
718 if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.WinOverlay) {
719 return nil, fmt.Errorf("WinOverlay feature gate not enabled")
720 }
721 err = hcn.RemoteSubnetSupported()
722 if err != nil {
723 return nil, err
724 }
725 sourceVip = config.SourceVip
726 if len(sourceVip) == 0 {
727 return nil, fmt.Errorf("source-vip flag not set")
728 }
729
730 if nodeIP.IsUnspecified() {
731
732 klog.V(2).InfoS("Node ip was unspecified, attempting to find node ip")
733 nodeIP, err = apiutil.ResolveBindAddress(nodeIP)
734 if err != nil {
735 klog.InfoS("Failed to find an ip. You may need set the --bind-address flag", "err", err)
736 }
737 }
738
739 interfaces, _ := net.Interfaces()
740 for _, inter := range interfaces {
741 addresses, _ := inter.Addrs()
742 for _, addr := range addresses {
743 addrIP, _, _ := netutils.ParseCIDRSloppy(addr.String())
744 if addrIP.String() == nodeIP.String() {
745 klog.V(2).InfoS("Record Host MAC address", "addr", inter.HardwareAddr)
746 hostMac = inter.HardwareAddr.String()
747 }
748 }
749 }
750 if len(hostMac) == 0 {
751 return nil, fmt.Errorf("could not find host mac address for %s", nodeIP)
752 }
753 }
754
755 proxier := &Proxier{
756 ipFamily: ipFamily,
757 endPointsRefCount: make(endPointsReferenceCountMap),
758 svcPortMap: make(proxy.ServicePortMap),
759 endpointsMap: make(proxy.EndpointsMap),
760 hostname: hostname,
761 nodeIP: nodeIP,
762 recorder: recorder,
763 serviceHealthServer: serviceHealthServer,
764 healthzServer: healthzServer,
765 hns: hns,
766 hcn: hcnImpl,
767 network: *hnsNetworkInfo,
768 sourceVip: sourceVip,
769 hostMac: hostMac,
770 isDSR: isDSR,
771 supportedFeatures: supportedFeatures,
772 healthzPort: healthzPort,
773 rootHnsEndpointName: config.RootHnsEndpointName,
774 forwardHealthCheckVip: config.ForwardHealthCheckVip,
775 mapStaleLoadbalancers: make(map[string]bool),
776 }
777
778 serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
779 endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
780 proxier.endpointsChanges = endPointChangeTracker
781 proxier.serviceChanges = serviceChanges
782
783 burstSyncs := 2
784 klog.V(3).InfoS("Record sync param", "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
785 proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
786 return proxier, nil
787 }
788
789 func NewDualStackProxier(
790 syncPeriod time.Duration,
791 minSyncPeriod time.Duration,
792 hostname string,
793 nodeIPs map[v1.IPFamily]net.IP,
794 recorder events.EventRecorder,
795 healthzServer *healthcheck.ProxierHealthServer,
796 healthzBindAddress string,
797 config config.KubeProxyWinkernelConfiguration,
798 ) (proxy.Provider, error) {
799
800
801 ipv4Proxier, err := NewProxier(v1.IPv4Protocol, syncPeriod, minSyncPeriod,
802 hostname, nodeIPs[v1.IPv4Protocol], recorder, healthzServer,
803 healthzBindAddress, config)
804
805 if err != nil {
806 return nil, fmt.Errorf("unable to create ipv4 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv4Protocol])
807 }
808
809 ipv6Proxier, err := NewProxier(v1.IPv6Protocol, syncPeriod, minSyncPeriod,
810 hostname, nodeIPs[v1.IPv6Protocol], recorder, healthzServer,
811 healthzBindAddress, config)
812 if err != nil {
813 return nil, fmt.Errorf("unable to create ipv6 proxier: %v, hostname: %s, nodeIP:%v", err, hostname, nodeIPs[v1.IPv6Protocol])
814 }
815
816
817
818 return metaproxier.NewMetaProxier(ipv4Proxier, ipv6Proxier), nil
819 }
820
821
822
823 func CleanupLeftovers() (encounteredError bool) {
824
825 newHcnImpl().DeleteAllHnsLoadBalancerPolicy()
826
827
828
829 return encounteredError
830 }
831
832 func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) {
833 klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo)
834
835 winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization
836 if winProxyOptimization {
837 klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange)
838 } else {
839
840 svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers)
841 }
842
843 for _, ep := range endpoints {
844 epInfo, ok := ep.(*endpointInfo)
845 if ok {
846 if winProxyOptimization {
847 epInfo.DecrementRefCount()
848 } else {
849 epInfo.Cleanup()
850 }
851 }
852 }
853 if svcInfo.remoteEndpoint != nil {
854 svcInfo.remoteEndpoint.Cleanup()
855 }
856
857 svcInfo.policyApplied = false
858 }
859
860 func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) {
861
862 hns := svcInfo.hns
863 if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil {
864 mapStaleLoadbalancer[svcInfo.hnsID] = true
865 klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP())
866 } else {
867
868 svcInfo.hnsID = ""
869 }
870
871 if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil {
872 mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true
873 klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort())
874 } else {
875
876 svcInfo.nodePorthnsID = ""
877 }
878
879 for _, externalIP := range svcInfo.externalIPs {
880 mapStaleLoadbalancer[externalIP.hnsID] = true
881 if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil {
882 klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip)
883 } else {
884
885 externalIP.hnsID = ""
886 }
887 }
888 for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
889 klog.V(3).InfoS("Loadbalancer Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP)
890 if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil {
891 mapStaleLoadbalancer[lbIngressIP.hnsID] = true
892 klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip)
893 } else {
894
895 lbIngressIP.hnsID = ""
896 }
897
898 if lbIngressIP.healthCheckHnsID != "" {
899 if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil {
900 mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true
901 klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip)
902 } else {
903
904 lbIngressIP.healthCheckHnsID = ""
905 }
906 }
907 }
908 }
909
910
911 func (proxier *Proxier) Sync() {
912 if proxier.healthzServer != nil {
913 proxier.healthzServer.QueuedUpdate(proxier.ipFamily)
914 }
915 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
916 proxier.syncRunner.Run()
917 }
918
919
920 func (proxier *Proxier) SyncLoop() {
921
922 if proxier.healthzServer != nil {
923 proxier.healthzServer.Updated(proxier.ipFamily)
924 }
925
926 metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
927 proxier.syncRunner.Loop(wait.NeverStop)
928 }
929
930 func (proxier *Proxier) setInitialized(value bool) {
931 var initialized int32
932 if value {
933 initialized = 1
934 }
935 atomic.StoreInt32(&proxier.initialized, initialized)
936 }
937
938 func (proxier *Proxier) isInitialized() bool {
939 return atomic.LoadInt32(&proxier.initialized) > 0
940 }
941
942
943
944 func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
945 proxier.OnServiceUpdate(nil, service)
946 }
947
948
949
950 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
951 if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
952 proxier.Sync()
953 }
954 }
955
956
957
958 func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
959 proxier.OnServiceUpdate(service, nil)
960 }
961
962
963
964 func (proxier *Proxier) OnServiceSynced() {
965 proxier.mu.Lock()
966 proxier.servicesSynced = true
967 proxier.setInitialized(proxier.endpointSlicesSynced)
968 proxier.mu.Unlock()
969
970
971 proxier.syncProxyRules()
972 }
973
974
975
976 func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
977 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
978 proxier.Sync()
979 }
980 }
981
982
983
984 func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
985 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
986 proxier.Sync()
987 }
988 }
989
990
991
992 func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
993 if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
994 proxier.Sync()
995 }
996 }
997
998
999
1000 func (proxier *Proxier) OnEndpointSlicesSynced() {
1001 proxier.mu.Lock()
1002 proxier.endpointSlicesSynced = true
1003 proxier.setInitialized(proxier.servicesSynced)
1004 proxier.mu.Unlock()
1005
1006
1007 proxier.syncProxyRules()
1008 }
1009
1010
1011
1012 func (proxier *Proxier) OnServiceCIDRsChanged(_ []string) {}
1013
1014 func (proxier *Proxier) cleanupAllPolicies() {
1015 for svcName, svc := range proxier.svcPortMap {
1016 svcInfo, ok := svc.(*serviceInfo)
1017 if !ok {
1018 klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
1019 continue
1020 }
1021 svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false)
1022 }
1023 }
1024
1025 func isNetworkNotFoundError(err error) bool {
1026 if err == nil {
1027 return false
1028 }
1029 if _, ok := err.(hcn.NetworkNotFoundError); ok {
1030 return true
1031 }
1032 if _, ok := err.(hcsshim.NetworkNotFoundError); ok {
1033 return true
1034 }
1035 return false
1036 }
1037
1038
1039
1040 func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
1041 for _, epInfo := range proxier.endpointsMap[svcName] {
1042 ep, ok := epInfo.(*endpointInfo)
1043 if !ok {
1044 continue
1045 }
1046 if isLocalTrafficDSR && !ep.IsLocal() {
1047
1048 continue
1049 }
1050
1051
1052 if !ep.IsReady() && !ep.IsTerminating() {
1053
1054 continue
1055 }
1056 if !ep.IsTerminating() {
1057 return false
1058 }
1059 }
1060 return true
1061 }
1062
1063
1064
1065 func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool {
1066 for _, epInfo := range proxier.endpointsMap[svcName] {
1067 ep, ok := epInfo.(*endpointInfo)
1068 if !ok {
1069 continue
1070 }
1071 if isLocalTrafficDSR && !ep.IsLocal() {
1072 continue
1073 }
1074 if ep.IsServing() {
1075 return false
1076 }
1077 }
1078 return true
1079 }
1080
1081
1082 func updateQueriedEndpoints(newHnsEndpoint *endpointInfo, queriedEndpoints map[string]*endpointInfo) {
1083
1084 queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint
1085 queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint
1086 }
1087
1088
1089
1090 func (proxier *Proxier) syncProxyRules() {
1091 proxier.mu.Lock()
1092 defer proxier.mu.Unlock()
1093
1094
1095 if !proxier.isInitialized() {
1096 klog.V(2).InfoS("Not syncing hns until Services and Endpoints have been received from master")
1097 return
1098 }
1099
1100
1101 start := time.Now()
1102 defer func() {
1103 metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
1104 klog.V(4).InfoS("Syncing proxy rules complete", "elapsed", time.Since(start))
1105 }()
1106
1107 hnsNetworkName := proxier.network.name
1108 hns := proxier.hns
1109
1110 var gatewayHnsendpoint *endpointInfo
1111 if proxier.forwardHealthCheckVip {
1112 gatewayHnsendpoint, _ = hns.getEndpointByName(proxier.rootHnsEndpointName)
1113 }
1114
1115 prevNetworkID := proxier.network.id
1116 updatedNetwork, err := hns.getNetworkByName(hnsNetworkName)
1117 if updatedNetwork == nil || updatedNetwork.id != prevNetworkID || isNetworkNotFoundError(err) {
1118 klog.InfoS("The HNS network is not present or has changed since the last sync, please check the CNI deployment", "hnsNetworkName", hnsNetworkName)
1119 proxier.cleanupAllPolicies()
1120 if updatedNetwork != nil {
1121 proxier.network = *updatedNetwork
1122 }
1123 return
1124 }
1125
1126
1127
1128
1129 serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
1130 endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
1131
1132 deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
1133
1134 for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
1135 if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
1136 klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
1137 deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
1138 }
1139 }
1140
1141 queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
1142 if err != nil {
1143 klog.ErrorS(err, "Querying HNS for endpoints failed")
1144 return
1145 }
1146 if queriedEndpoints == nil {
1147 klog.V(4).InfoS("No existing endpoints found in HNS")
1148 queriedEndpoints = make(map[string]*(endpointInfo))
1149 }
1150 queriedLoadBalancers, err := hns.getAllLoadBalancers()
1151 if queriedLoadBalancers == nil {
1152 klog.V(4).InfoS("No existing load balancers found in HNS")
1153 queriedLoadBalancers = make(map[loadBalancerIdentifier]*(loadBalancerInfo))
1154 }
1155 if err != nil {
1156 klog.ErrorS(err, "Querying HNS for load balancers failed")
1157 return
1158 }
1159 if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
1160 if _, ok := queriedEndpoints[proxier.sourceVip]; !ok {
1161 _, err = newSourceVIP(hns, hnsNetworkName, proxier.sourceVip, proxier.hostMac, proxier.nodeIP.String())
1162 if err != nil {
1163 klog.ErrorS(err, "Source Vip endpoint creation failed")
1164 return
1165 }
1166 }
1167 }
1168
1169 klog.V(3).InfoS("Syncing Policies")
1170
1171
1172 for svcName, svc := range proxier.svcPortMap {
1173 svcInfo, ok := svc.(*serviceInfo)
1174 if !ok {
1175 klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
1176 continue
1177 }
1178
1179 if svcInfo.policyApplied {
1180 klog.V(4).InfoS("Policy already applied", "serviceInfo", svcInfo)
1181 continue
1182 }
1183
1184 if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
1185 serviceVipEndpoint := queriedEndpoints[svcInfo.ClusterIP().String()]
1186 if serviceVipEndpoint == nil {
1187 klog.V(4).InfoS("No existing remote endpoint", "IP", svcInfo.ClusterIP())
1188 hnsEndpoint := &endpointInfo{
1189 ip: svcInfo.ClusterIP().String(),
1190 isLocal: false,
1191 macAddress: proxier.hostMac,
1192 providerAddress: proxier.nodeIP.String(),
1193 }
1194
1195 newHnsEndpoint, err := hns.createEndpoint(hnsEndpoint, hnsNetworkName)
1196 if err != nil {
1197 klog.ErrorS(err, "Remote endpoint creation failed for service VIP")
1198 continue
1199 }
1200
1201 newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
1202 *newHnsEndpoint.refCount++
1203 svcInfo.remoteEndpoint = newHnsEndpoint
1204 updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
1205 }
1206 }
1207
1208 var hnsEndpoints []endpointInfo
1209 var hnsLocalEndpoints []endpointInfo
1210 klog.V(4).InfoS("Applying Policy", "serviceInfo", svcName)
1211
1212 containsPublicIP := false
1213 containsNodeIP := false
1214 var allEndpointsTerminating, allEndpointsNonServing bool
1215 someEndpointsServing := true
1216
1217 if len(svcInfo.loadBalancerIngressIPs) > 0 {
1218
1219
1220 allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR)
1221 allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR)
1222 someEndpointsServing = !allEndpointsNonServing
1223 klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR)
1224 } else {
1225 klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs))
1226 }
1227
1228 for _, epInfo := range proxier.endpointsMap[svcName] {
1229 ep, ok := epInfo.(*endpointInfo)
1230 if !ok {
1231 klog.ErrorS(nil, "Failed to cast endpointInfo", "serviceName", svcName)
1232 continue
1233 }
1234
1235 if svcInfo.internalTrafficLocal && svcInfo.localTrafficDSR && !ep.IsLocal() {
1236
1237 klog.V(3).InfoS("Skipping the endpoint. Both internalTraffic and external traffic policies are local", "EpIP", ep.ip, " EpPort", ep.port)
1238 continue
1239 }
1240
1241 if someEndpointsServing {
1242
1243 if !allEndpointsTerminating && !ep.IsReady() {
1244 klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or all not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady())
1245 continue
1246 }
1247 if !ep.IsServing() {
1248 klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing())
1249 continue
1250 }
1251
1252 }
1253
1254 var newHnsEndpoint *endpointInfo
1255 hnsNetworkName := proxier.network.name
1256 var err error
1257
1258
1259
1260
1261 if svcInfo.targetPort == 0 {
1262 svcInfo.targetPort = int(ep.port)
1263 }
1264
1265
1266 if len(ep.hnsID) > 0 {
1267 newHnsEndpoint = queriedEndpoints[ep.hnsID]
1268 }
1269
1270 if newHnsEndpoint == nil {
1271
1272
1273
1274 newHnsEndpoint = queriedEndpoints[ep.IP()]
1275 }
1276
1277 if newHnsEndpoint == nil {
1278 if ep.IsLocal() {
1279 klog.ErrorS(err, "Local endpoint not found: on network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName)
1280 continue
1281 }
1282
1283 if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) {
1284 klog.InfoS("Updating network to check for new remote subnet policies", "networkName", proxier.network.name)
1285 networkName := proxier.network.name
1286 updatedNetwork, err := hns.getNetworkByName(networkName)
1287 if err != nil {
1288 klog.ErrorS(err, "Unable to find HNS Network specified, please check network name and CNI deployment", "hnsNetworkName", hnsNetworkName)
1289 proxier.cleanupAllPolicies()
1290 return
1291 }
1292 proxier.network = *updatedNetwork
1293 providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
1294 if len(providerAddress) == 0 {
1295 klog.InfoS("Could not find provider address, assuming it is a public IP", "IP", ep.IP())
1296 providerAddress = proxier.nodeIP.String()
1297 }
1298
1299 hnsEndpoint := &endpointInfo{
1300 ip: ep.ip,
1301 isLocal: false,
1302 macAddress: conjureMac("02-11", netutils.ParseIPSloppy(ep.ip)),
1303 providerAddress: providerAddress,
1304 }
1305
1306 newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
1307 if err != nil {
1308 klog.ErrorS(err, "Remote endpoint creation failed", "endpointInfo", hnsEndpoint)
1309 continue
1310 }
1311 updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
1312 } else {
1313
1314 hnsEndpoint := &endpointInfo{
1315 ip: ep.ip,
1316 isLocal: false,
1317 macAddress: ep.macAddress,
1318 }
1319
1320 newHnsEndpoint, err = hns.createEndpoint(hnsEndpoint, hnsNetworkName)
1321 if err != nil {
1322 klog.ErrorS(err, "Remote endpoint creation failed")
1323 continue
1324 }
1325 updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints)
1326 }
1327 }
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339 if strings.EqualFold(proxier.network.networkType, NETWORK_TYPE_OVERLAY) && !ep.IsLocal() {
1340 providerAddress := proxier.network.findRemoteSubnetProviderAddress(ep.IP())
1341
1342 isNodeIP := (ep.IP() == providerAddress)
1343 isPublicIP := (len(providerAddress) == 0)
1344 klog.InfoS("Endpoint on overlay network", "ip", ep.IP(), "hnsNetworkName", hnsNetworkName, "isNodeIP", isNodeIP, "isPublicIP", isPublicIP)
1345
1346 containsNodeIP = containsNodeIP || isNodeIP
1347 containsPublicIP = containsPublicIP || isPublicIP
1348 }
1349
1350
1351 klog.V(1).InfoS("Hns endpoint resource", "endpointInfo", newHnsEndpoint)
1352
1353 hnsEndpoints = append(hnsEndpoints, *newHnsEndpoint)
1354 if newHnsEndpoint.IsLocal() {
1355 hnsLocalEndpoints = append(hnsLocalEndpoints, *newHnsEndpoint)
1356 } else {
1357
1358 ep.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID)
1359 *ep.refCount++
1360 }
1361
1362 ep.hnsID = newHnsEndpoint.hnsID
1363
1364 klog.V(3).InfoS("Endpoint resource found", "endpointInfo", ep)
1365 }
1366
1367 klog.V(3).InfoS("Associated endpoints for service", "endpointInfo", hnsEndpoints, "serviceName", svcName)
1368
1369 if len(svcInfo.hnsID) > 0 {
1370
1371 klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID)
1372 }
1373
1374
1375
1376 if len(hnsEndpoints) == 0 {
1377 if svcInfo.winProxyOptimization {
1378
1379 klog.V(3).InfoS("Cleanup existing ", "endpointInfo", hnsEndpoints, "serviceName", svcName)
1380 svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers)
1381 }
1382 klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName)
1383 continue
1384 }
1385
1386 klog.V(4).InfoS("Trying to apply Policies for service", "serviceInfo", svcInfo)
1387 var hnsLoadBalancer *loadBalancerInfo
1388 var sourceVip = proxier.sourceVip
1389 if containsPublicIP || containsNodeIP {
1390 sourceVip = proxier.nodeIP.String()
1391 }
1392
1393 sessionAffinityClientIP := svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP
1394 if sessionAffinityClientIP && !proxier.supportedFeatures.SessionAffinity {
1395 klog.InfoS("Session Affinity is not supported on this version of Windows")
1396 }
1397
1398 endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing
1399 proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers)
1400
1401
1402 clusterIPEndpoints := hnsEndpoints
1403 if svcInfo.internalTrafficLocal {
1404
1405 clusterIPEndpoints = hnsLocalEndpoints
1406 }
1407
1408 if len(clusterIPEndpoints) > 0 {
1409
1410
1411
1412 hnsLoadBalancer, err := hns.getLoadBalancer(
1413 clusterIPEndpoints,
1414 loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.ipFamily == v1.IPv6Protocol, sessionAffinity: sessionAffinityClientIP},
1415 sourceVip,
1416 svcInfo.ClusterIP().String(),
1417 Enum(svcInfo.Protocol()),
1418 uint16(svcInfo.targetPort),
1419 uint16(svcInfo.Port()),
1420 queriedLoadBalancers,
1421 )
1422 if err != nil {
1423 klog.ErrorS(err, "Policy creation failed")
1424 continue
1425 }
1426
1427 svcInfo.hnsID = hnsLoadBalancer.hnsID
1428 klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID)
1429
1430 } else {
1431 klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
1432 }
1433
1434
1435 if svcInfo.NodePort() > 0 {
1436
1437
1438 nodePortEndpoints := hnsEndpoints
1439 if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
1440 nodePortEndpoints = hnsLocalEndpoints
1441 }
1442
1443 proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers)
1444
1445 if len(nodePortEndpoints) > 0 && endpointsAvailableForLB {
1446
1447 hnsLoadBalancer, err := hns.getLoadBalancer(
1448 nodePortEndpoints,
1449 loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
1450 sourceVip,
1451 "",
1452 Enum(svcInfo.Protocol()),
1453 uint16(svcInfo.targetPort),
1454 uint16(svcInfo.NodePort()),
1455 queriedLoadBalancers,
1456 )
1457 if err != nil {
1458 klog.ErrorS(err, "Policy creation failed")
1459 continue
1460 }
1461
1462 svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID
1463 klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID)
1464 } else {
1465 klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating)
1466 }
1467 }
1468
1469
1470 for _, externalIP := range svcInfo.externalIPs {
1471
1472 externalIPEndpoints := hnsEndpoints
1473 if svcInfo.localTrafficDSR {
1474 externalIPEndpoints = hnsLocalEndpoints
1475 }
1476
1477 proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers)
1478
1479 if len(externalIPEndpoints) > 0 && endpointsAvailableForLB {
1480
1481
1482 hnsLoadBalancer, err = hns.getLoadBalancer(
1483 externalIPEndpoints,
1484 loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
1485 sourceVip,
1486 externalIP.ip,
1487 Enum(svcInfo.Protocol()),
1488 uint16(svcInfo.targetPort),
1489 uint16(svcInfo.Port()),
1490 queriedLoadBalancers,
1491 )
1492 if err != nil {
1493 klog.ErrorS(err, "Policy creation failed")
1494 continue
1495 }
1496 externalIP.hnsID = hnsLoadBalancer.hnsID
1497 klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID)
1498 } else {
1499 klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating)
1500 }
1501 }
1502
1503 for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs {
1504
1505 lbIngressEndpoints := hnsEndpoints
1506 if svcInfo.preserveDIP || svcInfo.localTrafficDSR {
1507 lbIngressEndpoints = hnsLocalEndpoints
1508 }
1509
1510 proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers)
1511
1512 if len(lbIngressEndpoints) > 0 {
1513 hnsLoadBalancer, err := hns.getLoadBalancer(
1514 lbIngressEndpoints,
1515 loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.ipFamily == v1.IPv6Protocol},
1516 sourceVip,
1517 lbIngressIP.ip,
1518 Enum(svcInfo.Protocol()),
1519 uint16(svcInfo.targetPort),
1520 uint16(svcInfo.Port()),
1521 queriedLoadBalancers,
1522 )
1523 if err != nil {
1524 klog.ErrorS(err, "Policy creation failed")
1525 continue
1526 }
1527 lbIngressIP.hnsID = hnsLoadBalancer.hnsID
1528 klog.V(3).InfoS("Hns LoadBalancer resource created for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
1529 } else {
1530 klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP)
1531 }
1532
1533 if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB {
1534
1535 nodeport := proxier.healthzPort
1536 if svcInfo.HealthCheckNodePort() != 0 {
1537 nodeport = svcInfo.HealthCheckNodePort()
1538 }
1539
1540 proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointInfo{*gatewayHnsendpoint}, queriedLoadBalancers)
1541
1542 hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer(
1543 []endpointInfo{*gatewayHnsendpoint},
1544 loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP},
1545 sourceVip,
1546 lbIngressIP.ip,
1547 Enum(svcInfo.Protocol()),
1548 uint16(nodeport),
1549 uint16(nodeport),
1550 queriedLoadBalancers,
1551 )
1552 if err != nil {
1553 klog.ErrorS(err, "Policy creation failed")
1554 continue
1555 }
1556 lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID
1557 klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP)
1558 } else {
1559 klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating)
1560 }
1561 }
1562 svcInfo.policyApplied = true
1563 klog.V(2).InfoS("Policy successfully applied for service", "serviceInfo", svcInfo)
1564 }
1565
1566 if proxier.healthzServer != nil {
1567 proxier.healthzServer.Updated(proxier.ipFamily)
1568 }
1569 metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()
1570
1571
1572
1573
1574 if err := proxier.serviceHealthServer.SyncServices(proxier.svcPortMap.HealthCheckNodePorts()); err != nil {
1575 klog.ErrorS(err, "Error syncing healthcheck services")
1576 }
1577 if err := proxier.serviceHealthServer.SyncEndpoints(proxier.endpointsMap.LocalReadyEndpoints()); err != nil {
1578 klog.ErrorS(err, "Error syncing healthcheck endpoints")
1579 }
1580
1581
1582
1583 for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
1584
1585 klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
1586 }
1587
1588
1589 for hnsID, referenceCount := range proxier.endPointsRefCount {
1590 if *referenceCount <= 0 {
1591 klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID)
1592 proxier.hns.deleteEndpoint(hnsID)
1593 delete(proxier.endPointsRefCount, hnsID)
1594 }
1595 }
1596
1597
1598 proxier.cleanupStaleLoadbalancers()
1599 }
1600
1601
1602
1603 func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool {
1604
1605 if !winProxyOptimization || *lbHnsID == "" {
1606
1607 return false
1608 }
1609
1610 lbID, lbIdErr := findLoadBalancerID(
1611 endpoints,
1612 sourceVip,
1613 protocol,
1614 intPort,
1615 extPort,
1616 )
1617
1618 if lbIdErr != nil {
1619 return proxier.deleteLoadBalancer(hns, lbHnsID)
1620 }
1621
1622 if _, ok := queriedLoadBalancers[lbID]; ok {
1623
1624 return false
1625 }
1626
1627 return proxier.deleteLoadBalancer(hns, lbHnsID)
1628 }
1629
1630 func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool {
1631 klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID)
1632 if err := hns.deleteLoadBalancer(*lbHnsID); err != nil {
1633
1634 proxier.mapStaleLoadbalancers[*lbHnsID] = true
1635 }
1636 *lbHnsID = ""
1637 return true
1638 }
1639
View as plain text