1
16
17 package endpointslicemirroring
18
19 import (
20 "fmt"
21 "strings"
22
23 corev1 "k8s.io/api/core/v1"
24 discovery "k8s.io/api/discovery/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/runtime/schema"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/client-go/tools/cache"
29 "k8s.io/client-go/tools/leaderelection/resourcelock"
30 endpointsliceutil "k8s.io/endpointslice/util"
31 "k8s.io/kubernetes/pkg/apis/discovery/validation"
32 netutils "k8s.io/utils/net"
33 )
34
35
36
37 type addrTypePortMapKey string
38
39
40 func newAddrTypePortMapKey(endpointPorts []discovery.EndpointPort, addrType discovery.AddressType) addrTypePortMapKey {
41 pmk := fmt.Sprintf("%s-%s", addrType, endpointsliceutil.NewPortMapKey(endpointPorts))
42 return addrTypePortMapKey(pmk)
43 }
44
45 func (pk addrTypePortMapKey) addressType() discovery.AddressType {
46 if strings.HasPrefix(string(pk), string(discovery.AddressTypeIPv6)) {
47 return discovery.AddressTypeIPv6
48 }
49 return discovery.AddressTypeIPv4
50 }
51
52 func getAddressType(address string) *discovery.AddressType {
53 ip := netutils.ParseIPSloppy(address)
54 if ip == nil {
55 return nil
56 }
57 addressType := discovery.AddressTypeIPv4
58 if ip.To4() == nil {
59 addressType = discovery.AddressTypeIPv6
60 }
61 return &addressType
62 }
63
64
65
66 func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPort, addrType discovery.AddressType, sliceName string) *discovery.EndpointSlice {
67 gvk := schema.GroupVersionKind{Version: "v1", Kind: "Endpoints"}
68 ownerRef := metav1.NewControllerRef(endpoints, gvk)
69 epSlice := &discovery.EndpointSlice{
70 ObjectMeta: metav1.ObjectMeta{
71 Labels: map[string]string{},
72 Annotations: map[string]string{},
73 OwnerReferences: []metav1.OwnerReference{*ownerRef},
74 Namespace: endpoints.Namespace,
75 },
76 Ports: ports,
77 AddressType: addrType,
78 Endpoints: []discovery.Endpoint{},
79 }
80
81
82 for label, val := range endpoints.Labels {
83 epSlice.Labels[label] = val
84 }
85
86
87 epSlice.Labels[discovery.LabelServiceName] = endpoints.Name
88 epSlice.Labels[discovery.LabelManagedBy] = controllerName
89
90
91 for annotation, val := range endpoints.Annotations {
92 if annotation == corev1.EndpointsLastChangeTriggerTime || annotation == corev1.LastAppliedConfigAnnotation {
93 continue
94 }
95 epSlice.Annotations[annotation] = val
96 }
97
98 if sliceName == "" {
99 epSlice.GenerateName = getEndpointSlicePrefix(endpoints.Name)
100 } else {
101 epSlice.Name = sliceName
102 }
103
104 return epSlice
105 }
106
107
108 func getEndpointSlicePrefix(serviceName string) string {
109
110 prefix := fmt.Sprintf("%s-", serviceName)
111 if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 {
112 prefix = serviceName
113 }
114 return prefix
115 }
116
117
118
119 func addressToEndpoint(address corev1.EndpointAddress, ready bool) *discovery.Endpoint {
120 endpoint := &discovery.Endpoint{
121 Addresses: []string{address.IP},
122 Conditions: discovery.EndpointConditions{
123 Ready: &ready,
124 },
125 TargetRef: address.TargetRef,
126 }
127
128 if address.NodeName != nil {
129 endpoint.NodeName = address.NodeName
130 }
131 if address.Hostname != "" {
132 endpoint.Hostname = &address.Hostname
133 }
134
135 return endpoint
136 }
137
138
139
140 func epPortsToEpsPorts(epPorts []corev1.EndpointPort) []discovery.EndpointPort {
141 epsPorts := []discovery.EndpointPort{}
142 for _, epPort := range epPorts {
143 epp := epPort.DeepCopy()
144 epsPorts = append(epsPorts, discovery.EndpointPort{
145 Name: &epp.Name,
146 Port: &epp.Port,
147 Protocol: &epp.Protocol,
148 AppProtocol: epp.AppProtocol,
149 })
150 }
151 return epsPorts
152 }
153
154
155
156 func getServiceFromDeleteAction(obj interface{}) *corev1.Service {
157 if service, ok := obj.(*corev1.Service); ok {
158 return service
159 }
160
161
162 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
163 if !ok {
164 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
165 return nil
166 }
167 service, ok := tombstone.Obj.(*corev1.Service)
168 if !ok {
169 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Service resource: %#v", obj))
170 return nil
171 }
172 return service
173 }
174
175
176
177 func getEndpointsFromDeleteAction(obj interface{}) *corev1.Endpoints {
178 if endpoints, ok := obj.(*corev1.Endpoints); ok {
179 return endpoints
180 }
181
182
183 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
184 if !ok {
185 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
186 return nil
187 }
188 endpoints, ok := tombstone.Obj.(*corev1.Endpoints)
189 if !ok {
190 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an Endpoints resource: %#v", obj))
191 return nil
192 }
193 return endpoints
194 }
195
196
197 func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
198 if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
199 return endpointSlice
200 }
201
202
203 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
204 if !ok {
205 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
206 return nil
207 }
208 endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
209 if !ok {
210 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an EndpointSlice resource: %#v", obj))
211 return nil
212 }
213 return endpointSlice
214 }
215
216
217
218 func endpointsControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) {
219 if endpointSlice == nil {
220 return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()")
221 }
222 serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName]
223 if !ok || serviceName == "" {
224 return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName)
225 }
226 return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil
227 }
228
229
230
231 func skipMirror(labels map[string]string) bool {
232 skipMirror, _ := labels[discovery.LabelSkipMirror]
233 return skipMirror == "true"
234 }
235
236
237
238 func hasLeaderElection(annotations map[string]string) bool {
239 _, ok := annotations[resourcelock.LeaderElectionRecordAnnotationKey]
240 return ok
241 }
242
243
244
// cloneAndRemoveKeys returns a copy of a without the given keys.
//
// When no keys are given there is nothing to remove and a itself is returned
// (not a copy). Otherwise a is left untouched and a new, non-nil map is
// returned, even if a is nil.
func cloneAndRemoveKeys(a map[string]string, keys ...string) map[string]string {
	if len(keys) == 0 {
		// Nothing to remove: hand back the original map unchanged.
		return a
	}

	// Collect the keys to drop so the copy loop can skip them directly.
	drop := make(map[string]struct{}, len(keys))
	for _, key := range keys {
		drop[key] = struct{}{}
	}

	filtered := make(map[string]string, len(a))
	for key, value := range a {
		if _, skip := drop[key]; !skip {
			filtered[key] = value
		}
	}
	return filtered
}
261
262
263
264 func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool {
265 return managedByController(endpointSlice1) != managedByController(endpointSlice2)
266 }
267
268
269
270 func managedByController(endpointSlice *discovery.EndpointSlice) bool {
271 managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy]
272 return managedBy == controllerName
273 }
274
View as plain text