1 package servicemirror
2
3 import (
4 "context"
5 "fmt"
6
7 consts "github.com/linkerd/linkerd2/pkg/k8s"
8 logging "github.com/sirupsen/logrus"
9 corev1 "k8s.io/api/core/v1"
10 kerrors "k8s.io/apimachinery/pkg/api/errors"
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 "k8s.io/apimachinery/pkg/labels"
13 )
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error {
37 exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
38 if err != nil {
39 rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
40 return fmt.Errorf("error retrieving exported service %s/%s: %w", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
41 }
42
43
44
45
46
47
48 if len(exportedService.Spec.Ports) == 0 {
49 rcsw.recorder.Event(exportedService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports")
50 rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name)
51 return nil
52 }
53
54 mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
55 mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName)
56 if err != nil {
57 if !kerrors.IsNotFound(err) {
58 return err
59 }
60
61
62
63 mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints)
64 if err != nil {
65 return err
66 }
67 }
68
69 headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name)
70 headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName)
71 if err != nil {
72 if !kerrors.IsNotFound(err) {
73 return err
74 }
75
76 if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
77 return rcsw.createGatewayEndpoints(ctx, exportedService)
78 }
79
80
81 if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil {
82 rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
83 return err
84 }
85
86 return nil
87 }
88
89
90
91
92
93 if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
94 return nil
95 }
96
97 mirrorEndpoints := headlessMirrorEndpoints.DeepCopy()
98 endpointMirrors := make(map[string]struct{})
99 newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
100 for _, subset := range exportedEndpoints.Subsets {
101 newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
102 for _, address := range subset.Addresses {
103 if address.Hostname == "" {
104 continue
105 }
106
107 endpointMirrorName := rcsw.mirroredResourceName(address.Hostname)
108 endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName)
109 if err != nil {
110 if !kerrors.IsNotFound(err) {
111 return err
112 }
113
114
115 endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
116 if err != nil {
117 return err
118 }
119 }
120
121 endpointMirrors[endpointMirrorName] = struct{}{}
122 newAddresses = append(newAddresses, corev1.EndpointAddress{
123 Hostname: address.Hostname,
124 IP: endpointMirrorService.Spec.ClusterIP,
125 })
126 }
127
128 if len(newAddresses) == 0 {
129 continue
130 }
131
132
133 newSubsets = append(newSubsets, corev1.EndpointSubset{
134 Addresses: newAddresses,
135 Ports: subset.DeepCopy().Ports,
136 })
137 }
138
139 headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name)
140 matchLabels := map[string]string{
141 consts.MirroredHeadlessSvcNameLabel: headlessMirrorName,
142 }
143
144
145 endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
146 if err != nil {
147 return err
148 }
149
150 var errors []error
151 for _, service := range endpointMirrorServices {
152
153
154 if _, found := endpointMirrors[service.Name]; found {
155 continue
156 }
157 err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{})
158 if err != nil {
159 if !kerrors.IsNotFound(err) {
160 errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %w", service.Namespace, service.Name, err))
161 }
162 }
163 }
164
165 if len(errors) > 0 {
166 return RetryableError{errors}
167 }
168
169
170 mirrorEndpoints.Subsets = newSubsets
171 err = rcsw.updateMirrorEndpoints(ctx, mirrorEndpoints)
172 if err != nil {
173 return RetryableError{[]error{err}}
174 }
175
176 return nil
177 }
178
179
180
181
182
183
184
185
186
187
188
189 func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) {
190
191
192
193
194 if len(exportedEndpoints.Subsets) == 0 {
195 return &corev1.Service{}, nil
196 }
197
198 remoteService := exportedService.DeepCopy()
199 serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
200 localServiceName := rcsw.mirroredResourceName(remoteService.Name)
201
202
203 if _, err := rcsw.localAPIClient.NS().Lister().Get(remoteService.Namespace); err != nil {
204 if kerrors.IsNotFound(err) {
205 rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
206 return &corev1.Service{}, nil
207 }
208
209 return nil, RetryableError{[]error{err}}
210 }
211
212 serviceToCreate := &corev1.Service{
213 ObjectMeta: metav1.ObjectMeta{
214 Name: localServiceName,
215 Namespace: remoteService.Namespace,
216 Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
217 Labels: rcsw.getMirroredServiceLabels(remoteService),
218 },
219 Spec: corev1.ServiceSpec{
220 Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
221 },
222 }
223
224 if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) {
225 serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone
226 rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo)
227 } else {
228 rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
229 }
230
231 svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{})
232 if err != nil {
233 if !kerrors.IsAlreadyExists(err) {
234
235 return &corev1.Service{}, RetryableError{[]error{err}}
236 }
237 }
238
239 return svc, err
240 }
241
242
243
244
245
246 func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error {
247 exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
248 endpointsHostnames := make(map[string]struct{})
249 subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
250 for _, subset := range exportedEndpoints.Subsets {
251 newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
252 for _, addr := range subset.Addresses {
253 if addr.Hostname == "" {
254 continue
255 }
256
257 endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname)
258 createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
259 if err != nil {
260 rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err)
261 continue
262 }
263
264 endpointsHostnames[addr.Hostname] = struct{}{}
265 newAddresses = append(newAddresses, corev1.EndpointAddress{
266 Hostname: addr.TargetRef.Name,
267 IP: createdService.Spec.ClusterIP,
268 })
269
270 }
271
272 if len(newAddresses) == 0 {
273 continue
274 }
275
276 subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{
277 Addresses: newAddresses,
278 Ports: subset.DeepCopy().Ports,
279 })
280 }
281
282 headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
283 headlessMirrorEndpoints := &corev1.Endpoints{
284 ObjectMeta: metav1.ObjectMeta{
285 Name: headlessMirrorServiceName,
286 Namespace: exportedService.Namespace,
287 Labels: map[string]string{
288 consts.MirroredResourceLabel: "true",
289 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
290 },
291 Annotations: map[string]string{
292 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
293 },
294 },
295 Subsets: subsetsToCreate,
296 }
297
298 if rcsw.link.GatewayIdentity != "" {
299 headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
300 }
301
302 rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace)
303
304
305
306 _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{})
307 if err != nil {
308 if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}); svcErr != nil {
309 rcsw.log.Errorf("failed to delete Service %s after Endpoints creation failed: %s", headlessMirrorServiceName, svcErr)
310 }
311 return RetryableError{[]error{err}}
312 }
313
314 return nil
315 }
316
317
318
319
320
321
322
323 func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context.Context, endpointHostname, resourceVersion, endpointMirrorName string, exportedService *corev1.Service) (*corev1.Service, error) {
324 gatewayAddresses, err := rcsw.resolveGatewayAddress()
325 if err != nil {
326 return nil, err
327 }
328
329 endpointMirrorAnnotations := map[string]string{
330 consts.RemoteResourceVersionAnnotation: resourceVersion,
331 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.%s.svc.%s", endpointHostname, exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
332 }
333
334 endpointMirrorLabels := rcsw.getMirroredServiceLabels(nil)
335 mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
336 endpointMirrorLabels[consts.MirroredHeadlessSvcNameLabel] = mirrorServiceName
337
338
339 endpointMirrorService := &corev1.Service{
340 ObjectMeta: metav1.ObjectMeta{
341 Name: endpointMirrorName,
342 Namespace: exportedService.Namespace,
343 Annotations: endpointMirrorAnnotations,
344 Labels: endpointMirrorLabels,
345 },
346 Spec: corev1.ServiceSpec{
347 Ports: remapRemoteServicePorts(exportedService.Spec.Ports),
348 },
349 }
350 endpointMirrorEndpoints := &corev1.Endpoints{
351 ObjectMeta: metav1.ObjectMeta{
352 Name: endpointMirrorService.Name,
353 Namespace: endpointMirrorService.Namespace,
354 Labels: endpointMirrorLabels,
355 Annotations: map[string]string{
356 consts.RemoteServiceFqName: endpointMirrorService.Annotations[consts.RemoteServiceFqName],
357 },
358 },
359 Subsets: []corev1.EndpointSubset{
360 {
361 Addresses: gatewayAddresses,
362 Ports: rcsw.getEndpointsPorts(exportedService),
363 },
364 },
365 }
366
367 if rcsw.link.GatewayIdentity != "" {
368 endpointMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
369 }
370
371 exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
372 endpointMirrorInfo := fmt.Sprintf("%s/%s", endpointMirrorService.Namespace, endpointMirrorName)
373 rcsw.log.Infof("Creating a new endpoint mirror service %s for exported headless service %s", endpointMirrorInfo, exportedServiceInfo)
374 createdService, err := rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Create(ctx, endpointMirrorService, metav1.CreateOptions{})
375 if err != nil {
376 if !kerrors.IsAlreadyExists(err) {
377
378 return createdService, RetryableError{[]error{err}}
379 }
380 }
381
382 rcsw.log.Infof("Creating a new endpoints object for endpoint mirror service %s", endpointMirrorInfo)
383 err = rcsw.createMirrorEndpoints(ctx, endpointMirrorEndpoints)
384 if err != nil {
385 if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Delete(ctx, endpointMirrorName, metav1.DeleteOptions{}); svcErr != nil {
386 rcsw.log.Errorf("Failed to delete service %s after endpoints creation failed: %s", endpointMirrorName, svcErr)
387 }
388 return createdService, RetryableError{[]error{err}}
389 }
390 return createdService, nil
391 }
392
393
394
395
396
397
398
399 func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool {
400 for _, subset := range endpoints.Subsets {
401 for _, addr := range subset.Addresses {
402 if addr.Hostname != "" {
403 return true
404 }
405 }
406
407 for _, addr := range subset.NotReadyAddresses {
408 if addr.Hostname != "" {
409 return true
410 }
411 }
412 }
413 log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name)
414 return false
415 }
416
417
418
419 func isHeadlessEndpoints(ep *corev1.Endpoints, log *logging.Entry) bool {
420 if _, found := ep.Labels[corev1.IsHeadlessService]; !found {
421
422
423 log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService)
424 return false
425 }
426
427 return true
428 }
429
View as plain text