...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/endpoint_translator_test.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination

     1  package destination
     2  
     3  import (
     4  	"fmt"
     5  	"net/netip"
     6  	"sort"
     7  	"strings"
     8  	"sync"
     9  	"testing"
    10  
    11  	"github.com/go-test/deep"
    12  	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    13  	"github.com/linkerd/linkerd2-proxy-api/go/net"
    14  	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
    15  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    16  	"github.com/linkerd/linkerd2/pkg/addr"
    17  	"github.com/linkerd/linkerd2/pkg/k8s"
    18  	"google.golang.org/protobuf/proto"
    19  	corev1 "k8s.io/api/core/v1"
    20  	v1 "k8s.io/api/discovery/v1"
    21  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    22  )
    23  
    24  var (
    25  	pod1 = watcher.Address{
    26  		IP:   "1.1.1.1",
    27  		Port: 1,
    28  		Pod: &corev1.Pod{
    29  			ObjectMeta: metav1.ObjectMeta{
    30  				Name:      "pod1",
    31  				Namespace: "ns",
    32  				Labels: map[string]string{
    33  					k8s.ControllerNSLabel:    "linkerd",
    34  					k8s.ProxyDeploymentLabel: "deployment-name",
    35  				},
    36  			},
    37  			Spec: corev1.PodSpec{
    38  				ServiceAccountName: "serviceaccount-name",
    39  			},
    40  		},
    41  		OwnerKind: "replicationcontroller",
    42  		OwnerName: "rc-name",
    43  	}
    44  
    45  	pod1IPv6 = watcher.Address{
    46  		IP:   "2001:0db8:85a3:0000:0000:8a2e:0370:7333",
    47  		Port: 1,
    48  		Pod: &corev1.Pod{
    49  			ObjectMeta: metav1.ObjectMeta{
    50  				Name:      "pod1",
    51  				Namespace: "ns",
    52  				Labels: map[string]string{
    53  					k8s.ControllerNSLabel:    "linkerd",
    54  					k8s.ProxyDeploymentLabel: "deployment-name",
    55  				},
    56  			},
    57  			Spec: corev1.PodSpec{
    58  				ServiceAccountName: "serviceaccount-name",
    59  			},
    60  		},
    61  		OwnerKind: "replicationcontroller",
    62  		OwnerName: "rc-name",
    63  	}
    64  
    65  	pod2 = watcher.Address{
    66  		IP:   "1.1.1.2",
    67  		Port: 2,
    68  		Pod: &corev1.Pod{
    69  			ObjectMeta: metav1.ObjectMeta{
    70  				Name:      "pod2",
    71  				Namespace: "ns",
    72  				Labels: map[string]string{
    73  					k8s.ControllerNSLabel:    "linkerd",
    74  					k8s.ProxyDeploymentLabel: "deployment-name",
    75  				},
    76  			},
    77  		},
    78  	}
    79  
    80  	pod3 = watcher.Address{
    81  		IP:   "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
    82  		Port: 3,
    83  		Pod: &corev1.Pod{
    84  			ObjectMeta: metav1.ObjectMeta{
    85  				Name:      "pod3",
    86  				Namespace: "ns",
    87  				Labels: map[string]string{
    88  					k8s.ControllerNSLabel:    "linkerd",
    89  					k8s.ProxyDeploymentLabel: "deployment-name",
    90  				},
    91  			},
    92  		},
    93  	}
    94  
    95  	podOpaque = watcher.Address{
    96  		IP:   "1.1.1.4",
    97  		Port: 4,
    98  		Pod: &corev1.Pod{
    99  			ObjectMeta: metav1.ObjectMeta{
   100  				Name:      "pod4",
   101  				Namespace: "ns",
   102  				Labels: map[string]string{
   103  					k8s.ControllerNSLabel:    "linkerd",
   104  					k8s.ProxyDeploymentLabel: "deployment-name",
   105  				},
   106  				Annotations: map[string]string{
   107  					k8s.ProxyOpaquePortsAnnotation: "4",
   108  				},
   109  			},
   110  			Spec: corev1.PodSpec{
   111  				Containers: []corev1.Container{
   112  					{
   113  						Name: k8s.ProxyContainerName,
   114  						Env: []corev1.EnvVar{
   115  							{
   116  								Name:  envInboundListenAddr,
   117  								Value: "0.0.0.0:4143",
   118  							},
   119  						},
   120  					},
   121  				},
   122  			},
   123  		},
   124  		OpaqueProtocol: true,
   125  	}
   126  
   127  	ew1 = watcher.Address{
   128  		IP:   "1.1.1.1",
   129  		Port: 1,
   130  		ExternalWorkload: &ewv1beta1.ExternalWorkload{
   131  			ObjectMeta: metav1.ObjectMeta{
   132  				Name:      "ew-1",
   133  				Namespace: "ns",
   134  			},
   135  			Spec: ewv1beta1.ExternalWorkloadSpec{
   136  				MeshTLS: ewv1beta1.MeshTLS{
   137  					Identity:   "spiffe://some-domain/ew-1",
   138  					ServerName: "server.local",
   139  				},
   140  			},
   141  		},
   142  		OwnerKind: "workloadgroup",
   143  		OwnerName: "wg-name",
   144  	}
   145  
   146  	ew2 = watcher.Address{
   147  		IP:   "1.1.1.2",
   148  		Port: 2,
   149  		ExternalWorkload: &ewv1beta1.ExternalWorkload{
   150  			ObjectMeta: metav1.ObjectMeta{
   151  				Name:      "ew-2",
   152  				Namespace: "ns",
   153  				Labels: map[string]string{
   154  					k8s.ControllerNSLabel:    "linkerd",
   155  					k8s.ProxyDeploymentLabel: "deployment-name",
   156  				},
   157  			},
   158  			Spec: ewv1beta1.ExternalWorkloadSpec{
   159  				MeshTLS: ewv1beta1.MeshTLS{
   160  					Identity:   "spiffe://some-domain/ew-2",
   161  					ServerName: "server.local",
   162  				},
   163  			},
   164  		},
   165  	}
   166  
   167  	ew3 = watcher.Address{
   168  		IP:   "1.1.1.3",
   169  		Port: 3,
   170  		ExternalWorkload: &ewv1beta1.ExternalWorkload{
   171  			ObjectMeta: metav1.ObjectMeta{
   172  				Name:      "ew-3",
   173  				Namespace: "ns",
   174  				Labels: map[string]string{
   175  					k8s.ControllerNSLabel:    "linkerd",
   176  					k8s.ProxyDeploymentLabel: "deployment-name",
   177  				},
   178  			},
   179  			Spec: ewv1beta1.ExternalWorkloadSpec{
   180  				MeshTLS: ewv1beta1.MeshTLS{
   181  					Identity:   "spiffe://some-domain/ew-3",
   182  					ServerName: "server.local",
   183  				},
   184  			},
   185  		},
   186  	}
   187  
   188  	ewOpaque = watcher.Address{
   189  		IP:   "1.1.1.4",
   190  		Port: 4,
   191  		ExternalWorkload: &ewv1beta1.ExternalWorkload{
   192  			ObjectMeta: metav1.ObjectMeta{
   193  				Name:      "pod4",
   194  				Namespace: "ns",
   195  				Annotations: map[string]string{
   196  					k8s.ProxyOpaquePortsAnnotation: "4",
   197  				},
   198  			},
   199  			Spec: ewv1beta1.ExternalWorkloadSpec{
   200  				MeshTLS: ewv1beta1.MeshTLS{
   201  					Identity:   "spiffe://some-domain/ew-opaque",
   202  					ServerName: "server.local",
   203  				},
   204  
   205  				Ports: []ewv1beta1.PortSpec{
   206  					{
   207  						Port: 4143,
   208  						Name: "linkerd-proxy",
   209  					},
   210  				},
   211  			},
   212  		},
   213  		OpaqueProtocol: true,
   214  	}
   215  
   216  	remoteGateway1 = watcher.Address{
   217  		IP:   "1.1.1.1",
   218  		Port: 1,
   219  	}
   220  
   221  	remoteGateway2 = watcher.Address{
   222  		IP:       "1.1.1.2",
   223  		Port:     2,
   224  		Identity: "some-identity",
   225  	}
   226  
   227  	remoteGatewayAuthOverride = watcher.Address{
   228  		IP:                "1.1.1.2",
   229  		Port:              2,
   230  		Identity:          "some-identity",
   231  		AuthorityOverride: "some-auth.com:2",
   232  	}
   233  
   234  	west1aAddress = watcher.Address{
   235  		IP:   "1.1.1.1",
   236  		Port: 1,
   237  		ForZones: []v1.ForZone{
   238  			{Name: "west-1a"},
   239  		},
   240  	}
   241  	west1bAddress = watcher.Address{
   242  		IP:   "1.1.1.1",
   243  		Port: 2,
   244  		ForZones: []v1.ForZone{
   245  			{Name: "west-1b"},
   246  		},
   247  	}
   248  	AddressOnTest123Node = watcher.Address{
   249  		IP:   "1.1.1.1",
   250  		Port: 1,
   251  		Pod: &corev1.Pod{
   252  			ObjectMeta: metav1.ObjectMeta{
   253  				Name:      "pod1",
   254  				Namespace: "ns",
   255  				Labels: map[string]string{
   256  					k8s.ControllerNSLabel:    "linkerd",
   257  					k8s.ProxyDeploymentLabel: "deployment-name",
   258  				},
   259  			},
   260  			Spec: corev1.PodSpec{
   261  				NodeName: "test-123",
   262  			},
   263  		},
   264  	}
   265  	AddressNotOnTest123Node = watcher.Address{
   266  		IP:   "1.1.1.2",
   267  		Port: 2,
   268  		Pod: &corev1.Pod{
   269  			ObjectMeta: metav1.ObjectMeta{
   270  				Name:      "pod1",
   271  				Namespace: "ns",
   272  				Labels: map[string]string{
   273  					k8s.ControllerNSLabel:    "linkerd",
   274  					k8s.ProxyDeploymentLabel: "deployment-name",
   275  				},
   276  			},
   277  			Spec: corev1.PodSpec{
   278  				NodeName: "test-234",
   279  			},
   280  		},
   281  	}
   282  )
   283  
   284  func TestEndpointTranslatorForRemoteGateways(t *testing.T) {
   285  	t.Run("Sends one update for add and another for remove", func(t *testing.T) {
   286  		mockGetServer, translator := makeEndpointTranslator(t)
   287  		translator.Start()
   288  		defer translator.Stop()
   289  
   290  		translator.Add(mkAddressSetForServices(remoteGateway1, remoteGateway2))
   291  		translator.Remove(mkAddressSetForServices(remoteGateway2))
   292  
   293  		expectedNumUpdates := 2
   294  		<-mockGetServer.updatesReceived // Add
   295  		<-mockGetServer.updatesReceived // Remove
   296  
   297  		if len(mockGetServer.updatesReceived) != 0 {
   298  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   299  		}
   300  	})
   301  
   302  	t.Run("Recovers after emptying address et", func(t *testing.T) {
   303  		mockGetServer, translator := makeEndpointTranslator(t)
   304  		translator.Start()
   305  		defer translator.Stop()
   306  
   307  		translator.Add(mkAddressSetForServices(remoteGateway1))
   308  		translator.Remove(mkAddressSetForServices(remoteGateway1))
   309  		translator.Add(mkAddressSetForServices(remoteGateway1))
   310  
   311  		expectedNumUpdates := 3
   312  		<-mockGetServer.updatesReceived // Add
   313  		<-mockGetServer.updatesReceived // Remove
   314  		<-mockGetServer.updatesReceived // Add
   315  
   316  		if len(mockGetServer.updatesReceived) != 0 {
   317  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   318  		}
   319  	})
   320  
   321  	t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
   322  		expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
   323  			Name: "some-identity",
   324  		}
   325  
   326  		expectedProtocolHint := &pb.ProtocolHint{
   327  			Protocol: &pb.ProtocolHint_H2_{
   328  				H2: &pb.ProtocolHint_H2{},
   329  			},
   330  		}
   331  
   332  		mockGetServer, translator := makeEndpointTranslator(t)
   333  		translator.Start()
   334  		defer translator.Stop()
   335  
   336  		translator.Add(mkAddressSetForServices(remoteGateway2))
   337  
   338  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   339  		if len(addrs) != 1 {
   340  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   341  		}
   342  
   343  		actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
   344  		if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
   345  			t.Fatalf("TlsIdentity: %v", diff)
   346  		}
   347  
   348  		actualProtocolHint := addrs[0].GetProtocolHint()
   349  		if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
   350  			t.Fatalf("ProtocolHint: %v", diff)
   351  		}
   352  	})
   353  
   354  	t.Run("Sends TlsIdentity and Auth override when present", func(t *testing.T) {
   355  		expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
   356  			Name: "some-identity",
   357  		}
   358  
   359  		expectedProtocolHint := &pb.ProtocolHint{
   360  			Protocol: &pb.ProtocolHint_H2_{
   361  				H2: &pb.ProtocolHint_H2{},
   362  			},
   363  		}
   364  
   365  		expectedAuthOverride := &pb.AuthorityOverride{
   366  			AuthorityOverride: "some-auth.com:2",
   367  		}
   368  
   369  		mockGetServer, translator := makeEndpointTranslator(t)
   370  		translator.Start()
   371  		defer translator.Stop()
   372  
   373  		translator.Add(mkAddressSetForServices(remoteGatewayAuthOverride))
   374  
   375  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   376  		if len(addrs) != 1 {
   377  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   378  		}
   379  
   380  		actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
   381  		if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
   382  			t.Fatalf("TlsIdentity %v", diff)
   383  		}
   384  
   385  		actualProtocolHint := addrs[0].GetProtocolHint()
   386  		if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
   387  			t.Fatalf("ProtocolHint %v", diff)
   388  		}
   389  
   390  		actualAuthOverride := addrs[0].GetAuthorityOverride()
   391  		if diff := deep.Equal(actualAuthOverride, expectedAuthOverride); diff != nil {
   392  			t.Fatalf("AuthOverride %v", diff)
   393  		}
   394  	})
   395  
   396  	t.Run("Does not send TlsIdentity when not present", func(t *testing.T) {
   397  		mockGetServer, translator := makeEndpointTranslator(t)
   398  		translator.Start()
   399  		defer translator.Stop()
   400  
   401  		translator.Add(mkAddressSetForServices(remoteGateway1))
   402  
   403  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   404  		if len(addrs) != 1 {
   405  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   406  		}
   407  
   408  		if addrs[0].TlsIdentity != nil {
   409  			t.Fatalf("Expected no TlsIdentity to be sent, but got [%v]", addrs[0].TlsIdentity)
   410  		}
   411  		if addrs[0].ProtocolHint != nil {
   412  			t.Fatalf("Expected no ProtocolHint to be sent, but got [%v]", addrs[0].TlsIdentity)
   413  		}
   414  	})
   415  
   416  }
   417  
   418  func TestEndpointTranslatorForPods(t *testing.T) {
   419  	t.Run("Sends one update for add and another for remove", func(t *testing.T) {
   420  		mockGetServer, translator := makeEndpointTranslator(t)
   421  		translator.Start()
   422  		defer translator.Stop()
   423  
   424  		translator.Add(mkAddressSetForPods(t, pod1, pod2))
   425  		translator.Remove(mkAddressSetForPods(t, pod2))
   426  
   427  		expectedNumUpdates := 2
   428  		<-mockGetServer.updatesReceived // Add
   429  		<-mockGetServer.updatesReceived // Remove
   430  
   431  		if len(mockGetServer.updatesReceived) != 0 {
   432  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   433  		}
   434  	})
   435  
   436  	t.Run("Sends addresses as removed or added", func(t *testing.T) {
   437  		mockGetServer, translator := makeEndpointTranslator(t)
   438  		translator.Start()
   439  		defer translator.Stop()
   440  
   441  		translator.Add(mkAddressSetForPods(t, pod1, pod2, pod3))
   442  		translator.Remove(mkAddressSetForPods(t, pod3))
   443  
   444  		addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
   445  		actualNumberOfAdded := len(addressesAdded)
   446  		expectedNumberOfAdded := 3
   447  		if actualNumberOfAdded != expectedNumberOfAdded {
   448  			t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
   449  		}
   450  
   451  		addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
   452  		actualNumberOfRemoved := len(addressesRemoved)
   453  		expectedNumberOfRemoved := 1
   454  		if actualNumberOfRemoved != expectedNumberOfRemoved {
   455  			t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
   456  		}
   457  
   458  		sort.Slice(addressesAdded, func(i, j int) bool {
   459  			return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
   460  		})
   461  		checkAddressAndWeight(t, addressesAdded[0], pod1, defaultWeight)
   462  		checkAddressAndWeight(t, addressesAdded[1], pod2, defaultWeight)
   463  		checkAddress(t, addressesRemoved[0], pod3)
   464  	})
   465  
   466  	t.Run("Sends metric labels with added addresses", func(t *testing.T) {
   467  		mockGetServer, translator := makeEndpointTranslator(t)
   468  		translator.Start()
   469  		defer translator.Stop()
   470  
   471  		translator.Add(mkAddressSetForPods(t, pod1))
   472  
   473  		update := <-mockGetServer.updatesReceived
   474  
   475  		actualGlobalMetricLabels := update.GetAdd().MetricLabels
   476  		expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
   477  		if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
   478  			t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
   479  		}
   480  
   481  		actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
   482  		expectedAddedAddress1MetricLabels := map[string]string{
   483  			"pod":                   "pod1",
   484  			"replicationcontroller": "rc-name",
   485  			"serviceaccount":        "serviceaccount-name",
   486  			"control_plane_ns":      "linkerd",
   487  			"zone":                  "",
   488  		}
   489  		if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
   490  			t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
   491  		}
   492  	})
   493  
   494  	t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
   495  		expectedTLSIdentity := &pb.TlsIdentity_DnsLikeIdentity{
   496  			Name: "serviceaccount-name.ns.serviceaccount.identity.linkerd.trust.domain",
   497  		}
   498  
   499  		mockGetServer, translator := makeEndpointTranslator(t)
   500  		translator.Start()
   501  		defer translator.Stop()
   502  
   503  		translator.Add(mkAddressSetForPods(t, pod1))
   504  
   505  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   506  		if len(addrs) != 1 {
   507  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   508  		}
   509  
   510  		actualTLSIdentity := addrs[0].GetTlsIdentity().GetDnsLikeIdentity()
   511  		if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
   512  			t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
   513  		}
   514  	})
   515  
   516  	t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
   517  		expectedProtocolHint := &pb.ProtocolHint{
   518  			Protocol: &pb.ProtocolHint_Opaque_{
   519  				Opaque: &pb.ProtocolHint_Opaque{},
   520  			},
   521  			OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
   522  				InboundPort: 4143,
   523  			},
   524  		}
   525  
   526  		mockGetServer, translator := makeEndpointTranslator(t)
   527  		translator.Start()
   528  		defer translator.Stop()
   529  
   530  		translator.Add(mkAddressSetForServices(podOpaque))
   531  
   532  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   533  		if len(addrs) != 1 {
   534  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   535  		}
   536  
   537  		actualProtocolHint := addrs[0].GetProtocolHint()
   538  		if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
   539  			t.Fatalf("ProtocolHint: %v", diff)
   540  		}
   541  	})
   542  
   543  	t.Run("Sends IPv6 only when pod has both IPv4 and IPv6", func(t *testing.T) {
   544  		mockGetServer, translator := makeEndpointTranslator(t)
   545  		translator.Start()
   546  		defer translator.Stop()
   547  
   548  		translator.Add(mkAddressSetForPods(t, pod1, pod1IPv6))
   549  
   550  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   551  		if len(addrs) != 1 {
   552  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   553  		}
   554  		if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "[2001:db8:85a3::8a2e:370:7333]:1" {
   555  			t.Fatalf("Expected address to be [%s], got [%s]", "[2001:db8:85a3::8a2e:370:7333]:1", ipPort)
   556  		}
   557  
   558  		if updates := len(mockGetServer.updatesReceived); updates > 0 {
   559  			t.Fatalf("Expected to receive no more messages, received [%d]", updates)
   560  		}
   561  	})
   562  
   563  	t.Run("Sends IPv4 only when pod has both IPv4 and IPv6 but the latter in another zone ", func(t *testing.T) {
   564  		mockGetServer, translator := makeEndpointTranslator(t)
   565  		translator.Start()
   566  		defer translator.Stop()
   567  
   568  		pod1West1a := pod1
   569  		pod1West1a.ForZones = []v1.ForZone{
   570  			{Name: "west-1a"},
   571  		}
   572  
   573  		pod1IPv6West1b := pod1IPv6
   574  		pod1IPv6West1b.ForZones = []v1.ForZone{
   575  			{Name: "west-1b"},
   576  		}
   577  
   578  		translator.Add(mkAddressSetForPods(t, pod1West1a, pod1IPv6West1b))
   579  
   580  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   581  		if len(addrs) != 1 {
   582  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   583  		}
   584  		if ipPort := addr.ProxyAddressToString(addrs[0].GetAddr()); ipPort != "1.1.1.1:1" {
   585  			t.Fatalf("Expected address to be [%s], got [%s]", "1.1.1.1:1", ipPort)
   586  		}
   587  
   588  		if updates := len(mockGetServer.updatesReceived); updates > 0 {
   589  			t.Fatalf("Expected to receive no more messages, received [%d]", updates)
   590  		}
   591  	})
   592  }
   593  
   594  func TestEndpointTranslatorExternalWorkloads(t *testing.T) {
   595  	t.Run("Sends one update for add and another for remove", func(t *testing.T) {
   596  		mockGetServer, translator := makeEndpointTranslator(t)
   597  		translator.Start()
   598  		defer translator.Stop()
   599  
   600  		translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2))
   601  		translator.Remove(mkAddressSetForExternalWorkloads(ew2))
   602  
   603  		expectedNumUpdates := 2
   604  		<-mockGetServer.updatesReceived // Add
   605  		<-mockGetServer.updatesReceived // Remove
   606  
   607  		if len(mockGetServer.updatesReceived) != 0 {
   608  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   609  		}
   610  	})
   611  
   612  	t.Run("Sends addresses as removed or added", func(t *testing.T) {
   613  		mockGetServer, translator := makeEndpointTranslator(t)
   614  		translator.Start()
   615  		defer translator.Stop()
   616  
   617  		translator.Add(mkAddressSetForExternalWorkloads(ew1, ew2, ew3))
   618  		translator.Remove(mkAddressSetForExternalWorkloads(ew3))
   619  
   620  		addressesAdded := (<-mockGetServer.updatesReceived).GetAdd().Addrs
   621  		actualNumberOfAdded := len(addressesAdded)
   622  		expectedNumberOfAdded := 3
   623  		if actualNumberOfAdded != expectedNumberOfAdded {
   624  			t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded)
   625  		}
   626  
   627  		addressesRemoved := (<-mockGetServer.updatesReceived).GetRemove().Addrs
   628  		actualNumberOfRemoved := len(addressesRemoved)
   629  		expectedNumberOfRemoved := 1
   630  		if actualNumberOfRemoved != expectedNumberOfRemoved {
   631  			t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved)
   632  		}
   633  
   634  		sort.Slice(addressesAdded, func(i, j int) bool {
   635  			return addressesAdded[i].GetAddr().Port < addressesAdded[j].GetAddr().Port
   636  		})
   637  		checkAddressAndWeight(t, addressesAdded[0], ew1, defaultWeight)
   638  		checkAddressAndWeight(t, addressesAdded[1], ew2, defaultWeight)
   639  		checkAddress(t, addressesRemoved[0], ew3)
   640  	})
   641  
   642  	t.Run("Sends metric labels with added addresses", func(t *testing.T) {
   643  		mockGetServer, translator := makeEndpointTranslator(t)
   644  		translator.Start()
   645  		defer translator.Stop()
   646  
   647  		translator.Add(mkAddressSetForExternalWorkloads(ew1))
   648  
   649  		update := <-mockGetServer.updatesReceived
   650  
   651  		actualGlobalMetricLabels := update.GetAdd().MetricLabels
   652  		expectedGlobalMetricLabels := map[string]string{"namespace": "service-ns", "service": "service-name"}
   653  		if diff := deep.Equal(actualGlobalMetricLabels, expectedGlobalMetricLabels); diff != nil {
   654  			t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels)
   655  		}
   656  
   657  		actualAddedAddress1MetricLabels := update.GetAdd().Addrs[0].MetricLabels
   658  		expectedAddedAddress1MetricLabels := map[string]string{
   659  			"external_workload": "ew-1",
   660  			"zone":              "",
   661  			"workloadgroup":     "wg-name",
   662  		}
   663  		if diff := deep.Equal(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels); diff != nil {
   664  			t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
   665  		}
   666  	})
   667  
   668  	t.Run("Sends TlsIdentity and Server Name when enabled", func(t *testing.T) {
   669  		expectedTLSIdentity := &pb.TlsIdentity{
   670  			Strategy: &pb.TlsIdentity_UriLikeIdentity_{
   671  				UriLikeIdentity: &pb.TlsIdentity_UriLikeIdentity{
   672  					Uri: "spiffe://some-domain/ew-1",
   673  				},
   674  			},
   675  			ServerName: &pb.TlsIdentity_DnsLikeIdentity{
   676  				Name: "server.local",
   677  			},
   678  		}
   679  
   680  		mockGetServer, translator := makeEndpointTranslator(t)
   681  		translator.Start()
   682  		defer translator.Stop()
   683  
   684  		translator.Add(mkAddressSetForExternalWorkloads(ew1))
   685  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   686  		if len(addrs) != 1 {
   687  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   688  		}
   689  
   690  		actualTLSIdentity := addrs[0].GetTlsIdentity()
   691  		if diff := deep.Equal(actualTLSIdentity, expectedTLSIdentity); diff != nil {
   692  			t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTLSIdentity, actualTLSIdentity)
   693  		}
   694  	})
   695  
   696  	t.Run("Sends Opaque ProtocolHint for opaque ports", func(t *testing.T) {
   697  		expectedProtocolHint := &pb.ProtocolHint{
   698  			Protocol: &pb.ProtocolHint_Opaque_{
   699  				Opaque: &pb.ProtocolHint_Opaque{},
   700  			},
   701  			OpaqueTransport: &pb.ProtocolHint_OpaqueTransport{
   702  				InboundPort: 4143,
   703  			},
   704  		}
   705  
   706  		mockGetServer, translator := makeEndpointTranslator(t)
   707  		translator.Start()
   708  		defer translator.Stop()
   709  
   710  		translator.Add(mkAddressSetForExternalWorkloads(ewOpaque))
   711  
   712  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   713  		if len(addrs) != 1 {
   714  			t.Fatalf("Expected [1] address returned, got %v", addrs)
   715  		}
   716  
   717  		actualProtocolHint := addrs[0].GetProtocolHint()
   718  		if diff := deep.Equal(actualProtocolHint, expectedProtocolHint); diff != nil {
   719  			t.Fatalf("ProtocolHint: %v", diff)
   720  		}
   721  	})
   722  }
   723  
   724  func TestEndpointTranslatorTopologyAwareFilter(t *testing.T) {
   725  	t.Run("Sends one update for add and none for remove", func(t *testing.T) {
   726  		mockGetServer, translator := makeEndpointTranslator(t)
   727  		translator.Start()
   728  		defer translator.Stop()
   729  
   730  		translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
   731  		translator.Remove(mkAddressSetForServices(west1bAddress))
   732  
   733  		// Only the address meant for west-1a should be added, which means
   734  		// that when we try to remove the address meant for west-1b there
   735  		// should be no remove update.
   736  		expectedNumUpdates := 1
   737  		<-mockGetServer.updatesReceived // Add
   738  
   739  		if len(mockGetServer.updatesReceived) != 0 {
   740  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   741  		}
   742  	})
   743  }
   744  
   745  func TestEndpointTranslatorExperimentalZoneWeights(t *testing.T) {
   746  	zoneA := "west-1a"
   747  	zoneB := "west-1b"
   748  	addrA := watcher.Address{
   749  		IP:   "7.9.7.9",
   750  		Port: 7979,
   751  		Zone: &zoneA,
   752  	}
   753  	addrB := watcher.Address{
   754  		IP:   "9.7.9.7",
   755  		Port: 9797,
   756  		Zone: &zoneB,
   757  	}
   758  
   759  	t.Run("Disabled", func(t *testing.T) {
   760  		mockGetServer, translator := makeEndpointTranslator(t)
   761  		translator.extEndpointZoneWeights = false
   762  		translator.Start()
   763  		defer translator.Stop()
   764  
   765  		translator.Add(mkAddressSetForServices(addrA, addrB))
   766  
   767  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   768  		if len(addrs) != 2 {
   769  			t.Fatalf("Expected [2] addresses returned, got %v", addrs)
   770  		}
   771  		sort.Slice(addrs, func(i, j int) bool {
   772  			return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
   773  		})
   774  		checkAddressAndWeight(t, addrs[0], addrA, defaultWeight)
   775  		checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
   776  	})
   777  
   778  	t.Run("Applies weights", func(t *testing.T) {
   779  		mockGetServer, translator := makeEndpointTranslator(t)
   780  		translator.extEndpointZoneWeights = true
   781  		translator.Start()
   782  		defer translator.Stop()
   783  
   784  		translator.Add(mkAddressSetForServices(addrA, addrB))
   785  
   786  		addrs := (<-mockGetServer.updatesReceived).GetAdd().GetAddrs()
   787  		if len(addrs) != 2 {
   788  			t.Fatalf("Expected [2] addresses returned, got %v", addrs)
   789  		}
   790  		sort.Slice(addrs, func(i, j int) bool {
   791  			return addrs[i].GetAddr().Port < addrs[j].GetAddr().Port
   792  		})
   793  		checkAddressAndWeight(t, addrs[0], addrA, defaultWeight*10)
   794  		checkAddressAndWeight(t, addrs[1], addrB, defaultWeight)
   795  	})
   796  }
   797  
   798  func TestEndpointTranslatorForLocalTrafficPolicy(t *testing.T) {
   799  	t.Run("Sends one update for add and none for remove", func(t *testing.T) {
   800  		mockGetServer, translator := makeEndpointTranslator(t)
   801  		translator.Start()
   802  		defer translator.Stop()
   803  		addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
   804  		addressSet.LocalTrafficPolicy = true
   805  		translator.Add(addressSet)
   806  		translator.Remove(mkAddressSetForServices(AddressNotOnTest123Node))
   807  
   808  		// Only the address meant for AddressOnTest123Node should be added, which means
   809  		// that when we try to remove the address meant for AddressNotOnTest123Node there
   810  		// should be no remove update.
   811  		expectedNumUpdates := 1
   812  		<-mockGetServer.updatesReceived // Add
   813  
   814  		if len(mockGetServer.updatesReceived) != 0 {
   815  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   816  		}
   817  	})
   818  
   819  	t.Run("Removes cannot change LocalTrafficPolicy", func(t *testing.T) {
   820  		mockGetServer, translator := makeEndpointTranslator(t)
   821  		translator.Start()
   822  		defer translator.Stop()
   823  		addressSet := mkAddressSetForServices(AddressOnTest123Node, AddressNotOnTest123Node)
   824  		addressSet.LocalTrafficPolicy = true
   825  		translator.Add(addressSet)
   826  		set := watcher.AddressSet{
   827  			Addresses:          make(map[watcher.ServiceID]watcher.Address),
   828  			Labels:             map[string]string{"service": "service-name", "namespace": "service-ns"},
   829  			LocalTrafficPolicy: false,
   830  		}
   831  		translator.Remove(set)
   832  
   833  		// Only the address meant for AddressOnTest123Node should be added.
   834  		// The remove with no addresses should not change the LocalTrafficPolicy
   835  		// and should be a noop that does not send an update.
   836  		expectedNumUpdates := 1
   837  		<-mockGetServer.updatesReceived // Add
   838  
   839  		if len(mockGetServer.updatesReceived) != 0 {
   840  			t.Fatalf("Expecting [%d] updates, got [%d].", expectedNumUpdates, expectedNumUpdates+len(mockGetServer.updatesReceived))
   841  		}
   842  	})
   843  }
   844  
   845  // TestConcurrency, to be triggered with `go test -race`, shouldn't report a race condition
   846  func TestConcurrency(t *testing.T) {
   847  	_, translator := makeEndpointTranslator(t)
   848  	translator.Start()
   849  	defer translator.Stop()
   850  
   851  	var wg sync.WaitGroup
   852  	for i := 0; i < 10; i++ {
   853  		wg.Add(1)
   854  		go func() {
   855  			defer wg.Done()
   856  			translator.Add(mkAddressSetForServices(west1aAddress, west1bAddress))
   857  			translator.Remove(mkAddressSetForServices(west1bAddress))
   858  		}()
   859  	}
   860  
   861  	wg.Wait()
   862  }
   863  
   864  func TestGetInboundPort(t *testing.T) {
   865  	podSpec := &corev1.PodSpec{
   866  		Containers: []corev1.Container{
   867  			{
   868  				Name: k8s.ProxyContainerName,
   869  				Env: []corev1.EnvVar{
   870  					{
   871  						Name:  envInboundListenAddr,
   872  						Value: "1.2.3.4:8080",
   873  					},
   874  				},
   875  			},
   876  		},
   877  	}
   878  
   879  	port, err := getInboundPort(podSpec)
   880  	if err != nil {
   881  		t.Fatalf("Unexpected error: %s", err)
   882  	}
   883  	if port != 8080 {
   884  		t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
   885  	}
   886  
   887  	podSpec.Containers[0].Env[0].Value = "[2001:db8::94]:8080"
   888  	port, err = getInboundPort(podSpec)
   889  	if err != nil {
   890  		t.Fatalf("Unexpected error: %s", err)
   891  	}
   892  	if port != 8080 {
   893  		t.Fatalf("Expecting port [%d], got [%d]", 8080, port)
   894  	}
   895  }
   896  
   897  func mkAddressSetForServices(gatewayAddresses ...watcher.Address) watcher.AddressSet {
   898  	set := watcher.AddressSet{
   899  		Addresses: make(map[watcher.ServiceID]watcher.Address),
   900  		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
   901  	}
   902  	for _, a := range gatewayAddresses {
   903  		a := a // pin
   904  
   905  		id := watcher.ServiceID{
   906  			Name: strings.Join([]string{
   907  				a.IP,
   908  				fmt.Sprint(a.Port),
   909  			}, "-"),
   910  		}
   911  		set.Addresses[id] = a
   912  	}
   913  	return set
   914  }
   915  
   916  func mkAddressSetForPods(t *testing.T, podAddresses ...watcher.Address) watcher.AddressSet {
   917  	t.Helper()
   918  
   919  	set := watcher.AddressSet{
   920  		Addresses: make(map[watcher.PodID]watcher.Address),
   921  		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
   922  	}
   923  	for _, p := range podAddresses {
   924  		// The IP family is set on the PodID used to index the
   925  		// watcher.Address; here we simply detect it
   926  		fam := corev1.IPv4Protocol
   927  		addr, err := netip.ParseAddr(p.IP)
   928  		if err != nil {
   929  			t.Fatalf("Invalid IP '%s': %s", p.IP, err)
   930  		}
   931  		if addr.Is6() {
   932  			fam = corev1.IPv6Protocol
   933  		}
   934  
   935  		id := watcher.PodID{
   936  			Name:      p.Pod.Name,
   937  			Namespace: p.Pod.Namespace,
   938  			IPFamily:  fam,
   939  		}
   940  		set.Addresses[id] = p
   941  	}
   942  	return set
   943  }
   944  
   945  func mkAddressSetForExternalWorkloads(ewAddresses ...watcher.Address) watcher.AddressSet {
   946  	set := watcher.AddressSet{
   947  		Addresses: make(map[watcher.PodID]watcher.Address),
   948  		Labels:    map[string]string{"service": "service-name", "namespace": "service-ns"},
   949  	}
   950  	for _, ew := range ewAddresses {
   951  		id := watcher.ExternalWorkloadID{Name: ew.ExternalWorkload.Name, Namespace: ew.ExternalWorkload.Namespace}
   952  		set.Addresses[id] = ew
   953  	}
   954  	return set
   955  }
   956  
   957  func checkAddressAndWeight(t *testing.T, actual *pb.WeightedAddr, expected watcher.Address, weight uint32) {
   958  	t.Helper()
   959  
   960  	checkAddress(t, actual.GetAddr(), expected)
   961  	if actual.GetWeight() != weight {
   962  		t.Fatalf("Expected weight [%+v] but got [%+v]", weight, actual.GetWeight())
   963  	}
   964  }
   965  
   966  func checkAddress(t *testing.T, actual *net.TcpAddress, expected watcher.Address) {
   967  	t.Helper()
   968  
   969  	expectedAddr, err := addr.ParseProxyIP(expected.IP)
   970  	expectedTCP := net.TcpAddress{
   971  		Ip:   expectedAddr,
   972  		Port: expected.Port,
   973  	}
   974  	if err != nil {
   975  		t.Fatalf("Failed to parse expected IP [%s]: %s", expected.IP, err)
   976  	}
   977  	if actual.Ip.GetIpv4() == 0 && actual.Ip.GetIpv6() == nil {
   978  		t.Fatal("Actual IP is empty")
   979  	}
   980  	if actual.Ip.GetIpv4() != expectedTCP.Ip.GetIpv4() {
   981  		t.Fatalf("Expected IPv4 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
   982  	}
   983  	if !proto.Equal(actual.Ip.GetIpv6(), expectedTCP.Ip.GetIpv6()) {
   984  		t.Fatalf("Expected IPv6 [%+v] but got [%+v]", expectedTCP.Ip, actual.Ip)
   985  	}
   986  	if actual.Port != expectedTCP.Port {
   987  		t.Fatalf("Expected port [%+v] but got [%+v]", expectedTCP.Port, actual.Port)
   988  	}
   989  }
   990  

View as plain text