...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/endpoint_translator.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"fmt"
     5  	"net/netip"
     6  	"reflect"
     7  
     8  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
     9  	"github.com/linkerd/linkerd2-proxy-api/go/net"
    10  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    11  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    12  	"github.com/linkerd/linkerd2/controller/k8s"
    13  	"github.com/linkerd/linkerd2/pkg/addr"
    14  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    15  	"github.com/prometheus/client_golang/prometheus"
    16  	"github.com/prometheus/client_golang/prometheus/promauto"
    17  	logging "github.com/sirupsen/logrus"
    18  	corev1 "k8s.io/api/core/v1"
    19  )
    20  
const (
	// defaultWeight is the weight given to every endpoint address sent to the
	// client before any zone-based adjustment is applied.
	defaultWeight uint32 = 10000

	// envInboundListenAddr is the environment variable holding the inbound
	// listening address for the proxy container.
	envInboundListenAddr = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR"

	// updateQueueCapacity is the buffer size of each endpointTranslator's
	// updates channel.
	updateQueueCapacity = 100
)
    30  
// endpointTranslator satisfies EndpointUpdateListener and translates updates
// into Destination.Get messages.
type (
	endpointTranslator struct {
		// controllerNS and identityTrustDomain are used when deciding whether
		// a pod endpoint gets a TLS identity, and when building that identity.
		controllerNS        string
		identityTrustDomain string
		// nodeTopologyZone is the topology zone label of the source node; it
		// is empty when the label is missing or the node lookup failed.
		nodeTopologyZone string
		// nodeName is the source node's name, compared against pod node names
		// when the service uses a local traffic policy.
		nodeName string
		// defaultOpaquePorts is passed to the opaque-port annotation helpers
		// as the fallback set of opaque ports.
		defaultOpaquePorts map[uint32]struct{}

		// Feature flags controlling protocol hints, endpoint filtering,
		// address-family selection and zone-based weighting.
		enableH2Upgrade,
		enableEndpointFiltering,
		enableIPv6,

		extEndpointZoneWeights bool

		// meshedHTTP2ClientParams is attached to addresses that receive a
		// protocol hint or identity.
		meshedHTTP2ClientParams *pb.Http2ClientParams

		// availableEndpoints holds every address received so far, before any
		// filtering; filteredSnapshot is the filtered set computed by the last
		// sendFilteredUpdate and is the baseline for the next diff.
		availableEndpoints watcher.AddressSet
		filteredSnapshot   watcher.AddressSet
		stream             pb.Destination_GetServer
		// endStream is closed by enqueueUpdate when the update queue overflows.
		endStream       chan struct{}
		log             *logging.Entry
		overflowCounter prometheus.Counter

		// updates is the buffered queue drained by the goroutine launched in
		// Start; stop is closed by Stop to end that goroutine.
		updates chan interface{}
		stop    chan struct{}
	}

	// addUpdate is the queued form of an Add call.
	addUpdate struct {
		set watcher.AddressSet
	}

	// removeUpdate is the queued form of a Remove call.
	removeUpdate struct {
		set watcher.AddressSet
	}

	// noEndpointsUpdate is the queued form of a NoEndpoints call.
	noEndpointsUpdate struct {
		exists bool
	}
)
    72  
