1
2
3
4
19
20 package conntrack
21
22 import (
23 v1 "k8s.io/api/core/v1"
24 "k8s.io/apimachinery/pkg/util/sets"
25 "k8s.io/klog/v2"
26 "k8s.io/kubernetes/pkg/proxy"
27 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
28 netutils "k8s.io/utils/net"
29 )
30
31
32 func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap,
33 serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
34 deleteStaleServiceConntrackEntries(ct, svcPortMap, serviceUpdateResult, endpointsUpdateResult)
35 deleteStaleEndpointConntrackEntries(ct, svcPortMap, endpointsUpdateResult)
36 }
37
38
39
40
41
42 func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
43 conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
44 conntrackCleanupServiceNodePorts := sets.New[int]()
45 isIPv6 := false
46
47
48
49 for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
50 if svcInfo, ok := svcPortMap[svcPortName]; ok {
51 klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
52 conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
53 for _, extIP := range svcInfo.ExternalIPs() {
54 conntrackCleanupServiceIPs.Insert(extIP.String())
55 }
56 for _, lbIP := range svcInfo.LoadBalancerVIPs() {
57 conntrackCleanupServiceIPs.Insert(lbIP.String())
58 }
59 nodePort := svcInfo.NodePort()
60 if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
61 conntrackCleanupServiceNodePorts.Insert(nodePort)
62 isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
63 }
64 }
65 }
66
67 klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
68 for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
69 if err := ct.ClearEntriesForIP(svcIP, v1.ProtocolUDP); err != nil {
70 klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
71 }
72 }
73 klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
74 for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
75 err := ct.ClearEntriesForPort(nodePort, isIPv6, v1.ProtocolUDP)
76 if err != nil {
77 klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
78 }
79 }
80 }
81
82
83
84
85 func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
86 for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
87 if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
88 endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
89 nodePort := svcInfo.NodePort()
90 var err error
91 if nodePort != 0 {
92 err = ct.ClearEntriesForPortNAT(endpointIP, nodePort, v1.ProtocolUDP)
93 if err != nil {
94 klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
95 }
96 }
97 err = ct.ClearEntriesForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
98 if err != nil {
99 klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
100 }
101 for _, extIP := range svcInfo.ExternalIPs() {
102 err := ct.ClearEntriesForNAT(extIP.String(), endpointIP, v1.ProtocolUDP)
103 if err != nil {
104 klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
105 }
106 }
107 for _, lbIP := range svcInfo.LoadBalancerVIPs() {
108 err := ct.ClearEntriesForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP)
109 if err != nil {
110 klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
111 }
112 }
113 }
114 }
115 }
116
View as plain text