1
16
17 package proxy
18
19 import (
20 "fmt"
21 "reflect"
22 "sort"
23 "strings"
24 "sync"
25
26 v1 "k8s.io/api/core/v1"
27 discovery "k8s.io/api/discovery/v1"
28 "k8s.io/apimachinery/pkg/types"
29 "k8s.io/apimachinery/pkg/util/sets"
30 utilfeature "k8s.io/apiserver/pkg/util/feature"
31 "k8s.io/client-go/tools/events"
32 "k8s.io/klog/v2"
33 "k8s.io/kubernetes/pkg/features"
34 proxyutil "k8s.io/kubernetes/pkg/proxy/util"
35 utilnet "k8s.io/utils/net"
36 )
37
38
39 type EndpointSliceCache struct {
40
41 lock sync.Mutex
42
43
44
45
46
47
48
49 trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker
50
51 makeEndpointInfo makeEndpointFunc
52 hostname string
53 ipFamily v1.IPFamily
54 recorder events.EventRecorder
55 }
56
57
58
59
60 type endpointSliceTracker struct {
61 applied endpointSliceDataByName
62 pending endpointSliceDataByName
63 }
64
65
66
67 type endpointSliceDataByName map[string]*endpointSliceData
68
69
70
71 type endpointSliceData struct {
72 Ports []discovery.EndpointPort
73 Endpoints []*endpointData
74 Remove bool
75 }
76
77
78
79
80 type endpointData struct {
81 Addresses []string
82 NodeName *string
83 Zone *string
84 ZoneHints sets.Set[string]
85
86 Ready bool
87 Serving bool
88 Terminating bool
89 }
90
91
92 func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
93 if makeEndpointInfo == nil {
94 makeEndpointInfo = standardEndpointInfo
95 }
96 return &EndpointSliceCache{
97 trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
98 hostname: hostname,
99 ipFamily: ipFamily,
100 makeEndpointInfo: makeEndpointInfo,
101 recorder: recorder,
102 }
103 }
104
105
106 func newEndpointSliceTracker() *endpointSliceTracker {
107 return &endpointSliceTracker{
108 applied: endpointSliceDataByName{},
109 pending: endpointSliceDataByName{},
110 }
111 }
112
113
114 func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData {
115 esData := &endpointSliceData{
116 Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
117 Endpoints: []*endpointData{},
118 Remove: remove,
119 }
120
121
122 copy(esData.Ports, endpointSlice.Ports)
123 sort.Sort(byPort(esData.Ports))
124
125 if !remove {
126 for _, endpoint := range endpointSlice.Endpoints {
127 epData := &endpointData{
128 Addresses: endpoint.Addresses,
129 Zone: endpoint.Zone,
130 NodeName: endpoint.NodeName,
131
132
133 Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready,
134 Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving,
135 Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating,
136 }
137
138 if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
139 if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
140 epData.ZoneHints = sets.New[string]()
141 for _, zone := range endpoint.Hints.ForZones {
142 epData.ZoneHints.Insert(zone.Name)
143 }
144 }
145 }
146
147 esData.Endpoints = append(esData.Endpoints, epData)
148 }
149
150 sort.Sort(byAddress(esData.Endpoints))
151 }
152
153 return esData
154 }
155
156
157 func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
158 return ep
159 }
160
161
162 func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
163 serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
164 if err != nil {
165 klog.ErrorS(err, "Error getting endpoint slice cache keys")
166 return false
167 }
168
169 esData := newEndpointSliceData(endpointSlice, remove)
170
171 cache.lock.Lock()
172 defer cache.lock.Unlock()
173
174 if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
175 cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
176 }
177
178 changed := cache.esDataChanged(serviceKey, sliceKey, esData)
179
180 if changed {
181 cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esData
182 }
183
184 return changed
185 }
186
187
188
189 func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*endpointsChange {
190 changes := make(map[types.NamespacedName]*endpointsChange)
191
192 cache.lock.Lock()
193 defer cache.lock.Unlock()
194
195 for serviceNN, esTracker := range cache.trackerByServiceMap {
196 if len(esTracker.pending) == 0 {
197 continue
198 }
199
200 change := &endpointsChange{}
201
202 change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
203
204 for name, sliceData := range esTracker.pending {
205 if sliceData.Remove {
206 delete(esTracker.applied, name)
207 } else {
208 esTracker.applied[name] = sliceData
209 }
210
211 delete(esTracker.pending, name)
212 }
213
214 change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
215 changes[serviceNN] = change
216 }
217
218 return changes
219 }
220
221
222
223 type spToEndpointMap map[ServicePortName]map[string]Endpoint
224
225
226 func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) EndpointsMap {
227 endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceDataByName)
228 return endpointsMapFromEndpointInfo(endpointInfoBySP)
229 }
230
231
232 func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceDataByName endpointSliceDataByName) spToEndpointMap {
233 endpointInfoBySP := spToEndpointMap{}
234
235 for _, sliceData := range sliceDataByName {
236 for _, port := range sliceData.Ports {
237 if port.Name == nil {
238 klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
239 continue
240 }
241
242 if port.Port == nil || *port.Port == int32(0) {
243 klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", *port.Name)
244 continue
245 }
246
247 svcPortName := ServicePortName{
248 NamespacedName: serviceNN,
249 Port: *port.Name,
250 Protocol: *port.Protocol,
251 }
252
253 endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints)
254 }
255 }
256
257 return endpointInfoBySP
258 }
259
260
261 func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint {
262 if endpointSet == nil {
263 endpointSet = map[string]Endpoint{}
264 }
265
266
267 for _, endpoint := range endpoints {
268 if len(endpoint.Addresses) == 0 {
269 klog.ErrorS(nil, "Ignoring invalid endpoint port with empty address", "endpoint", endpoint)
270 continue
271 }
272
273
274
275 if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
276
277
278 proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
279 continue
280 }
281
282 isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
283
284 endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal,
285 endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints)
286
287
288
289
290 if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
291 endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo, svcPortName)
292 }
293 }
294
295 return endpointSet
296 }
297
298 func (cache *EndpointSliceCache) isLocal(hostname string) bool {
299 return len(cache.hostname) > 0 && hostname == cache.hostname
300 }
301
302
303
304 func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, sliceKey string, esData *endpointSliceData) bool {
305 if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
306 appliedData, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
307 pendingData, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
308
309
310
311 if pendingOk {
312 return !reflect.DeepEqual(esData, pendingData)
313 }
314
315
316
317 if appliedOk {
318 return !reflect.DeepEqual(esData, appliedData)
319 }
320 }
321
322
323
324 if esData.Remove {
325 return false
326 }
327
328
329 return true
330 }
331
332
333
334 func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
335 endpointsMap := EndpointsMap{}
336
337
338 for svcPortName, endpointSet := range endpointInfoBySP {
339 if len(endpointSet) > 0 {
340 endpointsMap[svcPortName] = []Endpoint{}
341 for _, endpointInfo := range endpointSet {
342 endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
343
344 }
345
346 sort.Sort(byEndpoint(endpointsMap[svcPortName]))
347
348 klog.V(3).InfoS("Setting endpoints for service port name", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
349 }
350 }
351
352 return endpointsMap
353 }
354
355
356 func formatEndpointsList(endpoints []Endpoint) []string {
357 var formattedList []string
358 for _, ep := range endpoints {
359 formattedList = append(formattedList, ep.String())
360 }
361 return formattedList
362 }
363
364
365 func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.NamespacedName, string, error) {
366 var err error
367 serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
368 if !ok || serviceName == "" {
369 err = fmt.Errorf("no %s label set on endpoint slice: %s", discovery.LabelServiceName, endpointSlice.Name)
370 } else if endpointSlice.Namespace == "" || endpointSlice.Name == "" {
371 err = fmt.Errorf("expected EndpointSlice name and namespace to be set: %v", endpointSlice)
372 }
373 return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
374 }
375
376
377 type byAddress []*endpointData
378
379 func (e byAddress) Len() int {
380 return len(e)
381 }
382 func (e byAddress) Swap(i, j int) {
383 e[i], e[j] = e[j], e[i]
384 }
385 func (e byAddress) Less(i, j int) bool {
386 return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
387 }
388
389
390 type byEndpoint []Endpoint
391
392 func (e byEndpoint) Len() int {
393 return len(e)
394 }
395 func (e byEndpoint) Swap(i, j int) {
396 e[i], e[j] = e[j], e[i]
397 }
398 func (e byEndpoint) Less(i, j int) bool {
399 return e[i].String() < e[j].String()
400 }
401
402
403 type byPort []discovery.EndpointPort
404
405 func (p byPort) Len() int {
406 return len(p)
407 }
408 func (p byPort) Swap(i, j int) {
409 p[i], p[j] = p[j], p[i]
410 }
411 func (p byPort) Less(i, j int) bool {
412 return *p[i].Port < *p[j].Port
413 }
414
View as plain text