// updatesQueueOverflowCounter counts, per "service" label, how many times an
// update could not be enqueued because a translator's queue was full.
var updatesQueueOverflowCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "endpoint_updates_queue_overflow",
		Help: "A counter incremented whenever the endpoint updates queue overflows",
	},
	[]string{
		"service",
	},
)
    82  
    83  func newEndpointTranslator(
    84  	controllerNS string,
    85  	identityTrustDomain string,
    86  	enableH2Upgrade,
    87  	enableEndpointFiltering,
    88  	enableIPv6,
    89  	extEndpointZoneWeights bool,
    90  	meshedHTTP2ClientParams *pb.Http2ClientParams,
    91  	service string,
    92  	srcNodeName string,
    93  	defaultOpaquePorts map[uint32]struct{},
    94  	k8sAPI *k8s.MetadataAPI,
    95  	stream pb.Destination_GetServer,
    96  	endStream chan struct{},
    97  	log *logging.Entry,
    98  ) *endpointTranslator {
    99  	log = log.WithFields(logging.Fields{
   100  		"component": "endpoint-translator",
   101  		"service":   service,
   102  	})
   103  
   104  	nodeTopologyZone, err := getNodeTopologyZone(k8sAPI, srcNodeName)
   105  	if err != nil {
   106  		log.Errorf("Failed to get node topology zone for node %s: %s", srcNodeName, err)
   107  	}
   108  	availableEndpoints := newEmptyAddressSet()
   109  
   110  	filteredSnapshot := newEmptyAddressSet()
   111  
   112  	return &endpointTranslator{
   113  		controllerNS,
   114  		identityTrustDomain,
   115  		nodeTopologyZone,
   116  		srcNodeName,
   117  		defaultOpaquePorts,
   118  		enableH2Upgrade,
   119  		enableEndpointFiltering,
   120  		enableIPv6,
   121  		extEndpointZoneWeights,
   122  		meshedHTTP2ClientParams,
   123  
   124  		availableEndpoints,
   125  		filteredSnapshot,
   126  		stream,
   127  		endStream,
   128  		log,
   129  		updatesQueueOverflowCounter.With(prometheus.Labels{"service": service}),
   130  		make(chan interface{}, updateQueueCapacity),
   131  		make(chan struct{}),
   132  	}
   133  }
   134  
   135  func (et *endpointTranslator) Add(set watcher.AddressSet) {
   136  	et.enqueueUpdate(&addUpdate{set})
   137  }
   138  
   139  func (et *endpointTranslator) Remove(set watcher.AddressSet) {
   140  	et.enqueueUpdate(&removeUpdate{set})
   141  }
   142  
   143  func (et *endpointTranslator) NoEndpoints(exists bool) {
   144  	et.enqueueUpdate(&noEndpointsUpdate{exists})
   145  }
   146  
   147  // Add, Remove, and NoEndpoints are called from a client-go informer callback
   148  // and therefore must not block. For each of these, we enqueue an update in
   149  // a channel so that it can be processed asyncronously. To ensure that enqueuing
   150  // does not block, we first check to see if there is capacity in the buffered
   151  // channel. If there is not, we drop the update and signal to the stream that
   152  // it has fallen too far behind and should be closed.
   153  func (et *endpointTranslator) enqueueUpdate(update interface{}) {
   154  	select {
   155  	case et.updates <- update:
   156  		// Update has been successfully enqueued.
   157  	default:
   158  		// We are unable to enqueue because the channel does not have capacity.
   159  		// The stream has fallen too far behind and should be closed.
   160  		et.overflowCounter.Inc()
   161  		select {
   162  		case <-et.endStream:
   163  			// The endStream channel has already been closed so no action is
   164  			// necessary.
   165  		default:
   166  			et.log.Error("endpoint update queue full; aborting stream")
   167  			close(et.endStream)
   168  		}
   169  	}
   170  }
   171  
   172  // Start initiates a goroutine which processes update events off of the
   173  // endpointTranslator's internal queue and sends to the grpc stream as
   174  // appropriate. The goroutine calls several non-thread-safe functions (including
   175  // Send) and therefore, Start must not be called more than once.
   176  func (et *endpointTranslator) Start() {
   177  	go func() {
   178  		for {
   179  			select {
   180  			case update := <-et.updates:
   181  				et.processUpdate(update)
   182  			case <-et.stop:
   183  				return
   184  			}
   185  		}
   186  	}()
   187  }
   188  
