1 package destination
2
3 import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "net"
8 "strconv"
9 "strings"
10
11 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
12 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
13 sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
14 "github.com/linkerd/linkerd2/controller/k8s"
15 labels "github.com/linkerd/linkerd2/pkg/k8s"
16 "github.com/linkerd/linkerd2/pkg/prometheus"
17 "github.com/linkerd/linkerd2/pkg/util"
18 logging "github.com/sirupsen/logrus"
19 "google.golang.org/grpc"
20 "google.golang.org/grpc/codes"
21 "google.golang.org/grpc/peer"
22 "google.golang.org/grpc/status"
23 corev1 "k8s.io/api/core/v1"
24 kerrors "k8s.io/apimachinery/pkg/api/errors"
25 )
26
27 type (
28 Config struct {
29 ControllerNS,
30 IdentityTrustDomain,
31 ClusterDomain string
32
33 EnableH2Upgrade,
34 EnableEndpointSlices,
35 EnableIPv6,
36 ExtEndpointZoneWeights bool
37
38 MeshedHttp2ClientParams *pb.Http2ClientParams
39
40 DefaultOpaquePorts map[uint32]struct{}
41 }
42
43 server struct {
44 pb.UnimplementedDestinationServer
45
46 config Config
47
48 workloads *watcher.WorkloadWatcher
49 endpoints *watcher.EndpointsWatcher
50 opaquePorts *watcher.OpaquePortsWatcher
51 profiles *watcher.ProfileWatcher
52 clusterStore *watcher.ClusterStore
53
54 k8sAPI *k8s.API
55 metadataAPI *k8s.MetadataAPI
56 log *logging.Entry
57 shutdown <-chan struct{}
58 }
59 )
60
61
62
63
64
65
66
67
68
69
70
71
72
73 func NewServer(
74 addr string,
75 config Config,
76 k8sAPI *k8s.API,
77 metadataAPI *k8s.MetadataAPI,
78 clusterStore *watcher.ClusterStore,
79 shutdown <-chan struct{},
80 ) (*grpc.Server, error) {
81 log := logging.WithFields(logging.Fields{
82 "addr": addr,
83 "component": "server",
84 })
85
86
87 err := watcher.InitializeIndexers(k8sAPI)
88 if err != nil {
89 return nil, err
90 }
91
92 workloads, err := watcher.NewWorkloadWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, config.DefaultOpaquePorts)
93 if err != nil {
94 return nil, err
95 }
96 endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, "local")
97 if err != nil {
98 return nil, err
99 }
100 opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, config.DefaultOpaquePorts)
101 if err != nil {
102 return nil, err
103 }
104 profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
105 if err != nil {
106 return nil, err
107 }
108
109 srv := server{
110 pb.UnimplementedDestinationServer{},
111 config,
112 workloads,
113 endpoints,
114 opaquePorts,
115 profiles,
116 clusterStore,
117 k8sAPI,
118 metadataAPI,
119 log,
120 shutdown,
121 }
122
123 s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
124
125 pb.RegisterDestinationServer(s, &srv)
126 return s, nil
127 }
128
129 func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) error {
130 log := s.log
131
132 client, _ := peer.FromContext(stream.Context())
133 if client != nil {
134 log = log.WithField("remote", client.Addr)
135 }
136
137 var token contextToken
138 if dest.GetContextToken() != "" {
139 log.Debugf("Dest token: %q", dest.GetContextToken())
140 token = s.parseContextToken(dest.GetContextToken())
141 log = log.WithFields(logging.Fields{"context-pod": token.Pod, "context-ns": token.Ns})
142 }
143
144 log.Debugf("Get %s", dest.GetPath())
145
146 streamEnd := make(chan struct{})
147
148 host, port, err := getHostAndPort(dest.GetPath())
149 if err != nil {
150 log.Debugf("Invalid service %s", dest.GetPath())
151 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
152 }
153
154
155 if ip := net.ParseIP(host); ip != nil {
156 return status.Errorf(codes.InvalidArgument, "IP queries not supported by Get API: host=%s", host)
157 }
158
159 service, instanceID, err := parseK8sServiceName(host, s.config.ClusterDomain)
160 if err != nil {
161 log.Debugf("Invalid service %s", dest.GetPath())
162 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
163 }
164
165 svc, err := s.k8sAPI.Svc().Lister().Services(service.Namespace).Get(service.Name)
166 if err != nil {
167 if kerrors.IsNotFound(err) {
168 log.Debugf("Service not found %s", service)
169 return status.Errorf(codes.NotFound, "Service %s.%s not found", service.Name, service.Namespace)
170 }
171 log.Debugf("Failed to get service %s: %v", service, err)
172 return status.Errorf(codes.Internal, "Failed to get service %s", dest.GetPath())
173 }
174
175 if cluster, found := svc.Labels[labels.RemoteDiscoveryLabel]; found {
176
177 remoteSvc, found := svc.Labels[labels.RemoteServiceLabel]
178 if !found {
179 log.Debugf("Remote discovery service missing remote service name %s", service)
180 return status.Errorf(codes.FailedPrecondition, "Remote discovery service missing remote service name %s", dest.GetPath())
181 }
182 remoteWatcher, remoteConfig, found := s.clusterStore.Get(cluster)
183 if !found {
184 log.Errorf("Failed to get remote cluster %s", cluster)
185 return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster)
186 }
187 translator := newEndpointTranslator(
188 s.config.ControllerNS,
189 remoteConfig.TrustDomain,
190 s.config.EnableH2Upgrade,
191 false,
192 s.config.EnableIPv6,
193 s.config.ExtEndpointZoneWeights,
194 s.config.MeshedHttp2ClientParams,
195 fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, remoteConfig.ClusterDomain, port),
196 token.NodeName,
197 s.config.DefaultOpaquePorts,
198 s.metadataAPI,
199 stream,
200 streamEnd,
201 log,
202 )
203 translator.Start()
204 defer translator.Stop()
205
206 err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
207 if err != nil {
208 var ise watcher.InvalidService
209 if errors.As(err, &ise) {
210 log.Debugf("Invalid remote discovery service %s", dest.GetPath())
211 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
212 }
213 log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", dest.GetPath(), cluster, err)
214 return err
215 }
216 defer remoteWatcher.Unsubscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
217
218 } else {
219
220 translator := newEndpointTranslator(
221 s.config.ControllerNS,
222 s.config.IdentityTrustDomain,
223 s.config.EnableH2Upgrade,
224 true,
225 s.config.EnableIPv6,
226 s.config.ExtEndpointZoneWeights,
227 s.config.MeshedHttp2ClientParams,
228 dest.GetPath(),
229 token.NodeName,
230 s.config.DefaultOpaquePorts,
231 s.metadataAPI,
232 stream,
233 streamEnd,
234 log,
235 )
236 translator.Start()
237 defer translator.Stop()
238
239 err = s.endpoints.Subscribe(service, port, instanceID, translator)
240 if err != nil {
241 var ise watcher.InvalidService
242 if errors.As(err, &ise) {
243 log.Debugf("Invalid service %s", dest.GetPath())
244 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
245 }
246 log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
247 return err
248 }
249 defer s.endpoints.Unsubscribe(service, port, instanceID, translator)
250 }
251
252 select {
253 case <-s.shutdown:
254 case <-stream.Context().Done():
255 log.Debugf("Get %s cancelled", dest.GetPath())
256 case <-streamEnd:
257 log.Errorf("Get %s stream aborted", dest.GetPath())
258 }
259
260 return nil
261 }
262
263 func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetProfileServer) error {
264 log := s.log
265
266 client, _ := peer.FromContext(stream.Context())
267 if client != nil {
268 log = log.WithField("remote", client.Addr)
269 }
270
271 var token contextToken
272 if dest.GetContextToken() != "" {
273 log.Debugf("Dest token: %q", dest.GetContextToken())
274 token = s.parseContextToken(dest.GetContextToken())
275 log = log.WithFields(logging.Fields{"context-pod": token.Pod, "context-ns": token.Ns})
276 }
277
278 log.Debugf("Getting profile for %s", dest.GetPath())
279
280
281 host, port, err := getHostAndPort(dest.GetPath())
282 if err != nil {
283 log.Debugf("Invalid address %q", dest.GetPath())
284 return status.Errorf(codes.InvalidArgument, "invalid authority: %q: %q", dest.GetPath(), err)
285 }
286
287 if ip := net.ParseIP(host); ip != nil {
288 err = s.getProfileByIP(token, ip, port, log, stream)
289 if err != nil {
290 var ise watcher.InvalidService
291 if errors.As(err, &ise) {
292 log.Debugf("Invalid service %s", dest.GetPath())
293 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
294 }
295 log.Errorf("Failed to subscribe to profile by ip %q: %q", dest.GetPath(), err)
296 }
297 return err
298 }
299
300 err = s.getProfileByName(token, host, port, log, stream)
301 if err != nil {
302 var ise watcher.InvalidService
303 if errors.As(err, &ise) {
304 log.Debugf("Invalid service %s", dest.GetPath())
305 return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
306 }
307 log.Errorf("Failed to subscribe to profile by name %q: %q", dest.GetPath(), err)
308 }
309 return err
310 }
311
312 func (s *server) getProfileByIP(
313 token contextToken,
314 ip net.IP,
315 port uint32,
316 log *logging.Entry,
317 stream pb.Destination_GetProfileServer,
318 ) error {
319
320 svcID, err := getSvcID(s.k8sAPI, ip.String(), s.log)
321 if err != nil {
322 return err
323 }
324
325 if svcID == nil {
326 return s.subscribeToEndpointProfile(nil, "", ip.String(), port, log, stream)
327 }
328
329 fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.config.ClusterDomain)
330 return s.subscribeToServiceProfile(*svcID, token, fqn, port, log, stream)
331 }
332
333 func (s *server) getProfileByName(
334 token contextToken,
335 host string,
336 port uint32,
337 log *logging.Entry,
338 stream pb.Destination_GetProfileServer,
339 ) error {
340 service, hostname, err := parseK8sServiceName(host, s.config.ClusterDomain)
341 if err != nil {
342 s.log.Debugf("Invalid service %s", host)
343 return status.Errorf(codes.InvalidArgument, "invalid service %q: %q", host, err)
344 }
345
346
347
348
349 if hostname != "" {
350 return s.subscribeToEndpointProfile(&service, hostname, "", port, log, stream)
351 }
352
353 return s.subscribeToServiceProfile(service, token, host, port, log, stream)
354 }
355
356
357
358
359 func (s *server) subscribeToServiceProfile(
360 service watcher.ID,
361 token contextToken,
362 fqn string,
363 port uint32,
364 log *logging.Entry,
365 stream pb.Destination_GetProfileServer,
366 ) error {
367 log = log.
368 WithField("ns", service.Namespace).
369 WithField("svc", service.Name).
370 WithField("port", port)
371
372 canceled := stream.Context().Done()
373 streamEnd := make(chan struct{})
374
375
376
377
378 translator := newProfileTranslator(stream, log, fqn, port, streamEnd)
379 translator.Start()
380 defer translator.Stop()
381
382
383
384
385 opaquePortsAdaptor := newOpaquePortsAdaptor(translator)
386
387
388
389 err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
390 if err != nil {
391 log.Warnf("Failed to subscribe to service updates for %s: %s", service, err)
392 return err
393 }
394 defer s.opaquePorts.Unsubscribe(service, opaquePortsAdaptor)
395
396
397
398
399 dup := newDedupProfileListener(opaquePortsAdaptor, log)
400 defaultProfile := sp.ServiceProfile{}
401 listener := newDefaultProfileListener(&defaultProfile, dup, log)
402
403
404
405
406 if token.Ns == "" {
407 return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd)
408 }
409 return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd)
410 }
411
412
413
414
415
416
417
418 func (s *server) subscribeToServicesWithContext(
419 fqn string,
420 token contextToken,
421 listener watcher.ProfileUpdateListener,
422 canceled <-chan struct{},
423 log *logging.Entry,
424 streamEnd <-chan struct{},
425 ) error {
426
427
428
429
430
431 primary, backup := newFallbackProfileListener(listener, log)
432
433
434
435 backupID, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
436 if err != nil {
437 log.Debug("Invalid service")
438 return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
439 }
440 err = s.profiles.Subscribe(backupID, backup)
441 if err != nil {
442 log.Warnf("Failed to subscribe to profile: %s", err)
443 return err
444 }
445 defer s.profiles.Unsubscribe(backupID, backup)
446
447 primaryID, err := profileID(fqn, token, s.config.ClusterDomain)
448 if err != nil {
449 log.Debug("Invalid service")
450 return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
451 }
452 err = s.profiles.Subscribe(primaryID, primary)
453 if err != nil {
454 log.Warnf("Failed to subscribe to profile: %s", err)
455 return err
456 }
457 defer s.profiles.Unsubscribe(primaryID, primary)
458
459 select {
460 case <-s.shutdown:
461 case <-canceled:
462 log.Debugf("GetProfile %s cancelled", fqn)
463 case <-streamEnd:
464 log.Errorf("GetProfile %s stream aborted", fqn)
465 }
466 return nil
467 }
468
469
470
471 func (s *server) subscribeToServiceWithoutContext(
472 fqn string,
473 listener watcher.ProfileUpdateListener,
474 canceled <-chan struct{},
475 log *logging.Entry,
476 streamEnd <-chan struct{},
477 ) error {
478 id, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
479 if err != nil {
480 log.Debug("Invalid service")
481 return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
482 }
483 err = s.profiles.Subscribe(id, listener)
484 if err != nil {
485 log.Warnf("Failed to subscribe to profile: %s", err)
486 return err
487 }
488 defer s.profiles.Unsubscribe(id, listener)
489
490 select {
491 case <-s.shutdown:
492 case <-canceled:
493 log.Debugf("GetProfile %s cancelled", fqn)
494 case <-streamEnd:
495 log.Errorf("GetProfile %s stream aborted", fqn)
496 }
497 return nil
498 }
499
500
501
502
503
504 func (s *server) subscribeToEndpointProfile(
505 service *watcher.ServiceID,
506 hostname,
507 ip string,
508 port uint32,
509 log *logging.Entry,
510 stream pb.Destination_GetProfileServer,
511 ) error {
512 canceled := stream.Context().Done()
513 streamEnd := make(chan struct{})
514 translator := newEndpointProfileTranslator(
515 s.config.EnableH2Upgrade,
516 s.config.ControllerNS,
517 s.config.IdentityTrustDomain,
518 s.config.DefaultOpaquePorts,
519 s.config.MeshedHttp2ClientParams,
520 stream,
521 streamEnd,
522 log,
523 )
524 translator.Start()
525 defer translator.Stop()
526
527 var err error
528 ip, err = s.workloads.Subscribe(service, hostname, ip, port, translator)
529 if err != nil {
530 return err
531 }
532 defer s.workloads.Unsubscribe(ip, port, translator)
533
534 select {
535 case <-s.shutdown:
536 case <-canceled:
537 s.log.Debugf("Cancelled")
538 case <-streamEnd:
539 log.Errorf("GetProfile %s:%d stream aborted", ip, port)
540 }
541 return nil
542 }
543
544
545
546 func getSvcID(k8sAPI *k8s.API, clusterIP string, log *logging.Entry) (*watcher.ServiceID, error) {
547 objs, err := k8sAPI.Svc().Informer().GetIndexer().ByIndex(watcher.PodIPIndex, clusterIP)
548 if err != nil {
549 return nil, status.Error(codes.Unknown, err.Error())
550 }
551 services := make([]*corev1.Service, 0)
552 for _, obj := range objs {
553 service := obj.(*corev1.Service)
554 services = append(services, service)
555 }
556 if len(services) > 1 {
557 conflictingServices := []string{}
558 for _, service := range services {
559 conflictingServices = append(conflictingServices, fmt.Sprintf("%s:%s", service.Namespace, service.Name))
560 }
561 log.Warnf("found conflicting %s cluster IP: %s", clusterIP, strings.Join(conflictingServices, ","))
562 return nil, status.Errorf(codes.FailedPrecondition, "found %d services with conflicting cluster IP %s", len(services), clusterIP)
563 }
564 if len(services) == 0 {
565 return nil, nil
566 }
567 service := &watcher.ServiceID{
568 Namespace: services[0].Namespace,
569 Name: services[0].Name,
570 }
571 return service, nil
572 }
573
574
575
576
577
578 type contextToken struct {
579 Ns string `json:"ns,omitempty"`
580 NodeName string `json:"nodeName,omitempty"`
581 Pod string `json:"pod,omitempty"`
582 }
583
584 func (s *server) parseContextToken(token string) contextToken {
585 ctxToken := contextToken{}
586 if token == "" {
587 return ctxToken
588 }
589 if err := json.Unmarshal([]byte(token), &ctxToken); err != nil {
590
591 parts := strings.Split(token, ":")
592 if len(parts) == 2 && parts[0] == "ns" {
593 s.log.Warnf("context token %s using old token format", token)
594 ctxToken = contextToken{
595 Ns: parts[1],
596 }
597 } else {
598 s.log.Errorf("context token %s is invalid: %s", token, err)
599 }
600 }
601 return ctxToken
602 }
603
604 func profileID(authority string, ctxToken contextToken, clusterDomain string) (watcher.ProfileID, error) {
605 host, _, err := getHostAndPort(authority)
606 if err != nil {
607 return watcher.ProfileID{}, fmt.Errorf("invalid authority: %w", err)
608 }
609 service, _, err := parseK8sServiceName(host, clusterDomain)
610 if err != nil {
611 return watcher.ProfileID{}, fmt.Errorf("invalid k8s service name: %w", err)
612 }
613 id := watcher.ProfileID{
614 Name: fmt.Sprintf("%s.%s.svc.%s", service.Name, service.Namespace, clusterDomain),
615 Namespace: service.Namespace,
616 }
617 if ctxToken.Ns != "" {
618 id.Namespace = ctxToken.Ns
619 }
620 return id, nil
621 }
622
623 func getHostAndPort(authority string) (string, watcher.Port, error) {
624 if !strings.Contains(authority, ":") {
625 return authority, watcher.Port(80), nil
626 }
627
628 host, sport, err := net.SplitHostPort(authority)
629 if err != nil {
630 return "", 0, fmt.Errorf("invalid destination: %w", err)
631 }
632 port, err := strconv.Atoi(sport)
633 if err != nil {
634 return "", 0, fmt.Errorf("invalid port %s: %w", sport, err)
635 }
636 if port <= 0 || port > 65535 {
637 return "", 0, fmt.Errorf("invalid port %d", port)
638 }
639 return host, watcher.Port(port), nil
640 }
641
642 type instanceID = string
643
644
645
646
647
648
649
650 func parseK8sServiceName(fqdn, clusterDomain string) (watcher.ServiceID, instanceID, error) {
651 labels := strings.Split(fqdn, ".")
652 suffix := append([]string{"svc"}, strings.Split(clusterDomain, ".")...)
653
654 if !hasSuffix(labels, suffix) {
655 return watcher.ServiceID{}, "", fmt.Errorf("name %s does not match cluster domain %s", fqdn, clusterDomain)
656 }
657
658 n := len(labels)
659 if n == 2+len(suffix) {
660
661 service := watcher.ServiceID{
662 Name: labels[0],
663 Namespace: labels[1],
664 }
665 return service, "", nil
666 }
667
668 if n == 3+len(suffix) {
669
670 instanceID := labels[0]
671 service := watcher.ServiceID{
672 Name: labels[1],
673 Namespace: labels[2],
674 }
675 return service, instanceID, nil
676 }
677
678 return watcher.ServiceID{}, "", fmt.Errorf("invalid k8s service %s", fqdn)
679 }
680
681 func hasSuffix(slice []string, suffix []string) bool {
682 if len(slice) < len(suffix) {
683 return false
684 }
685 for i, s := range slice[len(slice)-len(suffix):] {
686 if s != suffix[i] {
687 return false
688 }
689 }
690 return true
691 }
692
693 func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) map[uint32]struct{} {
694 annotation, ok := pod.Annotations[labels.ProxyIgnoreInboundPortsAnnotation]
695 if !ok || annotation == "" {
696 return nil
697 }
698
699 return util.ParsePorts(annotation)
700 }
701
View as plain text