1
16
17 package proxy
18
19 import (
20 "sync"
21 "time"
22
23 v1 "k8s.io/api/core/v1"
24 discovery "k8s.io/api/discovery/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/apimachinery/pkg/util/sets"
27 "k8s.io/client-go/tools/events"
28 "k8s.io/klog/v2"
29 "k8s.io/kubernetes/pkg/proxy/metrics"
30 )
31
32 var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
33 discovery.AddressTypeIPv4,
34 discovery.AddressTypeIPv6,
35 )
36
37
38
39 type EndpointsChangeTracker struct {
40
41 lock sync.Mutex
42
43
44
45
46 processEndpointsMapChange processEndpointsMapChangeFunc
47
48
49 endpointSliceCache *EndpointSliceCache
50
51
52
53
54 lastChangeTriggerTimes map[types.NamespacedName][]time.Time
55
56
57
58 trackerStartTime time.Time
59 }
60
61 type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
62 type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
63
64
65 func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
66 return &EndpointsChangeTracker{
67 lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
68 trackerStartTime: time.Now(),
69 processEndpointsMapChange: processEndpointsMapChange,
70 endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
71 }
72 }
73
74
75
76
77
78 func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
79 if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
80 klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
81 return false
82 }
83
84
85 if endpointSlice == nil {
86 klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
87 return false
88 }
89
90 namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
91 if err != nil {
92 klog.InfoS("Error getting endpoint slice cache keys", "err", err)
93 return false
94 }
95
96 metrics.EndpointChangesTotal.Inc()
97
98 ect.lock.Lock()
99 defer ect.lock.Unlock()
100
101 changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
102
103 if changeNeeded {
104 metrics.EndpointChangesPending.Inc()
105
106
107
108
109
110 if removeSlice {
111 delete(ect.lastChangeTriggerTimes, namespacedName)
112 } else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
113 ect.lastChangeTriggerTimes[namespacedName] =
114 append(ect.lastChangeTriggerTimes[namespacedName], t)
115 }
116 }
117
118 return changeNeeded
119 }
120
121
122
123 func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
124 metrics.EndpointChangesPending.Set(0)
125
126 return ect.endpointSliceCache.checkoutChanges()
127 }
128
129
130
131 func (ect *EndpointsChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
132 ect.lock.Lock()
133 defer ect.lock.Unlock()
134
135 for k, v := range ect.lastChangeTriggerTimes {
136 prev, ok := (*lastChangeTriggerTimes)[k]
137 if !ok {
138 (*lastChangeTriggerTimes)[k] = v
139 } else {
140 (*lastChangeTriggerTimes)[k] = append(prev, v...)
141 }
142 }
143 ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
144 }
145
146
147
148
149
150 func getLastChangeTriggerTime(annotations map[string]string) time.Time {
151
152 if _, ok := annotations[v1.EndpointsLastChangeTriggerTime]; !ok {
153
154
155
156 return time.Time{}
157 }
158 val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
159 if err != nil {
160 klog.ErrorS(err, "Error while parsing EndpointsLastChangeTriggerTimeAnnotation",
161 "value", annotations[v1.EndpointsLastChangeTriggerTime])
162
163 }
164 return val
165 }
166
167
168
169
170
171 type endpointsChange struct {
172 previous EndpointsMap
173 current EndpointsMap
174 }
175
176
177 type UpdateEndpointsMapResult struct {
178
179
180 UpdatedServices sets.Set[types.NamespacedName]
181
182
183
184
185 DeletedUDPEndpoints []ServiceEndpoint
186
187
188
189
190
191 NewlyActiveUDPServices []ServicePortName
192
193
194
195
196 LastChangeTriggerTimes map[types.NamespacedName][]time.Time
197 }
198
199
200 type EndpointsMap map[ServicePortName][]Endpoint
201
202
203
204
205 func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
206 result := UpdateEndpointsMapResult{
207 UpdatedServices: sets.New[types.NamespacedName](),
208 DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
209 NewlyActiveUDPServices: make([]ServicePortName, 0),
210 LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
211 }
212 if ect == nil {
213 return result
214 }
215
216 changes := ect.checkoutChanges()
217 for nn, change := range changes {
218 if ect.processEndpointsMapChange != nil {
219 ect.processEndpointsMapChange(change.previous, change.current)
220 }
221 result.UpdatedServices.Insert(nn)
222
223 em.unmerge(change.previous)
224 em.merge(change.current)
225 detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
226 }
227 ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)
228
229 return result
230 }
231
232
233 func (em EndpointsMap) merge(other EndpointsMap) {
234 for svcPortName := range other {
235 em[svcPortName] = other[svcPortName]
236 }
237 }
238
239
240 func (em EndpointsMap) unmerge(other EndpointsMap) {
241 for svcPortName := range other {
242 delete(em, svcPortName)
243 }
244 }
245
246
247 func (em EndpointsMap) getLocalReadyEndpointIPs() map[types.NamespacedName]sets.Set[string] {
248 localIPs := make(map[types.NamespacedName]sets.Set[string])
249 for svcPortName, epList := range em {
250 for _, ep := range epList {
251
252
253 if !ep.IsReady() {
254 continue
255 }
256
257 if ep.IsLocal() {
258 nsn := svcPortName.NamespacedName
259 if localIPs[nsn] == nil {
260 localIPs[nsn] = sets.New[string]()
261 }
262 localIPs[nsn].Insert(ep.IP())
263 }
264 }
265 }
266 return localIPs
267 }
268
269
270
271 func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
272
273
274
275
276
277
278
279
280 eps := make(map[types.NamespacedName]int)
281 localIPs := em.getLocalReadyEndpointIPs()
282 for nsn, ips := range localIPs {
283 eps[nsn] = len(ips)
284 }
285 return eps
286 }
287
288
289
290 func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
291
292
293
294 for svcPortName, epList := range oldEndpointsMap {
295 if svcPortName.Protocol != v1.ProtocolUDP {
296 continue
297 }
298
299 for _, ep := range epList {
300
301
302 if !ep.IsServing() {
303 continue
304 }
305
306 deleted := true
307
308
309
310 for i := range newEndpointsMap[svcPortName] {
311 if newEndpointsMap[svcPortName][i].String() == ep.String() {
312 deleted = false
313 break
314 }
315 }
316 if deleted {
317 klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
318 *deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
319 }
320 }
321 }
322
323
324
325
326
327 for svcPortName, epList := range newEndpointsMap {
328 if svcPortName.Protocol != v1.ProtocolUDP {
329 continue
330 }
331
332 epServing := 0
333 for _, ep := range epList {
334 if ep.IsServing() {
335 epServing++
336 }
337 }
338
339 oldEpServing := 0
340 for _, ep := range oldEndpointsMap[svcPortName] {
341 if ep.IsServing() {
342 oldEpServing++
343 }
344 }
345
346 if epServing > 0 && oldEpServing == 0 {
347 *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
348 }
349 }
350 }
351
View as plain text