1 package destination
2
3 import (
4 "fmt"
5 "net/netip"
6 "reflect"
7
8 pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
9 "github.com/linkerd/linkerd2-proxy-api/go/net"
10 "github.com/linkerd/linkerd2/controller/api/destination/watcher"
11 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
12 "github.com/linkerd/linkerd2/controller/k8s"
13 "github.com/linkerd/linkerd2/pkg/addr"
14 pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
15 "github.com/prometheus/client_golang/prometheus"
16 "github.com/prometheus/client_golang/prometheus/promauto"
17 logging "github.com/sirupsen/logrus"
18 corev1 "k8s.io/api/core/v1"
19 )
20
const (
	// defaultWeight is the weight given to every WeightedAddr built in this
	// file before any zone-based adjustment is applied.
	defaultWeight uint32 = 10000

	// envInboundListenAddr is the name of the environment variable, read from
	// the proxy container by getInboundPort, whose value is parsed as an
	// address:port pair to discover the proxy's inbound port.
	envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"

	// updateQueueCapacity is the buffer size of the updates channel created
	// for each endpointTranslator.
	updateQueueCapacity = 100
)
30
31
32
type (
	// endpointTranslator accumulates endpoint updates (Add, Remove,
	// NoEndpoints), queues them on an internal channel, and translates them
	// into pb.Update messages sent on a Destination.Get stream.
	//
	// NOTE: newEndpointTranslator initializes this struct with a positional
	// literal, so the order of the fields below must not change.
	endpointTranslator struct {
		// controllerNS is compared against a pod's control-plane namespace
		// label when deciding whether to attach a TLS identity.
		controllerNS string
		// identityTrustDomain is the suffix of the identity names generated
		// for meshed pods; an empty value disables identity generation.
		identityTrustDomain string
		// nodeTopologyZone is the topology zone label of the node named by
		// nodeName (empty if it could not be determined).
		nodeTopologyZone string
		// nodeName is the node used for local-traffic-policy filtering.
		nodeName string
		// defaultOpaquePorts is passed to the watcher helpers that resolve the
		// opaque ports of a pod or external workload.
		defaultOpaquePorts map[uint32]struct{}

		// enableH2Upgrade controls whether an H2 protocol hint is attached to
		// meshed endpoints.
		enableH2Upgrade,
		// enableEndpointFiltering turns on node/zone filtering in
		// filterAddresses.
		enableEndpointFiltering,
		// enableIPv6 selects which address family selectAddressFamily keeps.
		enableIPv6,

		// extEndpointZoneWeights multiplies the weight of endpoints located in
		// nodeTopologyZone by 10 when set.
		extEndpointZoneWeights bool

		// meshedHTTP2ClientParams is attached as the Http2 field of meshed
		// endpoints.
		meshedHTTP2ClientParams *pb.Http2ClientParams

		// availableEndpoints holds every address currently known, before any
		// filtering.
		availableEndpoints watcher.AddressSet
		// filteredSnapshot is the filtered set computed by the most recent
		// sendFilteredUpdate; the next update is diffed against it.
		filteredSnapshot watcher.AddressSet
		// stream is the destination stream updates are sent on.
		stream pb.Destination_GetServer
		// endStream is closed by enqueueUpdate when the update queue overflows.
		endStream chan struct{}
		log *logging.Entry
		// overflowCounter is incremented each time an update cannot be
		// enqueued because the queue is full.
		overflowCounter prometheus.Counter

		// updates is the buffered queue of pending *addUpdate, *removeUpdate
		// and *noEndpointsUpdate values.
		updates chan interface{}
		// stop is closed by Stop to terminate the goroutine launched by Start.
		stop chan struct{}
	}

	// addUpdate is the queued form of an Add call.
	addUpdate struct {
		set watcher.AddressSet
	}

	// removeUpdate is the queued form of a Remove call.
	removeUpdate struct {
		set watcher.AddressSet
	}

	// noEndpointsUpdate is the queued form of a NoEndpoints call.
	noEndpointsUpdate struct {
		exists bool
	}
)
72
// updatesQueueOverflowCounter counts, per "service" label, how many times an
// endpoint update could not be enqueued because the translator's update queue
// was full. It is created through promauto at package initialization.
var updatesQueueOverflowCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "endpoint_updates_queue_overflow",
		Help: "A counter incremented whenever the endpoint updates queue overflows",
	},
	[]string{
		"service",
	},
)
82
83 func newEndpointTranslator(
84 controllerNS string,
85 identityTrustDomain string,
86 enableH2Upgrade,
87 enableEndpointFiltering,
88 enableIPv6,
89 extEndpointZoneWeights bool,
90 meshedHTTP2ClientParams *pb.Http2ClientParams,
91 service string,
92 srcNodeName string,
93 defaultOpaquePorts map[uint32]struct{},
94 k8sAPI *k8s.MetadataAPI,
95 stream pb.Destination_GetServer,
96 endStream chan struct{},
97 log *logging.Entry,
98 ) *endpointTranslator {
99 log = log.WithFields(logging.Fields{
100 "component": "endpoint-translator",
101 "service": service,
102 })
103
104 nodeTopologyZone, err := getNodeTopologyZone(k8sAPI, srcNodeName)
105 if err != nil {
106 log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
107 }
108 availableEndpoints := newEmptyAddressSet()
109
110 filteredSnapshot := newEmptyAddressSet()
111
112 return &endpointTranslator{
113 controllerNS,
114 identityTrustDomain,
115 nodeTopologyZone,
116 srcNodeName,
117 defaultOpaquePorts,
118 enableH2Upgrade,
119 enableEndpointFiltering,
120 enableIPv6,
121 extEndpointZoneWeights,
122 meshedHTTP2ClientParams,
123
124 availableEndpoints,
125 filteredSnapshot,
126 stream,
127 endStream,
128 log,
129 updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
130 make(chan interface{}, updateQueueCapacity),
131 make(chan struct{}),
132 }
133 }
134
135 func (et *endpointTranslator) Add(set watcher.AddressSet) {
136 et.enqueueUpdate(&addUpdate{set})
137 }
138
139 func (et *endpointTranslator) Remove(set watcher.AddressSet) {
140 et.enqueueUpdate(&removeUpdate{set})
141 }
142
143 func (et *endpointTranslator) NoEndpoints(exists bool) {
144 et.enqueueUpdate(&noEndpointsUpdate{exists})
145 }
146
147
148
149
150
151
152
153 func (et *endpointTranslator) enqueueUpdate(update interface{}) {
154 select {
155 case et.updates <- update:
156
157 default:
158
159
160 et.overflowCounter.Inc()
161 select {
162 case <-et.endStream:
163
164
165 default:
166 et.log.Error("endpoint update queue full; aborting stream")
167 close(et.endStream)
168 }
169 }
170 }
171
172
173
174
175
176 func (et *endpointTranslator) Start() {
177 go func() {
178 for {
179 select {
180 case update := <-et.updates:
181 et.processUpdate(update)
182 case <-et.stop:
183 return
184 }
185 }
186 }()
187 }
188
189
// Stop closes the stop channel, which causes the goroutine launched by Start
// to return. Closing an already-closed channel panics, so Stop must be called
// at most once.
func (et *endpointTranslator) Stop() {
	close(et.stop)
}
193
194 func (et *endpointTranslator) processUpdate(update interface{}) {
195 switch update := update.(type) {
196 case *addUpdate:
197 et.add(update.set)
198 case *removeUpdate:
199 et.remove(update.set)
200 case *noEndpointsUpdate:
201 et.noEndpoints(update.exists)
202 }
203 }
204
205 func (et *endpointTranslator) add(set watcher.AddressSet) {
206 for id, address := range set.Addresses {
207 et.availableEndpoints.Addresses[id] = address
208 }
209
210 et.availableEndpoints.Labels = set.Labels
211 et.availableEndpoints.LocalTrafficPolicy = set.LocalTrafficPolicy
212
213 et.sendFilteredUpdate()
214 }
215
216 func (et *endpointTranslator) remove(set watcher.AddressSet) {
217 for id := range set.Addresses {
218 delete(et.availableEndpoints.Addresses, id)
219 }
220
221 et.sendFilteredUpdate()
222 }
223
224 func (et *endpointTranslator) noEndpoints(exists bool) {
225 et.log.Debugf("NoEndpoints(%+v)", exists)
226
227 et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
228 et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}
229
230 u := &pb.Update{
231 Update: &pb.Update_NoEndpoints{
232 NoEndpoints: &pb.NoEndpoints{
233 Exists: exists,
234 },
235 },
236 }
237
238 et.log.Debugf("Sending destination no endpoints: %+v", u)
239 if err := et.stream.Send(u); err != nil {
240 et.log.Debugf("Failed to send address update: %s", err)
241 }
242 }
243
244 func (et *endpointTranslator) sendFilteredUpdate() {
245 filtered := et.filterAddresses()
246 filtered = et.selectAddressFamily(filtered)
247 diffAdd, diffRemove := et.diffEndpoints(filtered)
248
249 if len(diffAdd.Addresses) > 0 {
250 et.sendClientAdd(diffAdd)
251 }
252 if len(diffRemove.Addresses) > 0 {
253 et.sendClientRemove(diffRemove)
254 }
255
256 et.filteredSnapshot = filtered
257 }
258
259 func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
260 filtered := make(map[watcher.ID]watcher.Address)
261 for id, addr := range addresses.Addresses {
262 if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
263 continue
264 }
265
266 if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
267
268
269 altID := id
270 altID.IPFamily = corev1.IPv6Protocol
271 if _, ok := addresses.Addresses[altID]; ok {
272 continue
273 }
274 }
275
276 filtered[id] = addr
277 }
278
279 return watcher.AddressSet{
280 Addresses: filtered,
281 Labels: addresses.Labels,
282 LocalTrafficPolicy: addresses.LocalTrafficPolicy,
283 }
284 }
285
286
287
288
289
290
291
292 func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
293 filtered := make(map[watcher.ID]watcher.Address)
294
295
296 if !et.enableEndpointFiltering {
297 for k, v := range et.availableEndpoints.Addresses {
298 filtered[k] = v
299 }
300 return watcher.AddressSet{
301 Addresses: filtered,
302 Labels: et.availableEndpoints.Labels,
303 }
304 }
305
306
307
308 if et.availableEndpoints.LocalTrafficPolicy {
309 et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
310 for id, address := range et.availableEndpoints.Addresses {
311 if address.Pod != nil && address.Pod.Spec.NodeName == et.nodeName {
312 filtered[id] = address
313 }
314 }
315 et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
316 return watcher.AddressSet{
317 Addresses: filtered,
318 Labels: et.availableEndpoints.Labels,
319 LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
320 }
321 }
322
323
324
325 for _, address := range et.availableEndpoints.Addresses {
326 if len(address.ForZones) == 0 {
327 for k, v := range et.availableEndpoints.Addresses {
328 filtered[k] = v
329 }
330 et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
331 return watcher.AddressSet{
332 Addresses: filtered,
333 Labels: et.availableEndpoints.Labels,
334 LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
335 }
336 }
337 }
338
339
340
341 et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
342 for id, address := range et.availableEndpoints.Addresses {
343 for _, zone := range address.ForZones {
344 if zone.Name == et.nodeTopologyZone {
345 filtered[id] = address
346 }
347 }
348 }
349 if len(filtered) > 0 {
350 et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
351 return watcher.AddressSet{
352 Addresses: filtered,
353 Labels: et.availableEndpoints.Labels,
354 LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
355 }
356 }
357
358
359
360 for k, v := range et.availableEndpoints.Addresses {
361 filtered[k] = v
362 }
363 return watcher.AddressSet{
364 Addresses: filtered,
365 Labels: et.availableEndpoints.Labels,
366 LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
367 }
368 }
369
370
371
372
373
374
375 func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
376 add := make(map[watcher.ID]watcher.Address)
377 remove := make(map[watcher.ID]watcher.Address)
378
379 for id, new := range filtered.Addresses {
380 old, ok := et.filteredSnapshot.Addresses[id]
381 if !ok {
382 add[id] = new
383 } else if !reflect.DeepEqual(old, new) {
384 add[id] = new
385 }
386 }
387
388 for id, address := range et.filteredSnapshot.Addresses {
389 if _, ok := filtered.Addresses[id]; !ok {
390 remove[id] = address
391 }
392 }
393
394 return watcher.AddressSet{
395 Addresses: add,
396 Labels: filtered.Labels,
397 },
398 watcher.AddressSet{
399 Addresses: remove,
400 Labels: filtered.Labels,
401 }
402 }
403
404 func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
405 addrs := []*pb.WeightedAddr{}
406 for _, address := range set.Addresses {
407 var (
408 wa *pb.WeightedAddr
409 opaquePorts map[uint32]struct{}
410 err error
411 )
412 if address.Pod != nil {
413 opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
414 wa, err = createWeightedAddr(address, opaquePorts,
415 et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
416 if err != nil {
417 et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
418 continue
419 }
420 } else if address.ExternalWorkload != nil {
421 opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
422 wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts, et.meshedHTTP2ClientParams)
423 if err != nil {
424 et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
425 continue
426 }
427 } else {
428
429
430 var addr *net.TcpAddress
431 addr, err = toAddr(address)
432 if err != nil {
433 et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
434 continue
435 }
436
437 var authOverride *pb.AuthorityOverride
438 if address.AuthorityOverride != "" {
439 authOverride = &pb.AuthorityOverride{
440 AuthorityOverride: address.AuthorityOverride,
441 }
442 }
443 wa = &pb.WeightedAddr{
444 Addr: addr,
445 Weight: defaultWeight,
446 AuthorityOverride: authOverride,
447 }
448
449 if address.Identity != "" {
450 wa.TlsIdentity = &pb.TlsIdentity{
451 Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
452 DnsLikeIdentity: &pb.TlsIdentity_DnsLikeIdentity{
453 Name: address.Identity,
454 },
455 },
456 }
457 if et.enableH2Upgrade {
458 wa.ProtocolHint = &pb.ProtocolHint{
459 Protocol: &pb.ProtocolHint_H2_{
460 H2: &pb.ProtocolHint_H2{},
461 },
462 }
463 }
464 wa.Http2 = et.meshedHTTP2ClientParams
465 }
466 }
467
468 if et.extEndpointZoneWeights {
469
470
471 if et.nodeTopologyZone != "" && address.Zone != nil && *address.Zone == et.nodeTopologyZone {
472 wa.Weight *= 10
473 }
474 }
475
476 addrs = append(addrs, wa)
477 }
478
479 add := &pb.Update{Update: &pb.Update_Add{
480 Add: &pb.WeightedAddrSet{
481 Addrs: addrs,
482 MetricLabels: set.Labels,
483 },
484 }}
485
486 et.log.Debugf("Sending destination add: %+v", add)
487 if err := et.stream.Send(add); err != nil {
488 et.log.Debugf("Failed to send address update: %s", err)
489 }
490 }
491
492 func (et *endpointTranslator) sendClientRemove(set watcher.AddressSet) {
493 addrs := []*net.TcpAddress{}
494 for _, address := range set.Addresses {
495 tcpAddr, err := toAddr(address)
496 if err != nil {
497 et.log.Errorf("Failed to translate endpoints to addr: %s", err)
498 continue
499 }
500 addrs = append(addrs, tcpAddr)
501 }
502
503 remove := &pb.Update{Update: &pb.Update_Remove{
504 Remove: &pb.AddrSet{
505 Addrs: addrs,
506 },
507 }}
508
509 et.log.Debugf("Sending destination remove: %+v", remove)
510 if err := et.stream.Send(remove); err != nil {
511 et.log.Debugf("Failed to send address update: %s", err)
512 }
513 }
514
515 func toAddr(address watcher.Address) (*net.TcpAddress, error) {
516 ip, err := addr.ParseProxyIP(address.IP)
517 if err != nil {
518 return nil, err
519 }
520 return &net.TcpAddress{
521 Ip: ip,
522 Port: address.Port,
523 }, nil
524 }
525
526 func createWeightedAddrForExternalWorkload(
527 address watcher.Address,
528 opaquePorts map[uint32]struct{},
529 http2 *pb.Http2ClientParams,
530 ) (*pb.WeightedAddr, error) {
531 tcpAddr, err := toAddr(address)
532 if err != nil {
533 return nil, err
534 }
535
536 weightedAddr := pb.WeightedAddr{
537 Addr: tcpAddr,
538 Weight: defaultWeight,
539 MetricLabels: map[string]string{},
540 }
541
542 weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
543
544
545 if address.ExternalWorkload == nil {
546 return &weightedAddr, nil
547 }
548
549 weightedAddr.ProtocolHint = &pb.ProtocolHint{}
550 weightedAddr.Http2 = http2
551
552 _, opaquePort := opaquePorts[address.Port]
553
554
555
556 if address.OpaqueProtocol || opaquePort {
557 weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
558 Opaque: &pb.ProtocolHint_Opaque{},
559 }
560
561 port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
562 if err != nil {
563 return nil, fmt.Errorf("failed to read inbound port: %w", err)
564 }
565 weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
566 InboundPort: port,
567 }
568 } else {
569 weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
570 H2: &pb.ProtocolHint_H2{},
571 }
572 }
573
574
575 weightedAddr.TlsIdentity = &pb.TlsIdentity{
576 Strategy: &pb.TlsIdentity_UriLikeIdentity_{
577 UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
578 Uri: address.ExternalWorkload.Spec.MeshTLS.Identity,
579 },
580 },
581 ServerName: &pb.TlsIdentity_DnsLikeIdentity{
582 Name: address.ExternalWorkload.Spec.MeshTLS.ServerName,
583 },
584 }
585
586 weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
587
588 z := ""
589 if address.Zone != nil {
590 z = *address.Zone
591 }
592 weightedAddr.MetricLabels["zone"] = z
593
594 return &weightedAddr, nil
595 }
596
597 func createWeightedAddr(
598 address watcher.Address,
599 opaquePorts map[uint32]struct{},
600 enableH2Upgrade bool,
601 identityTrustDomain string,
602 controllerNS string,
603 meshedHttp2 *pb.Http2ClientParams,
604 ) (*pb.WeightedAddr, error) {
605 tcpAddr, err := toAddr(address)
606 if err != nil {
607 return nil, err
608 }
609
610 weightedAddr := pb.WeightedAddr{
611 Addr: tcpAddr,
612 Weight: defaultWeight,
613 MetricLabels: map[string]string{},
614 }
615
616
617
618 if address.Pod == nil {
619 return &weightedAddr, nil
620 }
621
622 skippedInboundPorts := getPodSkippedInboundPortsAnnotations(address.Pod)
623
624 controllerNSLabel := address.Pod.Labels[pkgK8s.ControllerNSLabel]
625 sa, ns := pkgK8s.GetServiceAccountAndNS(address.Pod)
626 weightedAddr.MetricLabels = pkgK8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
627
628
629 z := ""
630 if address.Zone != nil {
631 z = *address.Zone
632 }
633 weightedAddr.MetricLabels["zone"] = z
634
635 _, isSkippedInboundPort := skippedInboundPorts[address.Port]
636
637 if controllerNSLabel != "" && !isSkippedInboundPort {
638 weightedAddr.Http2 = meshedHttp2
639 weightedAddr.ProtocolHint = &pb.ProtocolHint{}
640
641 _, opaquePort := opaquePorts[address.Port]
642
643
644
645 if address.OpaqueProtocol || opaquePort {
646 weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
647 Opaque: &pb.ProtocolHint_Opaque{},
648 }
649
650 port, err := getInboundPort(&address.Pod.Spec)
651 if err != nil {
652 return nil, fmt.Errorf("failed to read inbound port: %w", err)
653 }
654 weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
655 InboundPort: port,
656 }
657 } else if enableH2Upgrade {
658
659
660
661 weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
662 H2: &pb.ProtocolHint_H2{},
663 }
664 }
665 }
666
667
668
669
670
671
672 if identityTrustDomain != "" &&
673 controllerNSLabel == controllerNS &&
674 !isSkippedInboundPort {
675
676 id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNSLabel, identityTrustDomain)
677 tlsId := &pb.TlsIdentity_DnsLikeIdentity{Name: id}
678
679 weightedAddr.TlsIdentity = &pb.TlsIdentity{
680 Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
681 DnsLikeIdentity: tlsId,
682 },
683 ServerName: tlsId,
684 }
685 }
686
687 return &weightedAddr, nil
688 }
689
690 func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error) {
691 node, err := k8sAPI.Get(k8s.Node, srcNode)
692 if err != nil {
693 return "", err
694 }
695 if zone, ok := node.Labels[corev1.LabelTopologyZone]; ok {
696 return zone, nil
697 }
698 return "", nil
699 }
700
701 func newEmptyAddressSet() watcher.AddressSet {
702 return watcher.AddressSet{
703 Addresses: make(map[watcher.ID]watcher.Address),
704 Labels: make(map[string]string),
705 }
706 }
707
708
709
710 func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
711 containers := append(podSpec.InitContainers, podSpec.Containers...)
712 for _, containerSpec := range containers {
713 if containerSpec.Name != pkgK8s.ProxyContainerName {
714 continue
715 }
716 for _, envVar := range containerSpec.Env {
717 if envVar.Name != envInboundListenAddr {
718 continue
719 }
720 addrPort, err := netip.ParseAddrPort(envVar.Value)
721 if err != nil {
722 return 0, fmt.Errorf("failed to parse inbound port for proxy container: %w", err)
723 }
724
725 return uint32(addrPort.Port()), nil
726 }
727 }
728 return 0, fmt.Errorf("failed to find %s environment variable in any container for given pod spec", envInboundListenAddr)
729 }
730
731
732
733 func getInboundPortFromExternalWorkload(ewSpec *ewv1beta1.ExternalWorkloadSpec) (uint32, error) {
734 for _, p := range ewSpec.Ports {
735 if p.Name == pkgK8s.ProxyPortName {
736 return uint32(p.Port), nil
737 }
738 }
739
740 return 0, fmt.Errorf("failed to find %s port for given ExternalWorkloadSpec", pkgK8s.ProxyPortName)
741 }
742
View as plain text