1
16
17 package endpointslicemirroring
18
19 import (
20 "context"
21 "fmt"
22
23 corev1 "k8s.io/api/core/v1"
24 discovery "k8s.io/api/discovery/v1"
25 apiequality "k8s.io/apimachinery/pkg/api/equality"
26 "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/types"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/client-go/tools/record"
31 endpointsliceutil "k8s.io/endpointslice/util"
32 "k8s.io/klog/v2"
33 endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
34 "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
35 )
36
37
38
39 type reconciler struct {
40 client clientset.Interface
41
42
43
44
45 endpointSliceTracker *endpointsliceutil.EndpointSliceTracker
46
47
48
49 eventRecorder record.EventRecorder
50
51
52
53
54 maxEndpointsPerSubset int32
55
56
57
58 metricsCache *metrics.Cache
59 }
60
61
62
63
64 func (r *reconciler) reconcile(logger klog.Logger, endpoints *corev1.Endpoints, existingSlices []*discovery.EndpointSlice) error {
65
66 d := newDesiredCalc()
67
68 numInvalidAddresses := 0
69 addressesSkipped := 0
70
71
72 subsets := endpointsv1.RepackSubsets(endpoints.Subsets)
73 for _, subset := range subsets {
74 multiKey := d.initPorts(subset.Ports)
75
76 totalAddresses := len(subset.Addresses) + len(subset.NotReadyAddresses)
77 totalAddressesAdded := 0
78
79 for _, address := range subset.Addresses {
80
81
82
83 if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
84 break
85 }
86 if ok := d.addAddress(address, multiKey, true); ok {
87 totalAddressesAdded++
88 } else {
89 numInvalidAddresses++
90 logger.Info("Address in Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice", "endpoints", klog.KObj(endpoints), "IP", address.IP)
91 }
92 }
93
94 for _, address := range subset.NotReadyAddresses {
95
96
97
98 if totalAddressesAdded >= int(r.maxEndpointsPerSubset) {
99 break
100 }
101 if ok := d.addAddress(address, multiKey, false); ok {
102 totalAddressesAdded++
103 } else {
104 numInvalidAddresses++
105 logger.Info("Address in Endpoints is not a valid IP, it will not be mirrored to an EndpointSlice", "endpoints", klog.KObj(endpoints), "IP", address.IP)
106 }
107 }
108
109 addressesSkipped += totalAddresses - totalAddressesAdded
110 }
111
112
113
114 metrics.AddressesSkippedPerSync.WithLabelValues().Observe(float64(addressesSkipped))
115
116
117
118 if numInvalidAddresses > 0 {
119 r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, InvalidIPAddress,
120 "Skipped %d invalid IP addresses when mirroring to EndpointSlices", numInvalidAddresses)
121 }
122
123
124
125 if addressesSkipped > numInvalidAddresses {
126 logger.Info("Addresses in Endpoints were skipped due to exceeding MaxEndpointsPerSubset", "skippedAddresses", addressesSkipped, "endpoints", klog.KObj(endpoints))
127 r.eventRecorder.Eventf(endpoints, corev1.EventTypeWarning, TooManyAddressesToMirror,
128 "A max of %d addresses can be mirrored to EndpointSlices per Endpoints subset. %d addresses were skipped", r.maxEndpointsPerSubset, addressesSkipped)
129 }
130
131
132 existingSlicesByKey := endpointSlicesByKey(existingSlices)
133
134
135 epMetrics := metrics.NewEndpointPortCache()
136 totals := totalsByAction{}
137 slices := slicesByAction{}
138
139 for portKey, desiredEndpoints := range d.endpointsByKey {
140 numEndpoints := len(desiredEndpoints)
141 pmSlices, pmTotals := r.reconcileByPortMapping(
142 endpoints, existingSlicesByKey[portKey], desiredEndpoints, d.portsByKey[portKey], portKey.addressType())
143
144 slices.append(pmSlices)
145 totals.add(pmTotals)
146
147 epMetrics.Set(endpointsliceutil.PortMapKey(portKey), metrics.EfficiencyInfo{
148 Endpoints: numEndpoints,
149 Slices: len(existingSlicesByKey[portKey]) + len(pmSlices.toCreate) - len(pmSlices.toDelete),
150 })
151 }
152
153
154
155 for portKey, existingSlices := range existingSlicesByKey {
156 if _, ok := d.endpointsByKey[portKey]; !ok {
157 for _, existingSlice := range existingSlices {
158 slices.toDelete = append(slices.toDelete, existingSlice)
159 }
160 }
161 }
162
163 metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totals.added))
164 metrics.EndpointsUpdatedPerSync.WithLabelValues().Observe(float64(totals.updated))
165 metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totals.removed))
166
167 endpointsNN := types.NamespacedName{Name: endpoints.Name, Namespace: endpoints.Namespace}
168 r.metricsCache.UpdateEndpointPortCache(endpointsNN, epMetrics)
169
170 return r.finalize(endpoints, slices)
171 }
172
173
174
175
176 func (r *reconciler) reconcileByPortMapping(
177 endpoints *corev1.Endpoints,
178 existingSlices []*discovery.EndpointSlice,
179 desiredSet endpointsliceutil.EndpointSet,
180 endpointPorts []discovery.EndpointPort,
181 addressType discovery.AddressType,
182 ) (slicesByAction, totalsByAction) {
183 slices := slicesByAction{}
184 totals := totalsByAction{}
185
186
187
188 if desiredSet.Len() == 0 {
189 slices.toDelete = existingSlices
190 for _, epSlice := range existingSlices {
191 totals.removed += len(epSlice.Endpoints)
192 }
193 return slices, totals
194 }
195
196 if len(existingSlices) == 0 {
197
198 totals.added = desiredSet.Len()
199 } else {
200
201 slices.toDelete = existingSlices[1:]
202
203
204 compareAnnotations := cloneAndRemoveKeys(endpoints.Annotations, corev1.EndpointsLastChangeTriggerTime, corev1.LastAppliedConfigAnnotation)
205 compareLabels := cloneAndRemoveKeys(existingSlices[0].Labels, discovery.LabelManagedBy, discovery.LabelServiceName)
206
207 totals = totalChanges(existingSlices[0], desiredSet)
208 if totals.added == 0 && totals.updated == 0 && totals.removed == 0 &&
209 apiequality.Semantic.DeepEqual(endpoints.Labels, compareLabels) &&
210 apiequality.Semantic.DeepEqual(compareAnnotations, existingSlices[0].Annotations) {
211 return slices, totals
212 }
213 }
214
215
216 var sliceName string
217 if len(existingSlices) > 0 {
218 sliceName = existingSlices[0].Name
219 }
220 newSlice := newEndpointSlice(endpoints, endpointPorts, addressType, sliceName)
221 for desiredSet.Len() > 0 && len(newSlice.Endpoints) < int(r.maxEndpointsPerSubset) {
222 endpoint, _ := desiredSet.PopAny()
223 newSlice.Endpoints = append(newSlice.Endpoints, *endpoint)
224 }
225
226 if newSlice.Name != "" {
227 slices.toUpdate = []*discovery.EndpointSlice{newSlice}
228 } else {
229 slices.toCreate = []*discovery.EndpointSlice{newSlice}
230 }
231
232 return slices, totals
233 }
234
235
236 func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction) error {
237
238
239
240 recycleSlices(&slices)
241
242 epsClient := r.client.DiscoveryV1().EndpointSlices(endpoints.Namespace)
243
244
245
246 if endpoints.DeletionTimestamp == nil {
247 for _, endpointSlice := range slices.toCreate {
248 createdSlice, err := epsClient.Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
249 if err != nil {
250
251 if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
252 return nil
253 }
254 return fmt.Errorf("failed to create EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err)
255 }
256 r.endpointSliceTracker.Update(createdSlice)
257 metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
258 }
259 }
260
261 for _, endpointSlice := range slices.toUpdate {
262 updatedSlice, err := epsClient.Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
263 if err != nil {
264 return fmt.Errorf("failed to update %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
265 }
266 r.endpointSliceTracker.Update(updatedSlice)
267 metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
268 }
269
270 for _, endpointSlice := range slices.toDelete {
271 err := epsClient.Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
272 if err != nil {
273 return fmt.Errorf("failed to delete %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err)
274 }
275 r.endpointSliceTracker.ExpectDeletion(endpointSlice)
276 metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
277 }
278
279 return nil
280 }
281
282
283
284 func (r *reconciler) deleteEndpoints(namespace, name string, endpointSlices []*discovery.EndpointSlice) error {
285 r.metricsCache.DeleteEndpoints(types.NamespacedName{Namespace: namespace, Name: name})
286 var errs []error
287 for _, endpointSlice := range endpointSlices {
288 err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
289 if err != nil {
290 errs = append(errs, err)
291 }
292 }
293 if len(errs) > 0 {
294 return fmt.Errorf("error(s) deleting %d/%d EndpointSlices for %s/%s Endpoints, including: %s", len(errs), len(endpointSlices), namespace, name, errs[0])
295 }
296 return nil
297 }
298
299
300
301 func endpointSlicesByKey(existingSlices []*discovery.EndpointSlice) map[addrTypePortMapKey][]*discovery.EndpointSlice {
302 slicesByKey := map[addrTypePortMapKey][]*discovery.EndpointSlice{}
303 for _, existingSlice := range existingSlices {
304 epKey := newAddrTypePortMapKey(existingSlice.Ports, existingSlice.AddressType)
305 slicesByKey[epKey] = append(slicesByKey[epKey], existingSlice)
306 }
307 return slicesByKey
308 }
309
310
311
312 func totalChanges(existingSlice *discovery.EndpointSlice, desiredSet endpointsliceutil.EndpointSet) totalsByAction {
313 totals := totalsByAction{}
314 existingMatches := 0
315
316 for _, endpoint := range existingSlice.Endpoints {
317 got := desiredSet.Get(&endpoint)
318 if got == nil {
319
320 totals.removed++
321 } else {
322 existingMatches++
323
324
325
326 if !endpointsliceutil.EndpointsEqualBeyondHash(got, &endpoint) {
327 totals.updated++
328 }
329 }
330 }
331
332
333
334 totals.added = desiredSet.Len() - existingMatches
335 return totals
336 }
337
View as plain text