...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/server_test.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	gonet "net"
     7  	"net/netip"
     8  	"reflect"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/golang/protobuf/ptypes/duration"
    13  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    14  	"github.com/linkerd/linkerd2-proxy-api/go/net"
    15  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    16  	"github.com/linkerd/linkerd2/controller/api/util"
    17  	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
    18  	"github.com/linkerd/linkerd2/controller/k8s"
    19  	"github.com/linkerd/linkerd2/pkg/addr"
    20  	pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
    21  	"github.com/linkerd/linkerd2/testutil"
    22  	logging "github.com/sirupsen/logrus"
    23  	"google.golang.org/grpc/codes"
    24  	"google.golang.org/grpc/status"
    25  	corev1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/util/intstr"
    28  )
    29  
    30  const fullyQualifiedName = "name1.ns.svc.mycluster.local"
    31  const fullyQualifiedNameIPv6 = "name-ipv6.ns.svc.mycluster.local"
    32  const fullyQualifiedNameDual = "name-ds.ns.svc.mycluster.local"
    33  const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
    34  const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
    35  const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
    36  const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
    37  const clusterIP = "172.17.12.0"
    38  const clusterIPv6 = "2001:db8::88"
    39  const clusterIPOpaque = "172.17.12.1"
    40  const podIP1 = "172.17.0.12"
    41  const podIP1v6 = "2001:db8::68"
    42  const podIPv6Dual = "2001:db8::94"
    43  const podIP2 = "172.17.0.13"
    44  const podIPOpaque = "172.17.0.14"
    45  const podIPSkipped = "172.17.0.15"
    46  const podIPPolicy = "172.17.0.16"
    47  const podIPStatefulSet = "172.17.13.15"
    48  const externalIP = "192.168.1.20"
    49  const externalIPv6 = "2001:db8::78"
    50  const externalWorkloadIP = "200.1.1.1"
    51  const externalWorkloadIPPolicy = "200.1.1.2"
    52  const port uint32 = 8989
    53  const opaquePort uint32 = 4242
    54  const skippedPort uint32 = 24224
    55  
    56  func TestGet(t *testing.T) {
    57  	t.Run("Returns error if not valid service name", func(t *testing.T) {
    58  		server := makeServer(t)
    59  		defer server.clusterStore.UnregisterGauges()
    60  
    61  		stream := &bufferingGetStream{
    62  			updates:          make(chan *pb.Update, 50),
    63  			MockServerStream: util.NewMockServerStream(),
    64  		}
    65  
    66  		err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
    67  		if err == nil {
    68  			t.Fatalf("Expecting error, got nothing")
    69  		}
    70  	})
    71  
    72  	t.Run("Returns InvalidArgument for ExternalName service", func(t *testing.T) {
    73  		server := makeServer(t)
    74  		defer server.clusterStore.UnregisterGauges()
    75  
    76  		stream := &bufferingGetStream{
    77  			updates:          make(chan *pb.Update, 50),
    78  			MockServerStream: util.NewMockServerStream(),
    79  		}
    80  
    81  		err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "externalname.ns.svc.cluster.local"}, stream)
    82  
    83  		code := status.Code(err)
    84  		if code != codes.InvalidArgument {
    85  			t.Fatalf("Expected InvalidArgument, got %s", code)
    86  		}
    87  	})
    88  
    89  	t.Run("Returns endpoints (IPv4)", func(t *testing.T) {
    90  		testReturnEndpoints(t, fullyQualifiedName, podIP1, port)
    91  	})
    92  
    93  	t.Run("Returns endpoints (IPv6)", func(t *testing.T) {
    94  		testReturnEndpoints(t, fullyQualifiedNameIPv6, podIP1v6, port)
    95  	})
    96  
    97  	t.Run("Returns endpoints (dual-stack)", func(t *testing.T) {
    98  		testReturnEndpoints(t, fullyQualifiedNameDual, podIPv6Dual, port)
    99  	})
   100  
   101  	t.Run("Sets meshed HTTP/2 client params", func(t *testing.T) {
   102  		server := makeServer(t)
   103  		http2Params := pb.Http2ClientParams{
   104  			KeepAlive: &pb.Http2ClientParams_KeepAlive{
   105  				Timeout:  &duration.Duration{Seconds: 10},
   106  				Interval: &duration.Duration{Seconds: 20},
   107  			},
   108  		}
   109  		server.config.MeshedHttp2ClientParams = &http2Params
   110  		defer server.clusterStore.UnregisterGauges()
   111  
   112  		stream := &bufferingGetStream{
   113  			updates:          make(chan *pb.Update, 50),
   114  			MockServerStream: util.NewMockServerStream(),
   115  		}
   116  		defer stream.Cancel()
   117  		errs := make(chan error)
   118  
   119  		// server.Get blocks until the grpc stream is complete so we call it
   120  		// in a goroutine and watch stream.updates for updates.
   121  		go func() {
   122  			err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port)}, stream)
   123  			if err != nil {
   124  				errs <- err
   125  			}
   126  		}()
   127  
   128  		select {
   129  		case update := <-stream.updates:
   130  			add, ok := update.GetUpdate().(*pb.Update_Add)
   131  			if !ok {
   132  				t.Fatalf("Update expected to be an add, but was %+v", update)
   133  			}
   134  			addr := add.Add.Addrs[0]
   135  			if !reflect.DeepEqual(addr.GetHttp2(), &http2Params) {
   136  				t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, addr.GetHttp2())
   137  			}
   138  		case err := <-errs:
   139  			t.Fatalf("Got error: %s", err)
   140  		}
   141  	})
   142  
   143  	t.Run("Does not set unmeshed HTTP/2 client params", func(t *testing.T) {
   144  		server := makeServer(t)
   145  		http2Params := pb.Http2ClientParams{
   146  			KeepAlive: &pb.Http2ClientParams_KeepAlive{
   147  				Timeout:  &duration.Duration{Seconds: 10},
   148  				Interval: &duration.Duration{Seconds: 20},
   149  			},
   150  		}
   151  		server.config.MeshedHttp2ClientParams = &http2Params
   152  		defer server.clusterStore.UnregisterGauges()
   153  
   154  		stream := &bufferingGetStream{
   155  			updates:          make(chan *pb.Update, 50),
   156  			MockServerStream: util.NewMockServerStream(),
   157  		}
   158  		defer stream.Cancel()
   159  		errs := make(chan error)
   160  
   161  		// server.Get blocks until the grpc stream is complete so we call it
   162  		// in a goroutine and watch stream.updates for updates.
   163  		go func() {
   164  			err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "name2.ns.svc.mycluster.local", port)}, stream)
   165  			if err != nil {
   166  				errs <- err
   167  			}
   168  		}()
   169  
   170  		select {
   171  		case update := <-stream.updates:
   172  			add, ok := update.GetUpdate().(*pb.Update_Add)
   173  			if !ok {
   174  				t.Fatalf("Update expected to be an add, but was %+v", update)
   175  			}
   176  			addr := add.Add.Addrs[0]
   177  			if addr.GetHttp2() != nil {
   178  				t.Fatalf("Expected HTTP/2 client params to be nil, but got %v", addr.GetHttp2())
   179  			}
   180  		case err := <-errs:
   181  			t.Fatalf("Got error: %s", err)
   182  		}
   183  	})
   184  
   185  	t.Run("Return endpoint with unknown protocol hint and identity when service name contains skipped inbound port", func(t *testing.T) {
   186  		server := makeServer(t)
   187  		defer server.clusterStore.UnregisterGauges()
   188  
   189  		stream := &bufferingGetStream{
   190  			updates:          make(chan *pb.Update, 50),
   191  			MockServerStream: util.NewMockServerStream(),
   192  		}
   193  		defer stream.Cancel()
   194  		errs := make(chan error)
   195  
   196  		path := fmt.Sprintf("%s:%d", fullyQualifiedNameSkipped, skippedPort)
   197  
   198  		// server.Get blocks until the grpc stream is complete so we call it
   199  		// in a goroutine and watch stream.updates for updates.
   200  		go func() {
   201  			err := server.Get(&pb.GetDestination{
   202  				Scheme: "k8s",
   203  				Path:   path,
   204  			}, stream)
   205  			if err != nil {
   206  				errs <- err
   207  			}
   208  		}()
   209  
   210  		select {
   211  		case update := <-stream.updates:
   212  			addrs := update.GetAdd().Addrs
   213  			if len(addrs) == 0 {
   214  				t.Fatalf("Expected len(addrs) to be > 0")
   215  			}
   216  
   217  			if addrs[0].GetProtocolHint().GetProtocol() != nil || addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
   218  				t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addrs[0].ProtocolHint)
   219  			}
   220  
   221  			if addrs[0].TlsIdentity != nil {
   222  				t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addrs[0].TlsIdentity)
   223  			}
   224  		case err := <-errs:
   225  			t.Fatalf("Got error: %s", err)
   226  		}
   227  	})
   228  
   229  	t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
   230  		testOpaque(t, "policy-test")
   231  	})
   232  
   233  	t.Run("Return endpoint opaque protocol controlled by a server (native sidecar)", func(t *testing.T) {
   234  		testOpaque(t, "native")
   235  	})
   236  
   237  	t.Run("Remote discovery", func(t *testing.T) {
   238  		server := makeServer(t)
   239  		defer server.clusterStore.UnregisterGauges()
   240  
   241  		// Wait for cluster store to be synced.
   242  		time.Sleep(50 * time.Millisecond)
   243  
   244  		stream := &bufferingGetStream{
   245  			updates:          make(chan *pb.Update, 50),
   246  			MockServerStream: util.NewMockServerStream(),
   247  		}
   248  		defer stream.Cancel()
   249  		errs := make(chan error)
   250  
   251  		// server.Get blocks until the grpc stream is complete so we call it
   252  		// in a goroutine and watch stream.updates for updates.
   253  		go func() {
   254  			err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", "foo-target.ns.svc.mycluster.local", 80)}, stream)
   255  			if err != nil {
   256  				errs <- err
   257  			}
   258  		}()
   259  
   260  		select {
   261  		case update := <-stream.updates:
   262  			if updateAddAddress(t, update)[0] != fmt.Sprintf("%s:%d", "172.17.55.1", 80) {
   263  				t.Fatalf("Expected %s but got %s", fmt.Sprintf("%s:%d", podIP1, port), updateAddAddress(t, update)[0])
   264  			}
   265  
   266  			if len(stream.updates) != 0 {
   267  				t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
   268  			}
   269  
   270  		case err := <-errs:
   271  			t.Fatalf("Got error: %s", err)
   272  		}
   273  	})
   274  }
   275  
   276  func testOpaque(t *testing.T, name string) {
   277  	server, client := getServerWithClient(t)
   278  	defer server.clusterStore.UnregisterGauges()
   279  
   280  	stream := &bufferingGetStream{
   281  		updates:          make(chan *pb.Update, 50),
   282  		MockServerStream: util.NewMockServerStream(),
   283  	}
   284  	defer stream.Cancel()
   285  	errs := make(chan error)
   286  
   287  	path := fmt.Sprintf("%s.ns.svc.mycluster.local:%d", name, 80)
   288  
   289  	// server.Get blocks until the grpc stream is complete so we call it
   290  	// in a goroutine and watch stream.updates for updates.
   291  	go func() {
   292  		err := server.Get(&pb.GetDestination{
   293  			Scheme: "k8s",
   294  			Path:   path,
   295  		}, stream)
   296  		if err != nil {
   297  			errs <- err
   298  		}
   299  	}()
   300  
   301  	select {
   302  	case err := <-errs:
   303  		t.Fatalf("Got error: %s", err)
   304  	case update := <-stream.updates:
   305  		addrs := update.GetAdd().Addrs
   306  		if len(addrs) == 0 {
   307  			t.Fatalf("Expected len(addrs) to be > 0")
   308  		}
   309  
   310  		if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
   311  			t.Fatalf("Expected opaque transport for %s but was nil", path)
   312  		}
   313  	}
   314  
   315  	// Update the Server's pod selector so that it no longer selects the
   316  	// pod. This should result in the proxy protocol no longer being marked
   317  	// as opaque.
   318  	srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), name, metav1.GetOptions{})
   319  	if err != nil {
   320  		t.Fatal(err)
   321  	}
   322  	// PodSelector is updated to NOT select the pod
   323  	srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
   324  	_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
   325  	if err != nil {
   326  		t.Fatal(err)
   327  	}
   328  
   329  	select {
   330  	case update := <-stream.updates:
   331  		addrs := update.GetAdd().Addrs
   332  		if len(addrs) == 0 {
   333  			t.Fatalf("Expected len(addrs) to be > 0")
   334  		}
   335  
   336  		if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
   337  			t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
   338  		}
   339  	case err := <-errs:
   340  		t.Fatalf("Got error: %s", err)
   341  	}
   342  
   343  	// Update the Server's pod selector so that it once again selects the
   344  	// pod. This should result in the proxy protocol once again being marked
   345  	// as opaque.
   346  	srv.Spec.PodSelector.MatchLabels = map[string]string{"app": name}
   347  
   348  	_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
   349  	if err != nil {
   350  		t.Fatal(err)
   351  	}
   352  
   353  	select {
   354  	case update := <-stream.updates:
   355  		addrs := update.GetAdd().Addrs
   356  		if len(addrs) == 0 {
   357  			t.Fatalf("Expected len(addrs) to be > 0")
   358  		}
   359  
   360  		if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
   361  			t.Fatalf("Expected opaque transport for %s but was nil", path)
   362  		}
   363  	case err := <-errs:
   364  		t.Fatalf("Got error: %s", err)
   365  	}
   366  }
   367  
   368  func TestGetProfiles(t *testing.T) {
   369  	t.Run("Returns error if not valid service name", func(t *testing.T) {
   370  		server := makeServer(t)
   371  		defer server.clusterStore.UnregisterGauges()
   372  
   373  		stream := &bufferingGetProfileStream{
   374  			updates:          []*pb.DestinationProfile{},
   375  			MockServerStream: util.NewMockServerStream(),
   376  		}
   377  		defer stream.Cancel()
   378  		err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
   379  		if err == nil {
   380  			t.Fatalf("Expecting error, got nothing")
   381  		}
   382  	})
   383  
   384  	t.Run("Returns InvalidArgument for ExternalName service", func(t *testing.T) {
   385  		server := makeServer(t)
   386  		defer server.clusterStore.UnregisterGauges()
   387  
   388  		stream := &bufferingGetProfileStream{
   389  			updates:          []*pb.DestinationProfile{},
   390  			MockServerStream: util.NewMockServerStream(),
   391  		}
   392  		defer stream.Cancel()
   393  
   394  		err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "externalname.ns.svc.cluster.local"}, stream)
   395  		code := status.Code(err)
   396  		if code != codes.InvalidArgument {
   397  			t.Fatalf("Expected InvalidArgument, got %s", code)
   398  		}
   399  	})
   400  
   401  	t.Run("Returns server profile", func(t *testing.T) {
   402  		server := makeServer(t)
   403  		defer server.clusterStore.UnregisterGauges()
   404  
   405  		stream := profileStream(t, server, fullyQualifiedName, port, "ns:other")
   406  		defer stream.Cancel()
   407  		profile := assertSingleProfile(t, stream.Updates())
   408  		if profile.FullyQualifiedName != fullyQualifiedName {
   409  			t.Fatalf("Expected fully qualified name '%s', but got '%s'",
   410  				fullyQualifiedName, profile.FullyQualifiedName)
   411  		}
   412  		if profile.OpaqueProtocol {
   413  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   414  		}
   415  		routes := profile.GetRoutes()
   416  		if len(routes) != 1 {
   417  			t.Fatalf("Expected 0 routes but got %d: %v", len(routes), routes)
   418  		}
   419  	})
   420  
   421  	t.Run("Return service profile when using json token", func(t *testing.T) {
   422  		server := makeServer(t)
   423  		defer server.clusterStore.UnregisterGauges()
   424  
   425  		stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`)
   426  		defer stream.Cancel()
   427  		profile := assertSingleProfile(t, stream.Updates())
   428  		if profile.FullyQualifiedName != fullyQualifiedName {
   429  			t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
   430  		}
   431  		routes := profile.GetRoutes()
   432  		if len(routes) != 1 {
   433  			t.Fatalf("Expected 1 route got %d: %v", len(routes), routes)
   434  		}
   435  	})
   436  
   437  	t.Run("Returns client profile", func(t *testing.T) {
   438  		server := makeServer(t)
   439  		defer server.clusterStore.UnregisterGauges()
   440  
   441  		stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`)
   442  		defer stream.Cancel()
   443  		profile := assertSingleProfile(t, stream.Updates())
   444  		routes := profile.GetRoutes()
   445  		if len(routes) != 1 {
   446  			t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
   447  		}
   448  		if !routes[0].GetIsRetryable() {
   449  			t.Fatalf("Expected route to be retryable, but it was not")
   450  		}
   451  	})
   452  
   453  	t.Run("Return profile when using cluster IP", func(t *testing.T) {
   454  		server := makeServer(t)
   455  		defer server.clusterStore.UnregisterGauges()
   456  
   457  		stream := profileStream(t, server, clusterIP, port, "")
   458  		defer stream.Cancel()
   459  		profile := assertSingleProfile(t, stream.Updates())
   460  		if profile.FullyQualifiedName != fullyQualifiedName {
   461  			t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
   462  		}
   463  		if profile.OpaqueProtocol {
   464  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   465  		}
   466  		routes := profile.GetRoutes()
   467  		if len(routes) != 1 {
   468  			t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
   469  		}
   470  	})
   471  
   472  	t.Run("Return profile when using secondary cluster IP", func(t *testing.T) {
   473  		server := makeServer(t)
   474  		defer server.clusterStore.UnregisterGauges()
   475  
   476  		stream := profileStream(t, server, clusterIPv6, port, "")
   477  		defer stream.Cancel()
   478  		profile := assertSingleProfile(t, stream.Updates())
   479  		if profile.FullyQualifiedName != fullyQualifiedNameDual {
   480  			t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
   481  		}
   482  		if profile.OpaqueProtocol {
   483  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   484  		}
   485  		routes := profile.GetRoutes()
   486  		if len(routes) != 1 {
   487  			t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
   488  		}
   489  	})
   490  
   491  	t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
   492  		server := makeServer(t)
   493  		defer server.clusterStore.UnregisterGauges()
   494  
   495  		stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns")
   496  		defer stream.Cancel()
   497  
   498  		epAddr, err := toAddress(podIPStatefulSet, port)
   499  		if err != nil {
   500  			t.Fatalf("Got error: %s", err)
   501  		}
   502  
   503  		// An explanation for why we expect 1 to 3 updates is in test cases
   504  		// above
   505  		updates := stream.Updates()
   506  		if len(updates) == 0 || len(updates) > 3 {
   507  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   508  		}
   509  
   510  		first := updates[0]
   511  		if first.Endpoint == nil {
   512  			t.Fatalf("Expected response to have endpoint field")
   513  		}
   514  		if first.OpaqueProtocol {
   515  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   516  		}
   517  		_, exists := first.Endpoint.MetricLabels["namespace"]
   518  		if !exists {
   519  			t.Fatalf("Expected 'namespace' metric label to exist but it did not")
   520  		}
   521  		if first.GetEndpoint().GetProtocolHint() == nil {
   522  			t.Fatalf("Expected protocol hint but found none")
   523  		}
   524  		if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
   525  			t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
   526  		}
   527  		if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
   528  			t.Fatal("IP is empty")
   529  		}
   530  		if first.Endpoint.Addr.String() != epAddr.String() {
   531  			t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
   532  		}
   533  	})
   534  
   535  	t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
   536  		server := makeServer(t)
   537  		http2Params := pb.Http2ClientParams{
   538  			KeepAlive: &pb.Http2ClientParams_KeepAlive{
   539  				Timeout:  &duration.Duration{Seconds: 10},
   540  				Interval: &duration.Duration{Seconds: 20},
   541  			},
   542  		}
   543  		server.config.MeshedHttp2ClientParams = &http2Params
   544  		defer server.clusterStore.UnregisterGauges()
   545  
   546  		stream := profileStream(t, server, podIP1, port, "ns:ns")
   547  		defer stream.Cancel()
   548  
   549  		epAddr, err := toAddress(podIP1, port)
   550  		if err != nil {
   551  			t.Fatalf("Got error: %s", err)
   552  		}
   553  
   554  		// An explanation for why we expect 1 to 3 updates is in test cases
   555  		// above
   556  		updates := stream.Updates()
   557  		if len(updates) == 0 || len(updates) > 3 {
   558  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   559  		}
   560  
   561  		first := updates[0]
   562  		if first.Endpoint == nil {
   563  			t.Fatalf("Expected response to have endpoint field")
   564  		}
   565  		if first.OpaqueProtocol {
   566  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   567  		}
   568  		_, exists := first.Endpoint.MetricLabels["namespace"]
   569  		if !exists {
   570  			t.Fatalf("Expected 'namespace' metric label to exist but it did not")
   571  		}
   572  		if first.GetEndpoint().GetProtocolHint() == nil {
   573  			t.Fatalf("Expected protocol hint but found none")
   574  		}
   575  		if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
   576  			t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
   577  		}
   578  		if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
   579  			t.Fatal("IP is empty")
   580  		}
   581  		if first.Endpoint.Addr.String() != epAddr.String() {
   582  			t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
   583  		}
   584  		if !reflect.DeepEqual(first.Endpoint.GetHttp2(), &http2Params) {
   585  			t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, first.Endpoint.GetHttp2())
   586  		}
   587  	})
   588  
   589  	t.Run("Return profile with endpoint when using pod secondary IP", func(t *testing.T) {
   590  		server := makeServer(t)
   591  		defer server.clusterStore.UnregisterGauges()
   592  
   593  		stream := profileStream(t, server, podIPv6Dual, port, "ns:ns")
   594  		defer stream.Cancel()
   595  
   596  		epAddr, err := toAddress(podIPv6Dual, port)
   597  		if err != nil {
   598  			t.Fatalf("Got error: %s", err)
   599  		}
   600  
   601  		// An explanation for why we expect 1 to 3 updates is in test cases
   602  		// above
   603  		updates := stream.Updates()
   604  		if len(updates) == 0 || len(updates) > 3 {
   605  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   606  		}
   607  
   608  		first := updates[0]
   609  		if first.Endpoint == nil {
   610  			t.Fatalf("Expected response to have endpoint field")
   611  		}
   612  		if first.OpaqueProtocol {
   613  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   614  		}
   615  		_, exists := first.Endpoint.MetricLabels["namespace"]
   616  		if !exists {
   617  			t.Fatalf("Expected 'namespace' metric label to exist but it did not")
   618  		}
   619  		if first.GetEndpoint().GetProtocolHint() == nil {
   620  			t.Fatalf("Expected protocol hint but found none")
   621  		}
   622  		if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
   623  			t.Fatalf("Expected pod to not support opaque traffic on port %d", port)
   624  		}
   625  		if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
   626  			t.Fatal("IP is empty")
   627  		}
   628  		if first.Endpoint.Addr.String() != epAddr.String() {
   629  			t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
   630  		}
   631  	})
   632  
   633  	t.Run("Return profile with endpoint when using externalworkload IP", func(t *testing.T) {
   634  		server := makeServer(t)
   635  		http2Params := pb.Http2ClientParams{
   636  			KeepAlive: &pb.Http2ClientParams_KeepAlive{
   637  				Timeout:  &duration.Duration{Seconds: 10},
   638  				Interval: &duration.Duration{Seconds: 20},
   639  			},
   640  		}
   641  		server.config.MeshedHttp2ClientParams = &http2Params
   642  		defer server.clusterStore.UnregisterGauges()
   643  
   644  		stream := profileStream(t, server, externalWorkloadIP, port, "ns:ns")
   645  		defer stream.Cancel()
   646  
   647  		epAddr, err := toAddress(externalWorkloadIP, port)
   648  		if err != nil {
   649  			t.Fatalf("Got error: %s", err)
   650  		}
   651  
   652  		// An explanation for why we expect 1 to 3 updates is in test cases
   653  		// above
   654  		updates := stream.Updates()
   655  		if len(updates) == 0 || len(updates) > 3 {
   656  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   657  		}
   658  
   659  		first := updates[0]
   660  		if first.Endpoint == nil {
   661  			t.Fatalf("Expected response to have endpoint field")
   662  		}
   663  		if first.OpaqueProtocol {
   664  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
   665  		}
   666  		_, exists := first.Endpoint.MetricLabels["namespace"]
   667  		if !exists {
   668  			t.Fatalf("Expected 'namespace' metric label to exist but it did not %v", first.Endpoint)
   669  		}
   670  		if first.GetEndpoint().GetProtocolHint() == nil {
   671  			t.Fatalf("Expected protocol hint but found none")
   672  		}
   673  		if first.GetEndpoint().GetProtocolHint().GetOpaqueTransport() != nil {
   674  			t.Fatalf("Expected externalworkload to not support opaque traffic on port %d", port)
   675  		}
   676  		if first.Endpoint.Addr.Ip.GetIpv4() == 0 && first.Endpoint.Addr.Ip.GetIpv6() == nil {
   677  			t.Fatal("IP is empty")
   678  		}
   679  		if first.Endpoint.Addr.String() != epAddr.String() {
   680  			t.Fatalf("Expected endpoint IP to be %s, but it was %s", epAddr.Ip, first.Endpoint.Addr.Ip)
   681  		}
   682  		if !reflect.DeepEqual(first.Endpoint.GetHttp2(), &http2Params) {
   683  			t.Fatalf("Expected HTTP/2 client params to be %v, but got %v", &http2Params, first.Endpoint.GetHttp2())
   684  		}
   685  	})
   686  
   687  	t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
   688  		server := makeServer(t)
   689  		defer server.clusterStore.UnregisterGauges()
   690  
   691  		stream := profileStream(t, server, "172.0.0.0", 1234, "")
   692  		defer stream.Cancel()
   693  		profile := assertSingleProfile(t, stream.Updates())
   694  		if profile.RetryBudget == nil {
   695  			t.Fatalf("Expected default profile to have a retry budget")
   696  		}
   697  	})
   698  
   699  	t.Run("Return profile with no opaque transport when pod does not have label and port is opaque", func(t *testing.T) {
   700  		server := makeServer(t)
   701  		defer server.clusterStore.UnregisterGauges()
   702  
   703  		// port 3306 is in the default opaque port list
   704  		stream := profileStream(t, server, podIP2, 3306, "")
   705  		defer stream.Cancel()
   706  		profile := assertSingleProfile(t, stream.Updates())
   707  		if profile.Endpoint == nil {
   708  			t.Fatalf("Expected response to have endpoint field")
   709  		}
   710  
   711  		if profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
   712  			t.Fatalf("Expected no opaque transport but found one")
   713  		}
   714  		if profile.GetEndpoint().GetHttp2() != nil {
   715  			t.Fatalf("Expected no HTTP/2 client parameters but found one")
   716  		}
   717  	})
   718  
   719  	t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
   720  		server := makeServer(t)
   721  		defer server.clusterStore.UnregisterGauges()
   722  
   723  		stream := profileStream(t, server, podIP2, port, "")
   724  		defer stream.Cancel()
   725  		profile := assertSingleProfile(t, stream.Updates())
   726  		if profile.Endpoint == nil {
   727  			t.Fatalf("Expected response to have endpoint field")
   728  		}
   729  		if profile.Endpoint.GetProtocolHint().GetProtocol() != nil || profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
   730  			t.Fatalf("Expected no protocol hint but found one")
   731  		}
   732  	})
   733  
   734  	t.Run("Return profile with protocol hint for default opaque port when pod is unmeshed", func(t *testing.T) {
   735  		server := makeServer(t)
   736  		defer server.clusterStore.UnregisterGauges()
   737  
   738  		// 3306 is in the default opaque list
   739  		stream := profileStream(t, server, podIP2, 3306, "")
   740  		defer stream.Cancel()
   741  		profile := assertSingleProfile(t, stream.Updates())
   742  		if profile.Endpoint == nil {
   743  			t.Fatalf("Expected response to have endpoint field")
   744  		}
   745  		if !profile.OpaqueProtocol {
   746  			t.Fatal("Expected port 3306 to be an opaque protocol, but it was not")
   747  		}
   748  		if profile.GetEndpoint().GetProtocolHint() != nil {
   749  			t.Fatalf("Expected protocol hint to be nil")
   750  		}
   751  	})
   752  
   753  	t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
   754  		server := makeServer(t)
   755  		defer server.clusterStore.UnregisterGauges()
   756  
   757  		stream := profileStream(t, server, clusterIPOpaque, opaquePort, "")
   758  		defer stream.Cancel()
   759  		profile := assertSingleProfile(t, stream.Updates())
   760  		if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
   761  			t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, profile.FullyQualifiedName)
   762  		}
   763  		if profile.OpaqueProtocol {
   764  			t.Fatalf("Expected port %d to not be an opaque protocol, but it was", opaquePort)
   765  		}
   766  	})
   767  
   768  	t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
   769  		server := makeServer(t)
   770  		defer server.clusterStore.UnregisterGauges()
   771  
   772  		stream := profileStream(t, server, podIPOpaque, opaquePort, "")
   773  		defer stream.Cancel()
   774  
   775  		epAddr, err := toAddress(podIPOpaque, opaquePort)
   776  		if err != nil {
   777  			t.Fatalf("Got error: %s", err)
   778  		}
   779  
   780  		// An explanation for why we expect 1 to 3 updates is in test cases
   781  		// above
   782  		updates := stream.Updates()
   783  		if len(updates) == 0 || len(updates) > 3 {
   784  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   785  		}
   786  
   787  		profile := assertSingleProfile(t, updates)
   788  		if profile.Endpoint == nil {
   789  			t.Fatalf("Expected response to have endpoint field")
   790  		}
   791  		if !profile.OpaqueProtocol {
   792  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
   793  		}
   794  		_, exists := profile.Endpoint.MetricLabels["namespace"]
   795  		if !exists {
   796  			t.Fatalf("Expected 'namespace' metric label to exist but it did not")
   797  		}
   798  		if profile.Endpoint.ProtocolHint == nil {
   799  			t.Fatalf("Expected protocol hint but found none")
   800  		}
   801  		if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
   802  			t.Fatalf("Expected pod to support opaque traffic on port 4143")
   803  		}
   804  		if profile.Endpoint.Addr.Ip.GetIpv4() == 0 && profile.Endpoint.Addr.Ip.GetIpv6() == nil {
   805  			t.Fatal("IP is empty")
   806  		}
   807  		if profile.Endpoint.Addr.String() != epAddr.String() {
   808  			t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
   809  		}
   810  	})
   811  
   812  	t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
   813  		server := makeServer(t)
   814  		defer server.clusterStore.UnregisterGauges()
   815  
   816  		stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "")
   817  		defer stream.Cancel()
   818  		profile := assertSingleProfile(t, stream.Updates())
   819  		if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
   820  			t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, profile.FullyQualifiedName)
   821  		}
   822  		if !profile.OpaqueProtocol {
   823  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
   824  		}
   825  	})
   826  
   827  	t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
   828  		server := makeServer(t)
   829  		defer server.clusterStore.UnregisterGauges()
   830  
   831  		stream := profileStream(t, server, podIPSkipped, skippedPort, "")
   832  		defer stream.Cancel()
   833  		profile := assertSingleProfile(t, stream.Updates())
   834  		addr := profile.GetEndpoint()
   835  		if addr == nil {
   836  			t.Fatalf("Expected to not be nil")
   837  		}
   838  		if addr.GetProtocolHint().GetProtocol() != nil || addr.GetProtocolHint().GetOpaqueTransport() != nil {
   839  			t.Fatalf("Expected protocol hint for %s to be nil but got %+v", podIPSkipped, addr.ProtocolHint)
   840  		}
   841  		if addr.TlsIdentity != nil {
   842  			t.Fatalf("Expected TLS identity for %s to be nil but got %+v", podIPSkipped, addr.TlsIdentity)
   843  		}
   844  	})
   845  
   846  	t.Run("Return opaque protocol profile with endpoint when using externalworkload IP and opaque protocol port", func(t *testing.T) {
   847  		server := makeServer(t)
   848  		defer server.clusterStore.UnregisterGauges()
   849  
   850  		stream := profileStream(t, server, externalWorkloadIP, opaquePort, "")
   851  		defer stream.Cancel()
   852  
   853  		epAddr, err := toAddress(externalWorkloadIP, opaquePort)
   854  		if err != nil {
   855  			t.Fatalf("Got error: %s", err)
   856  		}
   857  
   858  		// An explanation for why we expect 1 to 3 updates is in test cases
   859  		// above
   860  		updates := stream.Updates()
   861  		if len(updates) == 0 || len(updates) > 3 {
   862  			t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(updates), updates)
   863  		}
   864  
   865  		profile := assertSingleProfile(t, updates)
   866  		if profile.Endpoint == nil {
   867  			t.Fatalf("Expected response to have endpoint field")
   868  		}
   869  		if !profile.OpaqueProtocol {
   870  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
   871  		}
   872  		_, exists := profile.Endpoint.MetricLabels["namespace"]
   873  		if !exists {
   874  			t.Fatalf("Expected 'namespace' metric label to exist but it did not")
   875  		}
   876  		if profile.Endpoint.ProtocolHint == nil {
   877  			t.Fatalf("Expected protocol hint but found none")
   878  		}
   879  		if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
   880  			t.Fatalf("Expected pod to support opaque traffic on port 4143")
   881  		}
   882  		if profile.Endpoint.Addr.Ip.GetIpv4() == 0 && profile.Endpoint.Addr.Ip.GetIpv6() == nil {
   883  			t.Fatal("IP is empty")
   884  		}
   885  		if profile.Endpoint.Addr.String() != epAddr.String() {
   886  			t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
   887  		}
   888  	})
   889  
   890  	t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
   891  		server := makeServer(t)
   892  		defer server.clusterStore.UnregisterGauges()
   893  
   894  		stream := profileStream(t, server, podIPPolicy, 80, "")
   895  		defer stream.Cancel()
   896  		profile := assertSingleProfile(t, stream.Updates())
   897  		if profile.Endpoint == nil {
   898  			t.Fatalf("Expected response to have endpoint field")
   899  		}
   900  		if !profile.OpaqueProtocol {
   901  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
   902  		}
   903  		if profile.Endpoint.GetProtocolHint() == nil {
   904  			t.Fatalf("Expected protocol hint but found none")
   905  		}
   906  		if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
   907  			t.Fatalf("Expected pod to support opaque traffic on port 4143")
   908  		}
   909  	})
   910  
   911  	t.Run("Return profile with opaque protocol when using externalworkload IP selected by a Server", func(t *testing.T) {
   912  		server := makeServer(t)
   913  		defer server.clusterStore.UnregisterGauges()
   914  
   915  		stream := profileStream(t, server, externalWorkloadIPPolicy, 80, "")
   916  		defer stream.Cancel()
   917  		profile := assertSingleProfile(t, stream.Updates())
   918  		if profile.Endpoint == nil {
   919  			t.Fatalf("Expected response to have endpoint field")
   920  		}
   921  		if !profile.OpaqueProtocol {
   922  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
   923  		}
   924  		if profile.Endpoint.GetProtocolHint() == nil {
   925  			t.Fatalf("Expected protocol hint but found none")
   926  		}
   927  		if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 {
   928  			t.Fatalf("Expected pod to support opaque traffic on port 4143")
   929  		}
   930  	})
   931  
   932  	t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
   933  		server := makeServer(t)
   934  		defer server.clusterStore.UnregisterGauges()
   935  
   936  		stream := profileStream(t, server, externalIP, 3306, "")
   937  		defer stream.Cancel()
   938  		profile := assertSingleProfile(t, stream.Updates())
   939  		if !profile.OpaqueProtocol {
   940  			t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 3306)
   941  		}
   942  
   943  	})
   944  
   945  	t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
   946  		server := makeServer(t)
   947  		defer server.clusterStore.UnregisterGauges()
   948  
   949  		stream := profileStream(t, server, externalIP, 80, "")
   950  		defer stream.Cancel()
   951  		profile := assertSingleProfile(t, stream.Updates())
   952  		if profile.OpaqueProtocol {
   953  			t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80)
   954  		}
   955  	})
   956  
   957  	t.Run("Return profile for host port pods", func(t *testing.T) {
   958  		hostPort := uint32(7777)
   959  		containerPort := uint32(80)
   960  		server, l5dClient := getServerWithClient(t)
   961  		defer server.clusterStore.UnregisterGauges()
   962  
   963  		stream := profileStream(t, server, externalIP, hostPort, "")
   964  		defer stream.Cancel()
   965  
   966  		// HostPort maps to pod.
   967  		profile := assertSingleProfile(t, stream.Updates())
   968  		dstPod := profile.Endpoint.MetricLabels["pod"]
   969  		if dstPod != "hostport-mapping" {
   970  			t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping", dstPod)
   971  		}
   972  
   973  		ip, err := addr.ParseProxyIP(externalIP)
   974  		if err != nil {
   975  			t.Fatalf("Error parsing IP: %s", err)
   976  		}
   977  		addr := profile.Endpoint.Addr
   978  		if addr.Ip.String() != ip.String() && addr.Port != hostPort {
   979  			t.Fatalf("Expected endpoint addr to be %s port:%d got %s", ip, hostPort, addr)
   980  		}
   981  
   982  		// HostPort pod is deleted.
   983  		err = server.k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "hostport-mapping", metav1.DeleteOptions{})
   984  		if err != nil {
   985  			t.Fatalf("Failed to delete pod: %s", err)
   986  		}
   987  		err = testutil.RetryFor(time.Second*10, func() error {
   988  			updates := stream.Updates()
   989  			if len(updates) < 2 {
   990  				return fmt.Errorf("expected 2 updates, got %d", len(updates))
   991  			}
   992  			return nil
   993  		})
   994  		if err != nil {
   995  			t.Fatal(err)
   996  		}
   997  		profile = stream.Updates()[1]
   998  		dstPod = profile.Endpoint.MetricLabels["pod"]
   999  		if dstPod != "" {
  1000  			t.Fatalf("Expected no dst_pod but got %s", dstPod)
  1001  		}
  1002  
  1003  		// New HostPort pod is created.
  1004  		_, err = server.k8sAPI.Client.CoreV1().Pods("ns").Create(context.Background(), &corev1.Pod{
  1005  			ObjectMeta: metav1.ObjectMeta{
  1006  				Name:      "hostport-mapping-2",
  1007  				Namespace: "ns",
  1008  				Labels: map[string]string{
  1009  					"app": "hostport-mapping-2",
  1010  				},
  1011  			},
  1012  			Spec: corev1.PodSpec{
  1013  				Containers: []corev1.Container{
  1014  					{
  1015  						Name: pkgk8s.ProxyContainerName,
  1016  						Env: []corev1.EnvVar{
  1017  							{
  1018  								Name:  "LINKERD2_PROXY_INBOUND_LISTEN_ADDR",
  1019  								Value: "0.0.0.0:4143",
  1020  							},
  1021  						},
  1022  					},
  1023  					{
  1024  						Name:  "nginx",
  1025  						Image: "nginx",
  1026  						Ports: []corev1.ContainerPort{
  1027  							{
  1028  								Name:          "nginx-7777",
  1029  								ContainerPort: (int32)(containerPort),
  1030  								HostPort:      (int32)(hostPort),
  1031  							},
  1032  						},
  1033  					},
  1034  				},
  1035  			},
  1036  			Status: corev1.PodStatus{
  1037  				Phase: "Running",
  1038  				Conditions: []corev1.PodCondition{
  1039  					{
  1040  						Type:   corev1.PodReady,
  1041  						Status: corev1.ConditionTrue,
  1042  					},
  1043  				},
  1044  				HostIP:  externalIP,
  1045  				HostIPs: []corev1.HostIP{{IP: externalIP}, {IP: externalIPv6}},
  1046  				PodIP:   "172.17.0.55",
  1047  				PodIPs:  []corev1.PodIP{{IP: "172.17.0.55"}},
  1048  			},
  1049  		}, metav1.CreateOptions{})
  1050  		if err != nil {
  1051  			t.Fatalf("Failed to create pod: %s", err)
  1052  		}
  1053  
  1054  		err = testutil.RetryFor(time.Second*10, func() error {
  1055  			updates := stream.Updates()
  1056  			if len(updates) < 3 {
  1057  				return fmt.Errorf("expected 3 updates, got %d", len(updates))
  1058  			}
  1059  			return nil
  1060  		})
  1061  		if err != nil {
  1062  			t.Fatal(err)
  1063  		}
  1064  
  1065  		profile = stream.Updates()[2]
  1066  		dstPod = profile.Endpoint.MetricLabels["pod"]
  1067  		if dstPod != "hostport-mapping-2" {
  1068  			t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
  1069  		}
  1070  		if profile.OpaqueProtocol {
  1071  			t.Fatal("Expected OpaqueProtocol=false")
  1072  		}
  1073  
  1074  		// Server is created, setting the port to opaque
  1075  		l5dClient.ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
  1076  			ObjectMeta: metav1.ObjectMeta{
  1077  				Name:      "srv-hostport-mapping-2",
  1078  				Namespace: "ns",
  1079  			},
  1080  			Spec: v1beta2.ServerSpec{
  1081  				PodSelector: &metav1.LabelSelector{
  1082  					MatchLabels: map[string]string{
  1083  						"app": "hostport-mapping-2",
  1084  					},
  1085  				},
  1086  				Port: intstr.IntOrString{
  1087  					Type:   intstr.String,
  1088  					StrVal: "nginx-7777",
  1089  				},
  1090  				ProxyProtocol: "opaque",
  1091  			},
  1092  		}, metav1.CreateOptions{})
  1093  
  1094  		var updates []*pb.DestinationProfile
  1095  		err = testutil.RetryFor(time.Second*10, func() error {
  1096  			updates = stream.Updates()
  1097  			if len(updates) < 4 {
  1098  				return fmt.Errorf("expected 4 updates, got %d", len(updates))
  1099  			}
  1100  			return nil
  1101  		})
  1102  		if err != nil {
  1103  			t.Fatal(err)
  1104  		}
  1105  
  1106  		profile = stream.Updates()[3]
  1107  		if !profile.OpaqueProtocol {
  1108  			t.Fatal("Expected OpaqueProtocol=true")
  1109  		}
  1110  	})
  1111  }
  1112  
  1113  func TestTokenStructure(t *testing.T) {
  1114  	t.Run("when JSON is valid", func(t *testing.T) {
  1115  		server := makeServer(t)
  1116  		defer server.clusterStore.UnregisterGauges()
  1117  
  1118  		dest := &pb.GetDestination{ContextToken: "{\"ns\":\"ns-1\",\"nodeName\":\"node-1\"}\n"}
  1119  		token := server.parseContextToken(dest.ContextToken)
  1120  
  1121  		if token.Ns != "ns-1" {
  1122  			t.Fatalf("Expected token namespace to be %s got %s", "ns-1", token.Ns)
  1123  		}
  1124  
  1125  		if token.NodeName != "node-1" {
  1126  			t.Fatalf("Expected token nodeName to be %s got %s", "node-1", token.NodeName)
  1127  		}
  1128  	})
  1129  
  1130  	t.Run("when JSON is invalid and old token format used", func(t *testing.T) {
  1131  		server := makeServer(t)
  1132  		defer server.clusterStore.UnregisterGauges()
  1133  
  1134  		dest := &pb.GetDestination{ContextToken: "ns:ns-2"}
  1135  		token := server.parseContextToken(dest.ContextToken)
  1136  		if token.Ns != "ns-2" {
  1137  			t.Fatalf("Expected %s got %s", "ns-2", token.Ns)
  1138  		}
  1139  	})
  1140  
  1141  	t.Run("when invalid JSON and invalid old format", func(t *testing.T) {
  1142  		server := makeServer(t)
  1143  		server.clusterStore.UnregisterGauges()
  1144  
  1145  		dest := &pb.GetDestination{ContextToken: "123fa-test"}
  1146  		token := server.parseContextToken(dest.ContextToken)
  1147  		if token.Ns != "" || token.NodeName != "" {
  1148  			t.Fatalf("Expected context token to be empty, got %v", token)
  1149  		}
  1150  	})
  1151  }
  1152  
  1153  func updateAddAddress(t *testing.T, update *pb.Update) []string {
  1154  	t.Helper()
  1155  	add, ok := update.GetUpdate().(*pb.Update_Add)
  1156  	if !ok {
  1157  		t.Fatalf("Update expected to be an add, but was %+v", update)
  1158  	}
  1159  	ips := []string{}
  1160  	for _, ip := range add.Add.Addrs {
  1161  		ips = append(ips, addr.ProxyAddressToString(ip.GetAddr()))
  1162  	}
  1163  	return ips
  1164  }
  1165  
  1166  func updateRemoveAddress(t *testing.T, update *pb.Update) []string {
  1167  	t.Helper()
  1168  	add, ok := update.GetUpdate().(*pb.Update_Remove)
  1169  	if !ok {
  1170  		t.Fatalf("Update expected to be a remove, but was %+v", update)
  1171  	}
  1172  	ips := []string{}
  1173  	for _, ip := range add.Remove.Addrs {
  1174  		ips = append(ips, addr.ProxyAddressToString(ip))
  1175  	}
  1176  	return ips
  1177  }
  1178  
  1179  func toAddress(path string, port uint32) (*net.TcpAddress, error) {
  1180  	ip, err := addr.ParseProxyIP(path)
  1181  	if err != nil {
  1182  		return nil, err
  1183  	}
  1184  	return &net.TcpAddress{
  1185  		Ip:   ip,
  1186  		Port: port,
  1187  	}, nil
  1188  }
  1189  
  1190  func TestIpWatcherGetSvcID(t *testing.T) {
  1191  	name := "service"
  1192  	namespace := "test"
  1193  	clusterIP := "10.245.0.1"
  1194  	k8sConfigs := `
  1195  apiVersion: v1
  1196  kind: Service
  1197  metadata:
  1198    name: service
  1199    namespace: test
  1200  spec:
  1201    type: ClusterIP
  1202    clusterIP: 10.245.0.1
  1203    clusterIPs:
  1204    - 10.245.0.1
  1205    - 2001:db8::88
  1206    ports:
  1207    - port: 1234`
  1208  
  1209  	t.Run("get services IDs by IP address", func(t *testing.T) {
  1210  		k8sAPI, err := k8s.NewFakeAPI(k8sConfigs)
  1211  		if err != nil {
  1212  			t.Fatalf("NewFakeAPI returned an error: %s", err)
  1213  		}
  1214  
  1215  		err = watcher.InitializeIndexers(k8sAPI)
  1216  		if err != nil {
  1217  			t.Fatalf("InitializeIndexers returned an error: %s", err)
  1218  		}
  1219  
  1220  		k8sAPI.Sync(nil)
  1221  
  1222  		svc, err := getSvcID(k8sAPI, clusterIP, logging.WithFields(nil))
  1223  		if err != nil {
  1224  			t.Fatalf("Error getting service: %s", err)
  1225  		}
  1226  		if svc == nil {
  1227  			t.Fatalf("Expected to find service mapped to [%s]", clusterIP)
  1228  		}
  1229  		if svc.Name != name {
  1230  			t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name)
  1231  		}
  1232  		if svc.Namespace != namespace {
  1233  			t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
  1234  		}
  1235  
  1236  		svc6, err := getSvcID(k8sAPI, clusterIPv6, logging.WithFields(nil))
  1237  		if err != nil {
  1238  			t.Fatalf("Error getting service: %s", err)
  1239  		}
  1240  		if svc6 == nil {
  1241  			t.Fatalf("Expected to find service mapped to [%s]", clusterIPv6)
  1242  		}
  1243  		if svc.Name != name {
  1244  			t.Fatalf("Expected service name to be [%s], but got [%s]", name, svc.Name)
  1245  		}
  1246  		if svc.Namespace != namespace {
  1247  			t.Fatalf("Expected service namespace to be [%s], but got [%s]", namespace, svc.Namespace)
  1248  		}
  1249  
  1250  		badClusterIP := "10.256.0.2"
  1251  		svc, err = getSvcID(k8sAPI, badClusterIP, logging.WithFields(nil))
  1252  		if err != nil {
  1253  			t.Fatalf("Error getting service: %s", err)
  1254  		}
  1255  		if svc != nil {
  1256  			t.Fatalf("Expected not to find service mapped to [%s]", badClusterIP)
  1257  		}
  1258  	})
  1259  }
  1260  
  1261  func testReturnEndpoints(t *testing.T, fqdn, ip string, port uint32) {
  1262  	t.Helper()
  1263  
  1264  	server := makeServer(t)
  1265  	defer server.clusterStore.UnregisterGauges()
  1266  
  1267  	stream := &bufferingGetStream{
  1268  		updates:          make(chan *pb.Update, 50),
  1269  		MockServerStream: util.NewMockServerStream(),
  1270  	}
  1271  	defer stream.Cancel()
  1272  
  1273  	testReturnEndpointsForServer(t, server, stream, fqdn, ip, port)
  1274  }
  1275  
  1276  func testReturnEndpointsForServer(t *testing.T, server *server, stream *bufferingGetStream, fqdn, ip string, port uint32) {
  1277  	t.Helper()
  1278  
  1279  	errs := make(chan error)
  1280  	// server.Get blocks until the grpc stream is complete so we call it
  1281  	// in a goroutine and watch stream.updates for updates.
  1282  	go func() {
  1283  		err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: fmt.Sprintf("%s:%d", fqdn, port)}, stream)
  1284  		if err != nil {
  1285  			errs <- err
  1286  		}
  1287  	}()
  1288  
  1289  	addr := fmt.Sprintf("%s:%d", ip, port)
  1290  	parsedIP, err := netip.ParseAddr(ip)
  1291  	if err != nil {
  1292  		t.Fatalf("Invalid IP [%s]: %s", ip, err)
  1293  	}
  1294  	if parsedIP.Is6() {
  1295  		addr = fmt.Sprintf("[%s]:%d", ip, port)
  1296  	}
  1297  
  1298  	select {
  1299  	case update := <-stream.updates:
  1300  		if updateAddAddress(t, update)[0] != addr {
  1301  			t.Fatalf("Expected %s but got %s", addr, updateAddAddress(t, update)[0])
  1302  		}
  1303  
  1304  		if len(stream.updates) != 0 {
  1305  			t.Fatalf("Expected 1 update but got %d: %v", 1+len(stream.updates), stream.updates)
  1306  		}
  1307  	case err := <-errs:
  1308  		t.Fatalf("Got error: %s", err)
  1309  	}
  1310  }
  1311  
  1312  func assertSingleProfile(t *testing.T, updates []*pb.DestinationProfile) *pb.DestinationProfile {
  1313  	t.Helper()
  1314  	// Under normal conditions the creation of resources by the fake API will
  1315  	// generate notifications that are discarded after the stream.Cancel() call,
  1316  	// but very rarely those notifications might come after, in which case we'll
  1317  	// get a second update.
  1318  	if len(updates) != 1 {
  1319  		t.Fatalf("Expected 1 profile update but got %d: %v", len(updates), updates)
  1320  	}
  1321  	return updates[0]
  1322  }
  1323  
  1324  func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream {
  1325  	t.Helper()
  1326  
  1327  	stream := &bufferingGetProfileStream{
  1328  		updates:          []*pb.DestinationProfile{},
  1329  		MockServerStream: util.NewMockServerStream(),
  1330  	}
  1331  
  1332  	go func() {
  1333  		err := server.GetProfile(&pb.GetDestination{
  1334  			Scheme:       "k8s",
  1335  			Path:         gonet.JoinHostPort(host, fmt.Sprintf("%d", port)),
  1336  			ContextToken: token,
  1337  		}, stream)
  1338  		if err != nil {
  1339  			logging.Fatalf("Got error: %s", err)
  1340  		}
  1341  	}()
  1342  	// Give GetProfile some slack
  1343  	time.Sleep(50 * time.Millisecond)
  1344  
  1345  	return stream
  1346  }
  1347  

View as plain text