...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/endpoint_routing_test.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint_test
     2  
     3  import (
     4  	"fmt"
     5  	"net"
     6  	"strings"
     7  	"testing"
     8  
     9  	"github.com/stretchr/testify/assert"
    10  	"github.com/stretchr/testify/require"
    11  	"k8s.io/apimachinery/pkg/util/intstr"
    12  
    13  	"github.com/datawire/ambassador/v2/cmd/entrypoint"
    14  	"github.com/datawire/ambassador/v2/pkg/ambex"
    15  	v3bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v3"
    16  	v3cluster "github.com/datawire/ambassador/v2/pkg/api/envoy/config/cluster/v3"
    17  	amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
    18  	"github.com/datawire/ambassador/v2/pkg/kates"
    19  	"github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    20  )
    21  
    22  func TestEndpointRouting(t *testing.T) {
    23  	f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
    24  	// Create Mapping, Service, and Endpoints resources to start.
    25  	assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
    26  	assert.NoError(t, f.Upsert(makeService("default", "foo")))
    27  	subset, err := makeSubset(8080, "1.2.3.4")
    28  	require.NoError(t, err)
    29  	assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
    30  	f.Flush()
    31  	snap, err := f.GetSnapshot(HasMapping("default", "foo"))
    32  	require.NoError(t, err)
    33  	assert.NotNil(t, snap)
    34  
    35  	// Check that the endpoints resource we created at the start was properly propagated.
    36  	endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo"))
    37  	require.NoError(t, err)
    38  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip)
    39  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port)
    40  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
    41  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
    42  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
    43  }
    44  
    45  func TestEndpointRoutingMappingAnnotations(t *testing.T) {
    46  	f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
    47  	// Create Mapping, Service, and Endpoints resources to start.
    48  	svc := makeService("default", "foo")
    49  	svc.ObjectMeta.Annotations = map[string]string{
    50  		"getambassador.io/config": `
    51  ---
    52  apiVersion: getambassador.io/v3alpha1
    53  kind: Mapping
    54  name: foo
    55  prefix: /foo
    56  service: foo
    57  resolver: endpoint`,
    58  	}
    59  	assert.NoError(t, f.Upsert(svc))
    60  	subset, err := makeSubset(8080, "1.2.3.4")
    61  	require.NoError(t, err)
    62  	assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
    63  	f.Flush()
    64  	snap, err := f.GetSnapshot(HasService("default", "foo"))
    65  	require.NoError(t, err)
    66  	assert.NotNil(t, snap)
    67  
    68  	// Check that the endpoints resource we created at the start was properly propagated.
    69  	endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo"))
    70  	require.NoError(t, err)
    71  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo"][0].Ip)
    72  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo"][0].Port)
    73  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
    74  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
    75  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
    76  }
    77  
    78  func TestEndpointRoutingMultiplePorts(t *testing.T) {
    79  	f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
    80  	// Create Mapping, Service, and Endpoints, except this time the Service has multiple ports.
    81  	assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "foo", "endpoint")))
    82  	assert.NoError(t, f.Upsert(&kates.Service{
    83  		TypeMeta:   kates.TypeMeta{Kind: "Service"},
    84  		ObjectMeta: kates.ObjectMeta{Namespace: "default", Name: "foo"},
    85  		Spec: kates.ServiceSpec{
    86  			Ports: []kates.ServicePort{
    87  				{
    88  					Name:       "cleartext",
    89  					Port:       80,
    90  					TargetPort: intstr.FromInt(8080),
    91  				},
    92  				{
    93  					Name:       "encrypted",
    94  					Port:       443,
    95  					TargetPort: intstr.FromInt(8443),
    96  				},
    97  			},
    98  		},
    99  	}))
   100  	subset, err := makeSubset("cleartext", 8080, "encrypted", 8443, "1.2.3.4")
   101  	require.NoError(t, err)
   102  	assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
   103  	f.Flush()
   104  	snap, err := f.GetSnapshot(HasMapping("default", "foo"))
   105  	require.NoError(t, err)
   106  	assert.NotNil(t, snap)
   107  
   108  	// Check that the endpoints resource we created at the start was properly propagated.
   109  	endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo/80"))
   110  	require.NoError(t, err)
   111  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/80")
   112  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/443")
   113  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/cleartext")
   114  	assert.Contains(t, endpoints.Entries, "k8s/default/foo/encrypted")
   115  
   116  	// Make sure 80 and cleartext both map to container port 8080
   117  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
   118  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/80"][0].Port)
   119  
   120  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/cleartext"][0].Ip)
   121  	assert.Equal(t, uint32(8080), endpoints.Entries["k8s/default/foo/cleartext"][0].Port)
   122  
   123  	// Make sure 443 and encrypted both map to container port 8443
   124  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/443"][0].Ip)
   125  	assert.Equal(t, uint32(8443), endpoints.Entries["k8s/default/foo/443"][0].Port)
   126  
   127  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/encrypted"][0].Ip)
   128  	assert.Equal(t, uint32(8443), endpoints.Entries["k8s/default/foo/encrypted"][0].Port)
   129  }
   130  
   131  func TestEndpointRoutingIP(t *testing.T) {
   132  	f := entrypoint.RunFake(t, entrypoint.FakeConfig{EnvoyConfig: true}, nil)
   133  	// Create a Mapping that points straight at an IP address.
   134  	assert.NoError(t, f.Upsert(makeMapping("default", "foo", "/foo", "4.3.2.1", "endpoint")))
   135  	f.Flush()
   136  
   137  	// Check that the envoy config embeds the IP address directly in the cluster config.
   138  	config, err := f.GetEnvoyConfig(func(config *v3bootstrap.Bootstrap) bool {
   139  		return FindCluster(config, ClusterNameContains("4_3_2_1")) != nil
   140  	})
   141  	require.NoError(t, err)
   142  	cluster := FindCluster(config, ClusterNameContains("4_3_2_1"))
   143  	require.NotNil(t, cluster)
   144  	require.NotNil(t, cluster.LoadAssignment)
   145  	require.Len(t, cluster.LoadAssignment.Endpoints, 1)
   146  	require.Len(t, cluster.LoadAssignment.Endpoints[0].LbEndpoints, 1)
   147  	ep := cluster.LoadAssignment.Endpoints[0].LbEndpoints[0].GetEndpoint()
   148  	assert.NotNil(t, ep)
   149  	sockAddr := ep.Address.GetSocketAddress()
   150  	assert.Equal(t, "4.3.2.1", sockAddr.Address)
   151  }
   152  
   153  // Test that we resend endpoints when a new mapping is created that references an existing set of
   154  // endpoints.
   155  func TestEndpointRoutingMappingCreation(t *testing.T) {
   156  	f := entrypoint.RunFake(t, entrypoint.FakeConfig{}, nil)
   157  	assert.NoError(t, f.Upsert(makeService("default", "foo")))
   158  	subset, err := makeSubset(8080, "1.2.3.4")
   159  	require.NoError(t, err)
   160  	assert.NoError(t, f.Upsert(makeEndpoints("default", "foo", subset)))
   161  	f.Flush()
   162  	f.AssertEndpointsEmpty(timeout)
   163  	assert.NoError(t, f.UpsertYAML(`
   164  ---
   165  apiVersion: getambassador.io/v3alpha1
   166  kind: Mapping
   167  metadata:
   168    name: foo
   169    namespace: default
   170  spec:
   171    prefix: /foo
   172    resolver: endpoint
   173    service: foo.default
   174  `))
   175  	f.Flush()
   176  	// Check that endpoints get sent even though we did not do actually update endpoints between
   177  	// this flush and the previous one.
   178  	endpoints, err := f.GetEndpoints(HasEndpoints("k8s/default/foo/80"))
   179  	require.NoError(t, err)
   180  	assert.Equal(t, "1.2.3.4", endpoints.Entries["k8s/default/foo/80"][0].Ip)
   181  }
   182  
   183  func ClusterNameContains(substring string) func(*v3cluster.Cluster) bool {
   184  	return func(c *v3cluster.Cluster) bool {
   185  		return strings.Contains(c.Name, substring)
   186  	}
   187  }
   188  
   189  func HasService(namespace, name string) func(snapshot *snapshot.Snapshot) bool {
   190  	return func(snapshot *snapshot.Snapshot) bool {
   191  		for _, m := range snapshot.Kubernetes.Services {
   192  			if m.Namespace == namespace && m.Name == name {
   193  				return true
   194  			}
   195  		}
   196  		return false
   197  	}
   198  }
   199  
   200  func HasMapping(namespace, name string) func(snapshot *snapshot.Snapshot) bool {
   201  	return func(snapshot *snapshot.Snapshot) bool {
   202  		for _, m := range snapshot.Kubernetes.Mappings {
   203  			if m.Namespace == namespace && m.Name == name {
   204  				return true
   205  			}
   206  		}
   207  		return false
   208  	}
   209  }
   210  
   211  func HasEndpoints(path string) func(endpoints *ambex.Endpoints) bool {
   212  	return func(endpoints *ambex.Endpoints) bool {
   213  		_, ok := endpoints.Entries[path]
   214  		return ok
   215  	}
   216  }
   217  
   218  func makeMapping(namespace, name, prefix, service, resolver string) *amb.Mapping {
   219  	return &amb.Mapping{
   220  		TypeMeta:   kates.TypeMeta{Kind: "Mapping"},
   221  		ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name},
   222  		Spec: amb.MappingSpec{
   223  			Prefix:   prefix,
   224  			Service:  service,
   225  			Resolver: resolver,
   226  		},
   227  	}
   228  }
   229  
   230  func makeService(namespace, name string) *kates.Service {
   231  	return &kates.Service{
   232  		TypeMeta:   kates.TypeMeta{Kind: "Service"},
   233  		ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name},
   234  		Spec: kates.ServiceSpec{
   235  			Ports: []kates.ServicePort{
   236  				{
   237  					Port:       80,
   238  					TargetPort: intstr.FromInt(8080),
   239  				},
   240  			},
   241  		},
   242  	}
   243  }
   244  
   245  func makeEndpoints(namespace, name string, subsets ...kates.EndpointSubset) *kates.Endpoints {
   246  	return &kates.Endpoints{
   247  		TypeMeta:   kates.TypeMeta{Kind: "Endpoints"},
   248  		ObjectMeta: kates.ObjectMeta{Namespace: namespace, Name: name},
   249  		Subsets:    subsets,
   250  	}
   251  }
   252  
   253  // makeSubset provides a convenient way to kubernetes EndpointSubset resources. Any int args are
   254  // ports, any ip address strings are addresses, and no ip address strings are used as the port name
   255  // for any ports that follow them in the arg list.
   256  func makeSubset(args ...interface{}) (kates.EndpointSubset, error) {
   257  	portName := ""
   258  	var ports []kates.EndpointPort
   259  	var addrs []kates.EndpointAddress
   260  	for _, arg := range args {
   261  		switch v := arg.(type) {
   262  		case int:
   263  			ports = append(ports, kates.EndpointPort{Name: portName, Port: int32(v), Protocol: kates.ProtocolTCP})
   264  		case string:
   265  			IP := net.ParseIP(v)
   266  			if IP == nil {
   267  				portName = v
   268  			} else {
   269  				addrs = append(addrs, kates.EndpointAddress{IP: v})
   270  			}
   271  		default:
   272  			return kates.EndpointSubset{}, fmt.Errorf("unrecognized type: %T", v)
   273  		}
   274  	}
   275  
   276  	return kates.EndpointSubset{Addresses: addrs, Ports: ports}, nil
   277  }
   278  

View as plain text