...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/server.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"encoding/json"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"strconv"
     9  	"strings"
    10  
    11  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    12  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    13  	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
    14  	"github.com/linkerd/linkerd2/controller/k8s"
    15  	labels "github.com/linkerd/linkerd2/pkg/k8s"
    16  	"github.com/linkerd/linkerd2/pkg/prometheus"
    17  	"github.com/linkerd/linkerd2/pkg/util"
    18  	logging "github.com/sirupsen/logrus"
    19  	"google.golang.org/grpc"
    20  	"google.golang.org/grpc/codes"
    21  	"google.golang.org/grpc/peer"
    22  	"google.golang.org/grpc/status"
    23  	corev1 "k8s.io/api/core/v1"
    24  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    25  )
    26  
    27  type (
    28  	Config struct {
    29  		ControllerNS,
    30  		IdentityTrustDomain,
    31  		ClusterDomain string
    32  
    33  		EnableH2Upgrade,
    34  		EnableEndpointSlices,
    35  		EnableIPv6,
    36  		ExtEndpointZoneWeights bool
    37  
    38  		MeshedHttp2ClientParams *pb.Http2ClientParams
    39  
    40  		DefaultOpaquePorts map[uint32]struct{}
    41  	}
    42  
    43  	server struct {
    44  		pb.UnimplementedDestinationServer
    45  
    46  		config Config
    47  
    48  		workloads    *watcher.WorkloadWatcher
    49  		endpoints    *watcher.EndpointsWatcher
    50  		opaquePorts  *watcher.OpaquePortsWatcher
    51  		profiles     *watcher.ProfileWatcher
    52  		clusterStore *watcher.ClusterStore
    53  
    54  		k8sAPI      *k8s.API
    55  		metadataAPI *k8s.MetadataAPI
    56  		log         *logging.Entry
    57  		shutdown    <-chan struct{}
    58  	}
    59  )
    60  
    61  // NewServer returns a new instance of the destination server.
    62  //
    63  // The destination server serves service discovery and other information to the
    64  // proxy.  This implementation supports the "k8s" destination scheme and expects
    65  // destination paths to be of the form:
    66  // <service>.<namespace>.svc.cluster.local:<port>
    67  //
    68  // If the port is omitted, 80 is used as a default.  If the namespace is
    69  // omitted, "default" is used as a default.append
    70  //
    71  // Addresses for the given destination are fetched from the Kubernetes Endpoints
    72  // API.
    73  func NewServer(
    74  	addr string,
    75  	config Config,
    76  	k8sAPI *k8s.API,
    77  	metadataAPI *k8s.MetadataAPI,
    78  	clusterStore *watcher.ClusterStore,
    79  	shutdown <-chan struct{},
    80  ) (*grpc.Server, error) {
    81  	log := logging.WithFields(logging.Fields{
    82  		"addr":      addr,
    83  		"component": "server",
    84  	})
    85  
    86  	// Initialize indexers that are used across watchers
    87  	err := watcher.InitializeIndexers(k8sAPI)
    88  	if err != nil {
    89  		return nil, err
    90  	}
    91  
    92  	workloads, err := watcher.NewWorkloadWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, config.DefaultOpaquePorts)
    93  	if err != nil {
    94  		return nil, err
    95  	}
    96  	endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, metadataAPI, log, config.EnableEndpointSlices, "local")
    97  	if err != nil {
    98  		return nil, err
    99  	}
   100  	opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, config.DefaultOpaquePorts)
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  	profiles, err := watcher.NewProfileWatcher(k8sAPI, log)
   105  	if err != nil {
   106  		return nil, err
   107  	}
   108  
   109  	srv := server{
   110  		pb.UnimplementedDestinationServer{},
   111  		config,
   112  		workloads,
   113  		endpoints,
   114  		opaquePorts,
   115  		profiles,
   116  		clusterStore,
   117  		k8sAPI,
   118  		metadataAPI,
   119  		log,
   120  		shutdown,
   121  	}
   122  
   123  	s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
   124  	// linkerd2-proxy-api/destination.Destination (proxy-facing)
   125  	pb.RegisterDestinationServer(s, &srv)
   126  	return s, nil
   127  }
   128  
   129  func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) error {
   130  	log := s.log
   131  
   132  	client, _ := peer.FromContext(stream.Context())
   133  	if client != nil {
   134  		log = log.WithField("remote", client.Addr)
   135  	}
   136  
   137  	var token contextToken
   138  	if dest.GetContextToken() != "" {
   139  		log.Debugf("Dest token: %q", dest.GetContextToken())
   140  		token = s.parseContextToken(dest.GetContextToken())
   141  		log = log.WithFields(logging.Fields{"context-pod": token.Pod, "context-ns": token.Ns})
   142  	}
   143  
   144  	log.Debugf("Get %s", dest.GetPath())
   145  
   146  	streamEnd := make(chan struct{})
   147  	// The host must be fully-qualified or be an IP address.
   148  	host, port, err := getHostAndPort(dest.GetPath())
   149  	if err != nil {
   150  		log.Debugf("Invalid service %s", dest.GetPath())
   151  		return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   152  	}
   153  
   154  	// Return error for an IP query
   155  	if ip := net.ParseIP(host); ip != nil {
   156  		return status.Errorf(codes.InvalidArgument, "IP queries not supported by Get API: host=%s", host)
   157  	}
   158  
   159  	service, instanceID, err := parseK8sServiceName(host, s.config.ClusterDomain)
   160  	if err != nil {
   161  		log.Debugf("Invalid service %s", dest.GetPath())
   162  		return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   163  	}
   164  
   165  	svc, err := s.k8sAPI.Svc().Lister().Services(service.Namespace).Get(service.Name)
   166  	if err != nil {
   167  		if kerrors.IsNotFound(err) {
   168  			log.Debugf("Service not found %s", service)
   169  			return status.Errorf(codes.NotFound, "Service %s.%s not found", service.Name, service.Namespace)
   170  		}
   171  		log.Debugf("Failed to get service %s: %v", service, err)
   172  		return status.Errorf(codes.Internal, "Failed to get service %s", dest.GetPath())
   173  	}
   174  
   175  	if cluster, found := svc.Labels[labels.RemoteDiscoveryLabel]; found {
   176  		// Remote discovery
   177  		remoteSvc, found := svc.Labels[labels.RemoteServiceLabel]
   178  		if !found {
   179  			log.Debugf("Remote discovery service missing remote service name %s", service)
   180  			return status.Errorf(codes.FailedPrecondition, "Remote discovery service missing remote service name %s", dest.GetPath())
   181  		}
   182  		remoteWatcher, remoteConfig, found := s.clusterStore.Get(cluster)
   183  		if !found {
   184  			log.Errorf("Failed to get remote cluster %s", cluster)
   185  			return status.Errorf(codes.NotFound, "Remote cluster not found: %s", cluster)
   186  		}
   187  		translator := newEndpointTranslator(
   188  			s.config.ControllerNS,
   189  			remoteConfig.TrustDomain,
   190  			s.config.EnableH2Upgrade,
   191  			false, // Disable endpoint filtering for remote discovery.
   192  			s.config.EnableIPv6,
   193  			s.config.ExtEndpointZoneWeights,
   194  			s.config.MeshedHttp2ClientParams,
   195  			fmt.Sprintf("%s.%s.svc.%s:%d", remoteSvc, service.Namespace, remoteConfig.ClusterDomain, port),
   196  			token.NodeName,
   197  			s.config.DefaultOpaquePorts,
   198  			s.metadataAPI,
   199  			stream,
   200  			streamEnd,
   201  			log,
   202  		)
   203  		translator.Start()
   204  		defer translator.Stop()
   205  
   206  		err = remoteWatcher.Subscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
   207  		if err != nil {
   208  			var ise watcher.InvalidService
   209  			if errors.As(err, &ise) {
   210  				log.Debugf("Invalid remote discovery service %s", dest.GetPath())
   211  				return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   212  			}
   213  			log.Errorf("Failed to subscribe to remote disocvery service %q in cluster %s: %s", dest.GetPath(), cluster, err)
   214  			return err
   215  		}
   216  		defer remoteWatcher.Unsubscribe(watcher.ServiceID{Namespace: service.Namespace, Name: remoteSvc}, port, instanceID, translator)
   217  
   218  	} else {
   219  		// Local discovery
   220  		translator := newEndpointTranslator(
   221  			s.config.ControllerNS,
   222  			s.config.IdentityTrustDomain,
   223  			s.config.EnableH2Upgrade,
   224  			true,
   225  			s.config.EnableIPv6,
   226  			s.config.ExtEndpointZoneWeights,
   227  			s.config.MeshedHttp2ClientParams,
   228  			dest.GetPath(),
   229  			token.NodeName,
   230  			s.config.DefaultOpaquePorts,
   231  			s.metadataAPI,
   232  			stream,
   233  			streamEnd,
   234  			log,
   235  		)
   236  		translator.Start()
   237  		defer translator.Stop()
   238  
   239  		err = s.endpoints.Subscribe(service, port, instanceID, translator)
   240  		if err != nil {
   241  			var ise watcher.InvalidService
   242  			if errors.As(err, &ise) {
   243  				log.Debugf("Invalid service %s", dest.GetPath())
   244  				return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   245  			}
   246  			log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
   247  			return err
   248  		}
   249  		defer s.endpoints.Unsubscribe(service, port, instanceID, translator)
   250  	}
   251  
   252  	select {
   253  	case <-s.shutdown:
   254  	case <-stream.Context().Done():
   255  		log.Debugf("Get %s cancelled", dest.GetPath())
   256  	case <-streamEnd:
   257  		log.Errorf("Get %s stream aborted", dest.GetPath())
   258  	}
   259  
   260  	return nil
   261  }
   262  
   263  func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetProfileServer) error {
   264  	log := s.log
   265  
   266  	client, _ := peer.FromContext(stream.Context())
   267  	if client != nil {
   268  		log = log.WithField("remote", client.Addr)
   269  	}
   270  
   271  	var token contextToken
   272  	if dest.GetContextToken() != "" {
   273  		log.Debugf("Dest token: %q", dest.GetContextToken())
   274  		token = s.parseContextToken(dest.GetContextToken())
   275  		log = log.WithFields(logging.Fields{"context-pod": token.Pod, "context-ns": token.Ns})
   276  	}
   277  
   278  	log.Debugf("Getting profile for %s", dest.GetPath())
   279  
   280  	// The host must be fully-qualified or be an IP address.
   281  	host, port, err := getHostAndPort(dest.GetPath())
   282  	if err != nil {
   283  		log.Debugf("Invalid address %q", dest.GetPath())
   284  		return status.Errorf(codes.InvalidArgument, "invalid authority: %q: %q", dest.GetPath(), err)
   285  	}
   286  
   287  	if ip := net.ParseIP(host); ip != nil {
   288  		err = s.getProfileByIP(token, ip, port, log, stream)
   289  		if err != nil {
   290  			var ise watcher.InvalidService
   291  			if errors.As(err, &ise) {
   292  				log.Debugf("Invalid service %s", dest.GetPath())
   293  				return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   294  			}
   295  			log.Errorf("Failed to subscribe to profile by ip %q: %q", dest.GetPath(), err)
   296  		}
   297  		return err
   298  	}
   299  
   300  	err = s.getProfileByName(token, host, port, log, stream)
   301  	if err != nil {
   302  		var ise watcher.InvalidService
   303  		if errors.As(err, &ise) {
   304  			log.Debugf("Invalid service %s", dest.GetPath())
   305  			return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
   306  		}
   307  		log.Errorf("Failed to subscribe to profile by name %q: %q", dest.GetPath(), err)
   308  	}
   309  	return err
   310  }
   311  
   312  func (s *server) getProfileByIP(
   313  	token contextToken,
   314  	ip net.IP,
   315  	port uint32,
   316  	log *logging.Entry,
   317  	stream pb.Destination_GetProfileServer,
   318  ) error {
   319  	// Get the service that the IP currently maps to.
   320  	svcID, err := getSvcID(s.k8sAPI, ip.String(), s.log)
   321  	if err != nil {
   322  		return err
   323  	}
   324  
   325  	if svcID == nil {
   326  		return s.subscribeToEndpointProfile(nil, "", ip.String(), port, log, stream)
   327  	}
   328  
   329  	fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.config.ClusterDomain)
   330  	return s.subscribeToServiceProfile(*svcID, token, fqn, port, log, stream)
   331  }
   332  
   333  func (s *server) getProfileByName(
   334  	token contextToken,
   335  	host string,
   336  	port uint32,
   337  	log *logging.Entry,
   338  	stream pb.Destination_GetProfileServer,
   339  ) error {
   340  	service, hostname, err := parseK8sServiceName(host, s.config.ClusterDomain)
   341  	if err != nil {
   342  		s.log.Debugf("Invalid service %s", host)
   343  		return status.Errorf(codes.InvalidArgument, "invalid service %q: %q", host, err)
   344  	}
   345  
   346  	// If the pod name (instance ID) is not empty, it means we parsed a DNS
   347  	// name. When we fetch the profile using a pod's DNS name, we want to
   348  	// return an endpoint in the profile response.
   349  	if hostname != "" {
   350  		return s.subscribeToEndpointProfile(&service, hostname, "", port, log, stream)
   351  	}
   352  
   353  	return s.subscribeToServiceProfile(service, token, host, port, log, stream)
   354  }
   355  
   356  // Resolves a profile for a service, sending updates to the provided stream.
   357  //
   358  // This function does not return until the stream is closed.
   359  func (s *server) subscribeToServiceProfile(
   360  	service watcher.ID,
   361  	token contextToken,
   362  	fqn string,
   363  	port uint32,
   364  	log *logging.Entry,
   365  	stream pb.Destination_GetProfileServer,
   366  ) error {
   367  	log = log.
   368  		WithField("ns", service.Namespace).
   369  		WithField("svc", service.Name).
   370  		WithField("port", port)
   371  
   372  	canceled := stream.Context().Done()
   373  	streamEnd := make(chan struct{})
   374  
   375  	// We build up the pipeline of profile updaters backwards, starting from
   376  	// the translator which takes profile updates, translates them to protobuf
   377  	// and pushes them onto the gRPC stream.
   378  	translator := newProfileTranslator(stream, log, fqn, port, streamEnd)
   379  	translator.Start()
   380  	defer translator.Stop()
   381  
   382  	// The opaque ports adaptor merges profile updates with service opaque
   383  	// port annotation updates; it then publishes the result to the traffic
   384  	// split adaptor.
   385  	opaquePortsAdaptor := newOpaquePortsAdaptor(translator)
   386  
   387  	// Create an adaptor that merges service-level opaque port configurations
   388  	// onto profile updates.
   389  	err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
   390  	if err != nil {
   391  		log.Warnf("Failed to subscribe to service updates for %s: %s", service, err)
   392  		return err
   393  	}
   394  	defer s.opaquePorts.Unsubscribe(service, opaquePortsAdaptor)
   395  
   396  	// Ensure that (1) nil values are turned into a default policy and (2)
   397  	// subsequent updates that refer to same service profile object are
   398  	// deduplicated to prevent sending redundant updates.
   399  	dup := newDedupProfileListener(opaquePortsAdaptor, log)
   400  	defaultProfile := sp.ServiceProfile{}
   401  	listener := newDefaultProfileListener(&defaultProfile, dup, log)
   402  
   403  	// The primary lookup uses the context token to determine the requester's
   404  	// namespace. If there's no namespace in the token, start a single
   405  	// subscription.
   406  	if token.Ns == "" {
   407  		return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd)
   408  	}
   409  	return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd)
   410  }
   411  
   412  // subscribeToServicesWithContext establishes two profile watches: a "backup"
   413  // watch (ignoring the client namespace) and a preferred "primary" watch
   414  // assuming the client's context. Once updates are received for both watches, we
   415  // select over both watches to send profile updates to the stream. A nil update
   416  // may be sent if both the primary and backup watches are initialized with a nil
   417  // value.
   418  func (s *server) subscribeToServicesWithContext(
   419  	fqn string,
   420  	token contextToken,
   421  	listener watcher.ProfileUpdateListener,
   422  	canceled <-chan struct{},
   423  	log *logging.Entry,
   424  	streamEnd <-chan struct{},
   425  ) error {
   426  	// We ned to support two subscriptions:
   427  	// - First, a backup subscription that assumes the context of the server
   428  	//   namespace.
   429  	// - And then, a primary subscription that assumes the context of the client
   430  	//   namespace.
   431  	primary, backup := newFallbackProfileListener(listener, log)
   432  
   433  	// The backup lookup ignores the context token to lookup any
   434  	// server-namespace-hosted profiles.
   435  	backupID, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
   436  	if err != nil {
   437  		log.Debug("Invalid service")
   438  		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
   439  	}
   440  	err = s.profiles.Subscribe(backupID, backup)
   441  	if err != nil {
   442  		log.Warnf("Failed to subscribe to profile: %s", err)
   443  		return err
   444  	}
   445  	defer s.profiles.Unsubscribe(backupID, backup)
   446  
   447  	primaryID, err := profileID(fqn, token, s.config.ClusterDomain)
   448  	if err != nil {
   449  		log.Debug("Invalid service")
   450  		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
   451  	}
   452  	err = s.profiles.Subscribe(primaryID, primary)
   453  	if err != nil {
   454  		log.Warnf("Failed to subscribe to profile: %s", err)
   455  		return err
   456  	}
   457  	defer s.profiles.Unsubscribe(primaryID, primary)
   458  
   459  	select {
   460  	case <-s.shutdown:
   461  	case <-canceled:
   462  		log.Debugf("GetProfile %s cancelled", fqn)
   463  	case <-streamEnd:
   464  		log.Errorf("GetProfile %s stream aborted", fqn)
   465  	}
   466  	return nil
   467  }
   468  
   469  // subscribeToServiceWithoutContext establishes a single profile watch, assuming
   470  // no client context. All udpates are published to the provided listener.
   471  func (s *server) subscribeToServiceWithoutContext(
   472  	fqn string,
   473  	listener watcher.ProfileUpdateListener,
   474  	canceled <-chan struct{},
   475  	log *logging.Entry,
   476  	streamEnd <-chan struct{},
   477  ) error {
   478  	id, err := profileID(fqn, contextToken{}, s.config.ClusterDomain)
   479  	if err != nil {
   480  		log.Debug("Invalid service")
   481  		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
   482  	}
   483  	err = s.profiles.Subscribe(id, listener)
   484  	if err != nil {
   485  		log.Warnf("Failed to subscribe to profile: %s", err)
   486  		return err
   487  	}
   488  	defer s.profiles.Unsubscribe(id, listener)
   489  
   490  	select {
   491  	case <-s.shutdown:
   492  	case <-canceled:
   493  		log.Debugf("GetProfile %s cancelled", fqn)
   494  	case <-streamEnd:
   495  		log.Errorf("GetProfile %s stream aborted", fqn)
   496  	}
   497  	return nil
   498  }
   499  
   500  // Resolves a profile for a single endpoint, sending updates to the provided
   501  // stream.
   502  //
   503  // This function does not return until the stream is closed.
   504  func (s *server) subscribeToEndpointProfile(
   505  	service *watcher.ServiceID,
   506  	hostname,
   507  	ip string,
   508  	port uint32,
   509  	log *logging.Entry,
   510  	stream pb.Destination_GetProfileServer,
   511  ) error {
   512  	canceled := stream.Context().Done()
   513  	streamEnd := make(chan struct{})
   514  	translator := newEndpointProfileTranslator(
   515  		s.config.EnableH2Upgrade,
   516  		s.config.ControllerNS,
   517  		s.config.IdentityTrustDomain,
   518  		s.config.DefaultOpaquePorts,
   519  		s.config.MeshedHttp2ClientParams,
   520  		stream,
   521  		streamEnd,
   522  		log,
   523  	)
   524  	translator.Start()
   525  	defer translator.Stop()
   526  
   527  	var err error
   528  	ip, err = s.workloads.Subscribe(service, hostname, ip, port, translator)
   529  	if err != nil {
   530  		return err
   531  	}
   532  	defer s.workloads.Unsubscribe(ip, port, translator)
   533  
   534  	select {
   535  	case <-s.shutdown:
   536  	case <-canceled:
   537  		s.log.Debugf("Cancelled")
   538  	case <-streamEnd:
   539  		log.Errorf("GetProfile %s:%d stream aborted", ip, port)
   540  	}
   541  	return nil
   542  }
   543  
   544  // getSvcID returns the service that corresponds to a Cluster IP address if one
   545  // exists.
   546  func getSvcID(k8sAPI *k8s.API, clusterIP string, log *logging.Entry) (*watcher.ServiceID, error) {
   547  	objs, err := k8sAPI.Svc().Informer().GetIndexer().ByIndex(watcher.PodIPIndex, clusterIP)
   548  	if err != nil {
   549  		return nil, status.Error(codes.Unknown, err.Error())
   550  	}
   551  	services := make([]*corev1.Service, 0)
   552  	for _, obj := range objs {
   553  		service := obj.(*corev1.Service)
   554  		services = append(services, service)
   555  	}
   556  	if len(services) > 1 {
   557  		conflictingServices := []string{}
   558  		for _, service := range services {
   559  			conflictingServices = append(conflictingServices, fmt.Sprintf("%s:%s", service.Namespace, service.Name))
   560  		}
   561  		log.Warnf("found conflicting %s cluster IP: %s", clusterIP, strings.Join(conflictingServices, ","))
   562  		return nil, status.Errorf(codes.FailedPrecondition, "found %d services with conflicting cluster IP %s", len(services), clusterIP)
   563  	}
   564  	if len(services) == 0 {
   565  		return nil, nil
   566  	}
   567  	service := &watcher.ServiceID{
   568  		Namespace: services[0].Namespace,
   569  		Name:      services[0].Name,
   570  	}
   571  	return service, nil
   572  }
   573  
   574  ////////////
   575  /// util ///
   576  ////////////
   577  
   578  type contextToken struct {
   579  	Ns       string `json:"ns,omitempty"`
   580  	NodeName string `json:"nodeName,omitempty"`
   581  	Pod      string `json:"pod,omitempty"`
   582  }
   583  
   584  func (s *server) parseContextToken(token string) contextToken {
   585  	ctxToken := contextToken{}
   586  	if token == "" {
   587  		return ctxToken
   588  	}
   589  	if err := json.Unmarshal([]byte(token), &ctxToken); err != nil {
   590  		// if json is invalid, means token can have ns:<namespace> form
   591  		parts := strings.Split(token, ":")
   592  		if len(parts) == 2 && parts[0] == "ns" {
   593  			s.log.Warnf("context token %s using old token format", token)
   594  			ctxToken = contextToken{
   595  				Ns: parts[1],
   596  			}
   597  		} else {
   598  			s.log.Errorf("context token %s is invalid: %s", token, err)
   599  		}
   600  	}
   601  	return ctxToken
   602  }
   603  
   604  func profileID(authority string, ctxToken contextToken, clusterDomain string) (watcher.ProfileID, error) {
   605  	host, _, err := getHostAndPort(authority)
   606  	if err != nil {
   607  		return watcher.ProfileID{}, fmt.Errorf("invalid authority: %w", err)
   608  	}
   609  	service, _, err := parseK8sServiceName(host, clusterDomain)
   610  	if err != nil {
   611  		return watcher.ProfileID{}, fmt.Errorf("invalid k8s service name: %w", err)
   612  	}
   613  	id := watcher.ProfileID{
   614  		Name:      fmt.Sprintf("%s.%s.svc.%s", service.Name, service.Namespace, clusterDomain),
   615  		Namespace: service.Namespace,
   616  	}
   617  	if ctxToken.Ns != "" {
   618  		id.Namespace = ctxToken.Ns
   619  	}
   620  	return id, nil
   621  }
   622  
   623  func getHostAndPort(authority string) (string, watcher.Port, error) {
   624  	if !strings.Contains(authority, ":") {
   625  		return authority, watcher.Port(80), nil
   626  	}
   627  
   628  	host, sport, err := net.SplitHostPort(authority)
   629  	if err != nil {
   630  		return "", 0, fmt.Errorf("invalid destination: %w", err)
   631  	}
   632  	port, err := strconv.Atoi(sport)
   633  	if err != nil {
   634  		return "", 0, fmt.Errorf("invalid port %s: %w", sport, err)
   635  	}
   636  	if port <= 0 || port > 65535 {
   637  		return "", 0, fmt.Errorf("invalid port %d", port)
   638  	}
   639  	return host, watcher.Port(port), nil
   640  }
   641  
   642  type instanceID = string
   643  
   644  // parseK8sServiceName is a utility that destructures a Kubernetes service hostname into its constituent components.
   645  //
   646  // If the authority does not represent a Kubernetes service, an error is returned.
   647  //
   648  // If the hostname is a pod DNS name, then the pod's name (instanceID) is returned
   649  // as well. See https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/.
   650  func parseK8sServiceName(fqdn, clusterDomain string) (watcher.ServiceID, instanceID, error) {
   651  	labels := strings.Split(fqdn, ".")
   652  	suffix := append([]string{"svc"}, strings.Split(clusterDomain, ".")...)
   653  
   654  	if !hasSuffix(labels, suffix) {
   655  		return watcher.ServiceID{}, "", fmt.Errorf("name %s does not match cluster domain %s", fqdn, clusterDomain)
   656  	}
   657  
   658  	n := len(labels)
   659  	if n == 2+len(suffix) {
   660  		// <service>.<namespace>.<suffix>
   661  		service := watcher.ServiceID{
   662  			Name:      labels[0],
   663  			Namespace: labels[1],
   664  		}
   665  		return service, "", nil
   666  	}
   667  
   668  	if n == 3+len(suffix) {
   669  		// <instance-id>.<service>.<namespace>.<suffix>
   670  		instanceID := labels[0]
   671  		service := watcher.ServiceID{
   672  			Name:      labels[1],
   673  			Namespace: labels[2],
   674  		}
   675  		return service, instanceID, nil
   676  	}
   677  
   678  	return watcher.ServiceID{}, "", fmt.Errorf("invalid k8s service %s", fqdn)
   679  }
   680  
   // hasSuffix reports whether the last len(suffix) elements of slice equal
   // suffix element by element. An empty suffix always matches.
   func hasSuffix(slice []string, suffix []string) bool {
   	offset := len(slice) - len(suffix)
   	if offset < 0 {
   		return false
   	}
   	for i := range suffix {
   		if slice[offset+i] != suffix[i] {
   			return false
   		}
   	}
   	return true
   }
   692  
   693  func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) map[uint32]struct{} {
   694  	annotation, ok := pod.Annotations[labels.ProxyIgnoreInboundPortsAnnotation]
   695  	if !ok || annotation == "" {
   696  		return nil
   697  	}
   698  
   699  	return util.ParsePorts(annotation)
   700  }
   701  

View as plain text