...
1
16
17 package proxy
18
19 import (
20 v1 "k8s.io/api/core/v1"
21 utilfeature "k8s.io/apiserver/pkg/util/feature"
22 "k8s.io/klog/v2"
23 "k8s.io/kubernetes/pkg/features"
24 )
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
41 var useTopology, useServingTerminatingEndpoints bool
42
43 if svcInfo.UsesClusterEndpoints() {
44 useTopology = canUseTopology(endpoints, svcInfo, nodeLabels)
45 clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
46 if !ep.IsReady() {
47 return false
48 }
49 if useTopology && !availableForTopology(ep, nodeLabels) {
50 return false
51 }
52 return true
53 })
54
55
56
57
58 if len(clusterEndpoints) == 0 {
59 clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
60 if ep.IsServing() && ep.IsTerminating() {
61 return true
62 }
63
64 return false
65 })
66 }
67
68
69
70 if len(clusterEndpoints) > 0 {
71 hasAnyEndpoints = true
72 }
73 }
74
75 if !svcInfo.UsesLocalEndpoints() {
76 allReachableEndpoints = clusterEndpoints
77 return
78 }
79
80
81
82
83 var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
84 for _, ep := range endpoints {
85 if ep.IsReady() {
86 hasAnyEndpoints = true
87 if ep.IsLocal() {
88 hasLocalReadyEndpoints = true
89 }
90 } else if ep.IsServing() && ep.IsTerminating() {
91 hasAnyEndpoints = true
92 if ep.IsLocal() {
93 hasLocalServingTerminatingEndpoints = true
94 }
95 }
96 }
97
98 if hasLocalReadyEndpoints {
99 localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
100 return ep.IsLocal() && ep.IsReady()
101 })
102 } else if hasLocalServingTerminatingEndpoints {
103 useServingTerminatingEndpoints = true
104 localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
105 return ep.IsLocal() && ep.IsServing() && ep.IsTerminating()
106 })
107 }
108
109 if !svcInfo.UsesClusterEndpoints() {
110 allReachableEndpoints = localEndpoints
111 return
112 }
113
114 if !useTopology && !useServingTerminatingEndpoints {
115
116
117
118 allReachableEndpoints = clusterEndpoints
119 return
120 }
121
122
123
124
125 endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
126 for _, ep := range clusterEndpoints {
127 endpointsMap[ep.String()] = ep
128 }
129 for _, ep := range localEndpoints {
130 endpointsMap[ep.String()] = ep
131 }
132 allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
133 for _, ep := range endpointsMap {
134 allReachableEndpoints = append(allReachableEndpoints, ep)
135 }
136
137 return
138 }
139
140
141
142
143
144
145
146
147
148 func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
149 if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) && !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
150 return false
151 }
152
153
154
155 if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
156
157 hintsAnnotation := svcInfo.HintsAnnotation()
158 if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
159 return false
160 }
161 }
162
163 zone, ok := nodeLabels[v1.LabelTopologyZone]
164 if !ok || zone == "" {
165 klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
166 return false
167 }
168
169 hasEndpointForZone := false
170 for _, endpoint := range endpoints {
171 if !endpoint.IsReady() {
172 continue
173 }
174 if endpoint.ZoneHints().Len() == 0 {
175 klog.V(2).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint)
176 return false
177 }
178
179 if endpoint.ZoneHints().Has(zone) {
180 hasEndpointForZone = true
181 }
182 }
183
184 if !hasEndpointForZone {
185 klog.V(2).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
186 return false
187 }
188
189 return true
190 }
191
192
193
194 func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
195 zone := nodeLabels[v1.LabelTopologyZone]
196 return endpoint.ZoneHints().Has(zone)
197 }
198
199
200 func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
201 filteredEndpoints := make([]Endpoint, 0, len(endpoints))
202
203 for _, ep := range endpoints {
204 if predicate(ep) {
205 filteredEndpoints = append(filteredEndpoints, ep)
206 }
207 }
208
209 return filteredEndpoints
210 }
211
View as plain text