// Stop terminates the goroutine started by Start by closing the stop channel.
// It must be called at most once: closing an already-closed channel panics.
func (et *endpointTranslator) Stop() {
	close(et.stop)
}
   193  
   194  func (et *endpointTranslator) processUpdate(update interface{}) {
   195  	switch update := update.(type) {
   196  	case *addUpdate:
   197  		et.add(update.set)
   198  	case *removeUpdate:
   199  		et.remove(update.set)
   200  	case *noEndpointsUpdate:
   201  		et.noEndpoints(update.exists)
   202  	}
   203  }
   204  
   205  func (et *endpointTranslator) add(set watcher.AddressSet) {
   206  	for id, address := range set.Addresses {
   207  		et.availableEndpoints.Addresses[id] = address
   208  	}
   209  
   210  	et.availableEndpoints.Labels = set.Labels
   211  	et.availableEndpoints.LocalTrafficPolicy = set.LocalTrafficPolicy
   212  
   213  	et.sendFilteredUpdate()
   214  }
   215  
   216  func (et *endpointTranslator) remove(set watcher.AddressSet) {
   217  	for id := range set.Addresses {
   218  		delete(et.availableEndpoints.Addresses, id)
   219  	}
   220  
   221  	et.sendFilteredUpdate()
   222  }
   223  
   224  func (et *endpointTranslator) noEndpoints(exists bool) {
   225  	et.log.Debugf("NoEndpoints(%+v)", exists)
   226  
   227  	et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
   228  	et.filteredSnapshot.Addresses = map[watcher.ID]watcher.Address{}
   229  
   230  	u := &pb.Update{
   231  		Update: &pb.Update_NoEndpoints{
   232  			NoEndpoints: &pb.NoEndpoints{
   233  				Exists: exists,
   234  			},
   235  		},
   236  	}
   237  
   238  	et.log.Debugf("Sending destination no endpoints: %+v", u)
   239  	if err := et.stream.Send(u); err != nil {
   240  		et.log.Debugf("Failed to send address update: %s", err)
   241  	}
   242  }
   243  
   244  func (et *endpointTranslator) sendFilteredUpdate() {
   245  	filtered := et.filterAddresses()
   246  	filtered = et.selectAddressFamily(filtered)
   247  	diffAdd, diffRemove := et.diffEndpoints(filtered)
   248  
   249  	if len(diffAdd.Addresses) > 0 {
   250  		et.sendClientAdd(diffAdd)
   251  	}
   252  	if len(diffRemove.Addresses) > 0 {
   253  		et.sendClientRemove(diffRemove)
   254  	}
   255  
   256  	et.filteredSnapshot = filtered
   257  }
   258  
   259  func (et *endpointTranslator) selectAddressFamily(addresses watcher.AddressSet) watcher.AddressSet {
   260  	filtered := make(map[watcher.ID]watcher.Address)
   261  	for id, addr := range addresses.Addresses {
   262  		if id.IPFamily == corev1.IPv6Protocol && !et.enableIPv6 {
   263  			continue
   264  		}
   265  
   266  		if id.IPFamily == corev1.IPv4Protocol && et.enableIPv6 {
   267  			// Only consider IPv4 address for which there's not already an IPv6
   268  			// alternative
   269  			altID := id
   270  			altID.IPFamily = corev1.IPv6Protocol
   271  			if _, ok := addresses.Addresses[altID]; ok {
   272  				continue
   273  			}
   274  		}
   275  
   276  		filtered[id] = addr
   277  	}
   278  
   279  	return watcher.AddressSet{
   280  		Addresses:          filtered,
   281  		Labels:             addresses.Labels,
   282  		LocalTrafficPolicy: addresses.LocalTrafficPolicy,
   283  	}
   284  }
   285  
   286  // filterAddresses is responsible for filtering endpoints based on the node's
   287  // topology zone. The client will only receive endpoints with the same
   288  // consumption zone as the node. An endpoints consumption zone is set
   289  // by its Hints field and can be different than its actual Topology zone.
   290  // when service.spec.internalTrafficPolicy is set to local, Topology Aware
   291  // Hints are not used.
   292  func (et *endpointTranslator) filterAddresses() watcher.AddressSet {
   293  	filtered := make(map[watcher.ID]watcher.Address)
   294  
   295  	// If endpoint filtering is disabled, return all available addresses.
   296  	if !et.enableEndpointFiltering {
   297  		for k, v := range et.availableEndpoints.Addresses {
   298  			filtered[k] = v
   299  		}
   300  		return watcher.AddressSet{
   301  			Addresses: filtered,
   302  			Labels:    et.availableEndpoints.Labels,
   303  		}
   304  	}
   305  
   306  	// If service.spec.internalTrafficPolicy is set to local, filter and return the addresses
   307  	// for local node only
   308  	if et.availableEndpoints.LocalTrafficPolicy {
   309  		et.log.Debugf("Filtering through addresses that should be consumed by node %s", et.nodeName)
   310  		for id, address := range et.availableEndpoints.Addresses {
   311  			if address.Pod != nil && address.Pod.Spec.NodeName == et.nodeName {
   312  				filtered[id] = address
   313  			}
   314  		}
   315  		et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
   316  		return watcher.AddressSet{
   317  			Addresses:          filtered,
   318  			Labels:             et.availableEndpoints.Labels,
   319  			LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
   320  		}
   321  	}
   322  	// If any address does not have a hint, then all hints are ignored and all
   323  	// available addresses are returned. This replicates kube-proxy behavior
   324  	// documented in the KEP: https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/2433-topology-aware-hints/README.md#kube-proxy
   325  	for _, address := range et.availableEndpoints.Addresses {
   326  		if len(address.ForZones) == 0 {
   327  			for k, v := range et.availableEndpoints.Addresses {
   328  				filtered[k] = v
   329  			}
   330  			et.log.Debugf("Hints not available on endpointslice. Zone Filtering disabled. Falling back to routing to all pods")
   331  			return watcher.AddressSet{
   332  				Addresses:          filtered,
   333  				Labels:             et.availableEndpoints.Labels,
   334  				LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
   335  			}
   336  		}
   337  	}
   338  
   339  	// Each address that has a hint matching the node's zone should be added
   340  	// to the set of addresses that will be returned.
   341  	et.log.Debugf("Filtering through addresses that should be consumed by zone %s", et.nodeTopologyZone)
   342  	for id, address := range et.availableEndpoints.Addresses {
   343  		for _, zone := range address.ForZones {
   344  			if zone.Name == et.nodeTopologyZone {
   345  				filtered[id] = address
   346  			}
   347  		}
   348  	}
   349  	if len(filtered) > 0 {
   350  		et.log.Debugf("Filtered from %d to %d addresses", len(et.availableEndpoints.Addresses), len(filtered))
   351  		return watcher.AddressSet{
   352  			Addresses:          filtered,
   353  			Labels:             et.availableEndpoints.Labels,
   354  			LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
   355  		}
   356  	}
   357  
   358  	// If there were no filtered addresses, then fall to using endpoints from
   359  	// all zones.
   360  	for k, v := range et.availableEndpoints.Addresses {
   361  		filtered[k] = v
   362  	}
   363  	return watcher.AddressSet{
   364  		Addresses:          filtered,
   365  		Labels:             et.availableEndpoints.Labels,
   366  		LocalTrafficPolicy: et.availableEndpoints.LocalTrafficPolicy,
   367  	}
   368  }
   369  
   370  // diffEndpoints calculates the difference between the filtered set of
   371  // endpoints in the current (Add/Remove) operation and the snapshot of
   372  // previously filtered endpoints. This diff allows the client to receive only
   373  // the endpoints that match the topological zone, by adding new endpoints and
   374  // removing stale ones.
   375  func (et *endpointTranslator) diffEndpoints(filtered watcher.AddressSet) (watcher.AddressSet, watcher.AddressSet) {
   376  	add := make(map[watcher.ID]watcher.Address)
   377  	remove := make(map[watcher.ID]watcher.Address)
   378  
   379  	for id, new := range filtered.Addresses {
   380  		old, ok := et.filteredSnapshot.Addresses[id]
   381  		if !ok {
   382  			add[id] = new
   383  		} else if !reflect.DeepEqual(old, new) {
   384  			add[id] = new
   385  		}
   386  	}
   387  
   388  	for id, address := range et.filteredSnapshot.Addresses {
   389  		if _, ok := filtered.Addresses[id]; !ok {
   390  			remove[id] = address
   391  		}
   392  	}
   393  
   394  	return watcher.AddressSet{
   395  			Addresses: add,
   396  			Labels:    filtered.Labels,
   397  		},
   398  		watcher.AddressSet{
   399  			Addresses: remove,
   400  			Labels:    filtered.Labels,
   401  		}
   402  }
   403  
   404  func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
   405  	addrs := []*pb.WeightedAddr{}
   406  	for _, address := range set.Addresses {
   407  		var (
   408  			wa          *pb.WeightedAddr
   409  			opaquePorts map[uint32]struct{}
   410  			err         error
   411  		)
   412  		if address.Pod != nil {
   413  			opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
   414  			wa, err = createWeightedAddr(address, opaquePorts,
   415  				et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.meshedHTTP2ClientParams)
   416  			if err != nil {
   417  				et.log.Errorf("Failed to translate Pod endpoints to weighted addr: %s", err)
   418  				continue
   419  			}
   420  		} else if address.ExternalWorkload != nil {
   421  			opaquePorts = watcher.GetAnnotatedOpaquePortsForExternalWorkload(address.ExternalWorkload, et.defaultOpaquePorts)
   422  			wa, err = createWeightedAddrForExternalWorkload(address, opaquePorts, et.meshedHTTP2ClientParams)
   423  			if err != nil {
   424  				et.log.Errorf("Failed to translate ExternalWorkload endpoints to weighted addr: %s", err)
   425  				continue
   426  			}
   427  		} else {
   428  			// When there's no associated pod, we may still need to set metadata
   429  			// (especially for remote multi-cluster services).
   430  			var addr *net.TcpAddress
   431  			addr, err = toAddr(address)
   432  			if err != nil {
   433  				et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
   434  				continue
   435  			}
   436  
   437  			var authOverride *pb.AuthorityOverride
   438  			if address.AuthorityOverride != "" {
   439  				authOverride = &pb.AuthorityOverride{
   440  					AuthorityOverride: address.AuthorityOverride,
   441  				}
   442  			}
   443  			wa = &pb.WeightedAddr{
   444  				Addr:              addr,
   445  				Weight:            defaultWeight,
   446  				AuthorityOverride: authOverride,
   447  			}
   448  
   449  			if address.Identity != "" {
   450  				wa.TlsIdentity = &pb.TlsIdentity{
   451  					Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
   452  						DnsLikeIdentity: &pb.TlsIdentity_DnsLikeIdentity{
   453  							Name: address.Identity,
   454  						},
   455  					},
   456  				}
   457  				if et.enableH2Upgrade {
   458  					wa.ProtocolHint = &pb.ProtocolHint{
   459  						Protocol: &pb.ProtocolHint_H2_{
   460  							H2: &pb.ProtocolHint_H2{},
   461  						},
   462  					}
   463  				}
   464  				wa.Http2 = et.meshedHTTP2ClientParams
   465  			}
   466  		}
   467  
   468  		if et.extEndpointZoneWeights {
   469  			// EXPERIMENTAL: Use the endpoint weight field to indicate zonal
   470  			// preference so that local endoints are more heavily weighted.
   471  			if et.nodeTopologyZone != "" && address.Zone != nil && *address.Zone == et.nodeTopologyZone {
   472  				wa.Weight *= 10
   473  			}
   474  		}
   475  
   476  		addrs = append(addrs, wa)
   477  	}
   478  
   479  	add := &pb.Update{Update: &pb.Update_Add{
   480  		Add: &pb.WeightedAddrSet{
   481  			Addrs:        addrs,
   482  			MetricLabels: set.Labels,
   483  		},
   484  	}}
   485  
   486  	et.log.Debugf("Sending destination add: %+v", add)
   487  	if err := et.stream.Send(add); err != nil {
   488  		et.log.Debugf("Failed to send address update: %s", err)
   489  	}
   490  }
   491  
   492  func (et *endpointTranslator) sendClientRemove(set watcher.AddressSet) {
   493  	addrs := []*net.TcpAddress{}
   494  	for _, address := range set.Addresses {
   495  		tcpAddr, err := toAddr(address)
   496  		if err != nil {
   497  			et.log.Errorf("Failed to translate endpoints to addr: %s", err)
   498  			continue
   499  		}
   500  		addrs = append(addrs, tcpAddr)
   501  	}
   502  
   503  	remove := &pb.Update{Update: &pb.Update_Remove{
   504  		Remove: &pb.AddrSet{
   505  			Addrs: addrs,
   506  		},
   507  	}}
   508  
   509  	et.log.Debugf("Sending destination remove: %+v", remove)
   510  	if err := et.stream.Send(remove); err != nil {
   511  		et.log.Debugf("Failed to send address update: %s", err)
   512  	}
   513  }
   514  
   515  func toAddr(address watcher.Address) (*net.TcpAddress, error) {
   516  	ip, err := addr.ParseProxyIP(address.IP)
   517  	if err != nil {
   518  		return nil, err
   519  	}
   520  	return &net.TcpAddress{
   521  		Ip:   ip,
   522  		Port: address.Port,
   523  	}, nil
   524  }
   525  
   526  func createWeightedAddrForExternalWorkload(
   527  	address watcher.Address,
   528  	opaquePorts map[uint32]struct{},
   529  	http2 *pb.Http2ClientParams,
   530  ) (*pb.WeightedAddr, error) {
   531  	tcpAddr, err := toAddr(address)
   532  	if err != nil {
   533  		return nil, err
   534  	}
   535  
   536  	weightedAddr := pb.WeightedAddr{
   537  		Addr:         tcpAddr,
   538  		Weight:       defaultWeight,
   539  		MetricLabels: map[string]string{},
   540  	}
   541  
   542  	weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
   543  	// If the address is not backed by an ExternalWorkload, there is no additional metadata
   544  	// to add.
   545  	if address.ExternalWorkload == nil {
   546  		return &weightedAddr, nil
   547  	}
   548  
   549  	weightedAddr.ProtocolHint = &pb.ProtocolHint{}
   550  	weightedAddr.Http2 = http2
   551  
   552  	_, opaquePort := opaquePorts[address.Port]
   553  	// If address is set as opaque by a Server, or its port is set as
   554  	// opaque by annotation or default value, then set the hinted protocol to
   555  	// Opaque.
   556  	if address.OpaqueProtocol || opaquePort {
   557  		weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
   558  			Opaque: &pb.ProtocolHint_Opaque{},
   559  		}
   560  
   561  		port, err := getInboundPortFromExternalWorkload(&address.ExternalWorkload.Spec)
   562  		if err != nil {
   563  			return nil, fmt.Errorf("failed to read inbound port: %w", err)
   564  		}
   565  		weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
   566  			InboundPort: port,
   567  		}
   568  	} else {
   569  		weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
   570  			H2: &pb.ProtocolHint_H2{},
   571  		}
   572  	}
   573  
   574  	// we assume external workloads support only SPIRE identity
   575  	weightedAddr.TlsIdentity = &pb.TlsIdentity{
   576  		Strategy: &pb.TlsIdentity_UriLikeIdentity_{
   577  			UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
   578  				Uri: address.ExternalWorkload.Spec.MeshTLS.Identity,
   579  			},
   580  		},
   581  		ServerName: &pb.TlsIdentity_DnsLikeIdentity{
   582  			Name: address.ExternalWorkload.Spec.MeshTLS.ServerName,
   583  		},
   584  	}
   585  
   586  	weightedAddr.MetricLabels = pkgK8s.GetExternalWorkloadLabels(address.OwnerKind, address.OwnerName, address.ExternalWorkload)
   587  	// Set a zone label, even if it is empty (for consistency).
   588  	z := ""
   589  	if address.Zone != nil {
   590  		z = *address.Zone
   591  	}
   592  	weightedAddr.MetricLabels["zone"] = z
   593  
   594  	return &weightedAddr, nil
   595  }
   596  
   597  func createWeightedAddr(
   598  	address watcher.Address,
   599  	opaquePorts map[uint32]struct{},
   600  	enableH2Upgrade bool,
   601  	identityTrustDomain string,
   602  	controllerNS string,
   603  	meshedHttp2 *pb.Http2ClientParams,
   604  ) (*pb.WeightedAddr, error) {
   605  	tcpAddr, err := toAddr(address)
   606  	if err != nil {
   607  		return nil, err
   608  	}
   609  
   610  	weightedAddr := pb.WeightedAddr{
   611  		Addr:         tcpAddr,
   612  		Weight:       defaultWeight,
   613  		MetricLabels: map[string]string{},
   614  	}
   615  
   616  	// If the address is not backed by a pod, there is no additional metadata
   617  	// to add.
   618  	if address.Pod == nil {
   619  		return &weightedAddr, nil
   620  	}
   621  
   622  	skippedInboundPorts := getPodSkippedInboundPortsAnnotations(address.Pod)
   623  
   624  	controllerNSLabel := address.Pod.Labels[pkgK8s.ControllerNSLabel]
   625  	sa, ns := pkgK8s.GetServiceAccountAndNS(address.Pod)
   626  	weightedAddr.MetricLabels = pkgK8s.GetPodLabels(address.OwnerKind, address.OwnerName, address.Pod)
   627  
   628  	// Set a zone label, even if it is empty (for consistency).
   629  	z := ""
   630  	if address.Zone != nil {
   631  		z = *address.Zone
   632  	}
   633  	weightedAddr.MetricLabels["zone"] = z
   634  
   635  	_, isSkippedInboundPort := skippedInboundPorts[address.Port]
   636  
   637  	if controllerNSLabel != "" && !isSkippedInboundPort {
   638  		weightedAddr.Http2 = meshedHttp2
   639  		weightedAddr.ProtocolHint = &pb.ProtocolHint{}
   640  
   641  		_, opaquePort := opaquePorts[address.Port]
   642  		// If address is set as opaque by a Server, or its port is set as
   643  		// opaque by annotation or default value, then set the hinted protocol to
   644  		// Opaque.
   645  		if address.OpaqueProtocol || opaquePort {
   646  			weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{
   647  				Opaque: &pb.ProtocolHint_Opaque{},
   648  			}
   649  
   650  			port, err := getInboundPort(&address.Pod.Spec)
   651  			if err != nil {
   652  				return nil, fmt.Errorf("failed to read inbound port: %w", err)
   653  			}
   654  			weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
   655  				InboundPort: port,
   656  			}
   657  		} else if enableH2Upgrade {
   658  			// If the pod is controlled by any Linkerd control plane, then it can be
   659  			// hinted that this destination knows H2 (and handles our orig-proto
   660  			// translation)
   661  			weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{
   662  				H2: &pb.ProtocolHint_H2{},
   663  			}
   664  		}
   665  	}
   666  
   667  	// If the pod is controlled by the same Linkerd control plane, then it can
   668  	// participate in identity with peers.
   669  	//
   670  	// TODO this should be relaxed to match a trust domain annotation so that
   671  	// multiple meshes can participate in identity if they share trust roots.
   672  	if identityTrustDomain != "" &&
   673  		controllerNSLabel == controllerNS &&
   674  		!isSkippedInboundPort {
   675  
   676  		id := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", sa, ns, controllerNSLabel, identityTrustDomain)
   677  		tlsId := &pb.TlsIdentity_DnsLikeIdentity{Name: id}
   678  
   679  		weightedAddr.TlsIdentity = &pb.TlsIdentity{
   680  			Strategy: &pb.TlsIdentity_DnsLikeIdentity_{
   681  				DnsLikeIdentity: tlsId,
   682  			},
   683  			ServerName: tlsId,
   684  		}
   685  	}
   686  
   687  	return &weightedAddr, nil
   688  }
   689  
   690  func getNodeTopologyZone(k8sAPI *k8s.MetadataAPI, srcNode string) (string, error) {
   691  	node, err := k8sAPI.Get(k8s.Node, srcNode)
   692  	if err != nil {
   693  		return "", err
   694  	}
   695  	if zone, ok := node.Labels[corev1.LabelTopologyZone]; ok {
   696  		return zone, nil
   697  	}
   698  	return "", nil
   699  }
   700  
   701  func newEmptyAddressSet() watcher.AddressSet {
   702  	return watcher.AddressSet{
   703  		Addresses: make(map[watcher.ID]watcher.Address),
   704  		Labels:    make(map[string]string),
   705  	}
   706  }
   707  
   708  // getInboundPort gets the inbound port from the proxy container's environment
   709  // variable.
   710  func getInboundPort(podSpec *corev1.PodSpec) (uint32, error) {
   711  	containers := append(podSpec.InitContainers, podSpec.Containers...)
   712  	for _, containerSpec := range containers {
   713  		if containerSpec.Name != pkgK8s.ProxyContainerName {
   714  			continue
   715  		}
   716  		for _, envVar := range containerSpec.Env {
   717  			if envVar.Name != envInboundListenAddr {
   718  				continue
   719  			}
   720  			addrPort, err := netip.ParseAddrPort(envVar.Value)
   721  			if err != nil {
   722  				return 0, fmt.Errorf("failed to parse inbound port for proxy container: %w", err)
   723  			}
   724  
   725  			return uint32(addrPort.Port()), nil
   726  		}
   727  	}
   728  	return 0, fmt.Errorf("failed to find %s environment variable in any container for given pod spec", envInboundListenAddr)
   729  }
   730  
   731  // getInboundPortFromExternalWorkload gets the inbound port from the ExternalWorkload spec
   732  // variable.
   733  func getInboundPortFromExternalWorkload(ewSpec *ewv1beta1.ExternalWorkloadSpec) (uint32, error) {
   734  	for _, p := range ewSpec.Ports {
   735  		if p.Name == pkgK8s.ProxyPortName {
   736  			return uint32(p.Port), nil
   737  		}
   738  	}
   739  
   740  	return 0, fmt.Errorf("failed to find %s port for given ExternalWorkloadSpec", pkgK8s.ProxyPortName)
   741  }
   742  

View as plain text