...

Source file src/github.com/datawire/ambassador/v2/pkg/gateway/dispatcher_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/gateway

     1  package gateway_test
     2  
     3  import (
     4  	// standard library
     5  	"errors"
     6  	"fmt"
     7  	"testing"
     8  
     9  	// third-party libraries
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  	"google.golang.org/protobuf/types/known/anypb"
    13  
    14  	// envoy api v2
    15  	apiv2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
    16  	apiv2_core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
    17  	apiv2_listener "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/listener"
    18  	apiv2_httpman "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/network/http_connection_manager/v2"
    19  
    20  	// envoy control plane
    21  	ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    22  	ecp_wellknown "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/wellknown"
    23  
    24  	// first-party libraries
    25  	"github.com/datawire/ambassador/v2/pkg/gateway"
    26  	"github.com/datawire/ambassador/v2/pkg/kates"
    27  	"github.com/datawire/dlib/dlog"
    28  )
    29  
    30  func assertErrorContains(t *testing.T, err error, msg string) {
    31  	t.Helper()
    32  	if assert.Error(t, err) {
    33  		assert.Contains(t, err.Error(), msg)
    34  	}
    35  }
    36  
    37  func TestDispatcherRegister(t *testing.T) {
    38  	t.Parallel()
    39  	ctx := dlog.NewTestContext(t, false)
    40  	disp := gateway.NewDispatcher()
    41  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    42  	require.NoError(t, err)
    43  	foo := makeFoo("default", "foo", "bar")
    44  	assert.NoError(t, disp.Upsert(foo))
    45  	l := disp.GetListener(ctx, "bar")
    46  	require.NotNil(t, l)
    47  	assert.Equal(t, "bar", l.Name)
    48  }
    49  
    50  func TestDispatcherDuplicateRegister(t *testing.T) {
    51  	t.Parallel()
    52  	disp := gateway.NewDispatcher()
    53  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    54  	require.NoError(t, err)
    55  	err = disp.Register("Foo", wrapFooCompiler(compile_Foo))
    56  	assertErrorContains(t, err, "duplicate")
    57  }
    58  
    59  func TestIsRegistered(t *testing.T) {
    60  	t.Parallel()
    61  	disp := gateway.NewDispatcher()
    62  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    63  	require.NoError(t, err)
    64  	assert.True(t, disp.IsRegistered("Foo"))
    65  	assert.False(t, disp.IsRegistered("Bar"))
    66  }
    67  
    68  func TestDispatcherFaultIsolation1(t *testing.T) {
    69  	t.Parallel()
    70  	disp := gateway.NewDispatcher()
    71  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    72  	require.NoError(t, err)
    73  	foo := makeFoo("default", "foo", "bang")
    74  	foo.Spec.PanicArg = errors.New("bang bang!")
    75  	err = disp.Upsert(foo)
    76  	assertErrorContains(t, err, "error processing")
    77  }
    78  
    79  func TestDispatcherFaultIsolation2(t *testing.T) {
    80  	t.Parallel()
    81  	disp := gateway.NewDispatcher()
    82  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    83  	require.NoError(t, err)
    84  	foo := makeFoo("default", "foo", "bang")
    85  	foo.Spec.PanicArg = errors.New("bang bang!")
    86  	err = disp.Upsert(foo)
    87  	assertErrorContains(t, err, "error processing")
    88  }
    89  
    90  func TestDispatcherTransformError(t *testing.T) {
    91  	t.Parallel()
    92  	disp := gateway.NewDispatcher()
    93  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithErrors))
    94  	require.NoError(t, err)
    95  	foo := makeFoo("default", "foo", "bar")
    96  	err = disp.Upsert(foo)
    97  	require.NoError(t, err)
    98  
    99  	errors := disp.GetErrors()
   100  	require.Len(t, errors, 6)
   101  	assert.Equal(t, "Foo foo.default", errors[0].Source.Location())
   102  	assert.Equal(t, "this is an error", errors[0].Error)
   103  
   104  	assert.Equal(t, "listener 1 in Foo foo.default", errors[1].Source.Location())
   105  	assert.Equal(t, "this is a listener error", errors[1].Error)
   106  
   107  	assert.Equal(t, "route in Foo foo.default", errors[2].Source.Location())
   108  	assert.Equal(t, "this is a route error", errors[2].Error)
   109  
   110  	assert.Equal(t, "clusterRef in Foo foo.default", errors[3].Source.Location())
   111  	assert.Equal(t, "this is a clusterRef error", errors[3].Error)
   112  
   113  	assert.Equal(t, "cluster in Foo foo.default", errors[4].Source.Location())
   114  	assert.Equal(t, "this is a cluster error", errors[4].Error)
   115  
   116  	assert.Equal(t, "load assignment in Foo foo.default", errors[5].Source.Location())
   117  	assert.Equal(t, "this is a load assignment error", errors[5].Error)
   118  }
   119  
   120  func compile_FooWithErrors(f *Foo) (*gateway.CompiledConfig, error) {
   121  	src := gateway.SourceFromResource(f)
   122  	return &gateway.CompiledConfig{
   123  		CompiledItem: gateway.NewCompiledItemError(src, "this is an error"),
   124  		Listeners: []*gateway.CompiledListener{
   125  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("listener %d in %s", 1, src),
   126  				"this is a listener error")},
   127  		},
   128  		Routes: []*gateway.CompiledRoute{
   129  			{
   130  				CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("route in %s", src), "this is a route error"),
   131  				ClusterRefs:  []*gateway.ClusterRef{{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("clusterRef in %s", src), "this is a clusterRef error")}},
   132  			},
   133  		},
   134  		Clusters: []*gateway.CompiledCluster{
   135  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("cluster in %s", src),
   136  				"this is a cluster error")},
   137  		},
   138  		LoadAssignments: []*gateway.CompiledLoadAssignment{
   139  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("load assignment in %s", src),
   140  				"this is a load assignment error")},
   141  		},
   142  	}, nil
   143  }
   144  
   145  func TestDispatcherNoTransform(t *testing.T) {
   146  	t.Parallel()
   147  	disp := gateway.NewDispatcher()
   148  	foo := makeFoo("default", "foo", "bar")
   149  	err := disp.Upsert(foo)
   150  	assertErrorContains(t, err, "no transform for kind")
   151  }
   152  
   153  func TestDispatcherDelete(t *testing.T) {
   154  	t.Parallel()
   155  	ctx := dlog.NewTestContext(t, false)
   156  	disp := gateway.NewDispatcher()
   157  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
   158  	require.NoError(t, err)
   159  	foo := makeFoo("default", "foo", "bar")
   160  	assert.NoError(t, disp.Upsert(foo))
   161  	l := disp.GetListener(ctx, "bar")
   162  	require.NotNil(t, l)
   163  	assert.Equal(t, "bar", l.Name)
   164  	disp.Delete(foo)
   165  	l = disp.GetListener(ctx, "bar")
   166  	require.Nil(t, l)
   167  }
   168  
   169  func TestDispatcherDeleteKey(t *testing.T) {
   170  	t.Parallel()
   171  	ctx := dlog.NewTestContext(t, false)
   172  	disp := gateway.NewDispatcher()
   173  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
   174  	require.NoError(t, err)
   175  	foo := makeFoo("default", "foo", "bar")
   176  	assert.NoError(t, disp.Upsert(foo))
   177  	l := disp.GetListener(ctx, "bar")
   178  	require.NotNil(t, l)
   179  	assert.Equal(t, "bar", l.Name)
   180  	disp.DeleteKey("Foo", "default", "foo")
   181  	l = disp.GetListener(ctx, "bar")
   182  	require.Nil(t, l)
   183  }
   184  
   185  func compile_Foo(f *Foo) (*gateway.CompiledConfig, error) {
   186  	if f.Spec.Value == "bang" {
   187  		return nil, f.Spec.PanicArg
   188  	}
   189  	return &gateway.CompiledConfig{
   190  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   191  		Listeners: []*gateway.CompiledListener{
   192  			{
   193  				Listener: &apiv2.Listener{Name: f.Spec.Value},
   194  			},
   195  		},
   196  	}, nil
   197  }
   198  
   199  func TestDispatcherUpsertYamlErr(t *testing.T) {
   200  	t.Parallel()
   201  	disp := gateway.NewDispatcher()
   202  	err := disp.UpsertYaml("{")
   203  	assertErrorContains(t, err, "error converting")
   204  	err = disp.UpsertYaml(`
   205  ---
   206  kind: Gatewayyyy
   207  apiVersion: networking.x-k8s.io/v1alpha1
   208  metadata:
   209    name: my-gateway
   210  spec:
   211    listeners:
   212    - protocol: HTTP
   213      port: 8080
   214  `)
   215  	assertErrorContains(t, err, "no transform for kind")
   216  }
   217  
   218  func TestDispatcherAssemblyWithRouteConfg(t *testing.T) {
   219  	t.Parallel()
   220  	ctx := dlog.NewTestContext(t, false)
   221  	disp := gateway.NewDispatcher()
   222  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithRouteConfigName))
   223  	require.NoError(t, err)
   224  	foo := makeFoo("default", "foo", "bar")
   225  	assert.NoError(t, disp.Upsert(foo))
   226  	l := disp.GetListener(ctx, "bar")
   227  	require.NotNil(t, l)
   228  	assert.Equal(t, "bar", l.Name)
   229  	r := disp.GetRouteConfiguration(ctx, "bar-routeconfig")
   230  	require.NotNil(t, r)
   231  	assert.Equal(t, "bar-routeconfig", r.Name)
   232  }
   233  
   234  func compile_FooWithRouteConfigName(f *Foo) (*gateway.CompiledConfig, error) {
   235  	if f.Spec.Value == "bang" {
   236  		return nil, f.Spec.PanicArg
   237  	}
   238  
   239  	name := f.Spec.Value
   240  	rcName := fmt.Sprintf("%s-routeconfig", name)
   241  
   242  	hcm := &apiv2_httpman.HttpConnectionManager{
   243  		StatPrefix: name,
   244  		HttpFilters: []*apiv2_httpman.HttpFilter{
   245  			{Name: ecp_wellknown.CORS},
   246  			{Name: ecp_wellknown.Router},
   247  		},
   248  		RouteSpecifier: &apiv2_httpman.HttpConnectionManager_Rds{
   249  			Rds: &apiv2_httpman.Rds{
   250  				ConfigSource: &apiv2_core.ConfigSource{
   251  					ConfigSourceSpecifier: &apiv2_core.ConfigSource_Ads{
   252  						Ads: &apiv2_core.AggregatedConfigSource{},
   253  					},
   254  				},
   255  				RouteConfigName: rcName,
   256  			},
   257  		},
   258  	}
   259  	hcmAny, err := anypb.New(hcm)
   260  	if err != nil {
   261  		return nil, err
   262  	}
   263  
   264  	l := &apiv2.Listener{
   265  		Name: name,
   266  		FilterChains: []*apiv2_listener.FilterChain{
   267  			{
   268  				Filters: []*apiv2_listener.Filter{
   269  					{
   270  						Name:       ecp_wellknown.HTTPConnectionManager,
   271  						ConfigType: &apiv2_listener.Filter_TypedConfig{TypedConfig: hcmAny},
   272  					},
   273  				},
   274  			},
   275  		},
   276  	}
   277  
   278  	return &gateway.CompiledConfig{
   279  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   280  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   281  	}, nil
   282  }
   283  
   284  func TestDispatcherAssemblyWithEmptyRouteConfigName(t *testing.T) {
   285  	t.Parallel()
   286  	ctx := dlog.NewTestContext(t, false)
   287  	disp := gateway.NewDispatcher()
   288  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithEmptyRouteConfigName))
   289  	require.NoError(t, err)
   290  	foo := makeFoo("default", "foo", "bar")
   291  	assert.NoError(t, disp.Upsert(foo))
   292  	l := disp.GetListener(ctx, "bar")
   293  	require.NotNil(t, l)
   294  	assert.Equal(t, "bar", l.Name)
   295  	// This is a bit weird, but the go control plane's consistency check seems to imply that an
   296  	// empty route config name is ok.
   297  	r := disp.GetRouteConfiguration(ctx, "")
   298  	require.NotNil(t, r)
   299  	assert.Equal(t, "", r.Name)
   300  }
   301  
   302  func compile_FooWithEmptyRouteConfigName(f *Foo) (*gateway.CompiledConfig, error) {
   303  	name := f.Spec.Value
   304  
   305  	hcm := &apiv2_httpman.HttpConnectionManager{
   306  		StatPrefix: name,
   307  		HttpFilters: []*apiv2_httpman.HttpFilter{
   308  			{Name: ecp_wellknown.CORS},
   309  			{Name: ecp_wellknown.Router},
   310  		},
   311  		RouteSpecifier: &apiv2_httpman.HttpConnectionManager_Rds{
   312  			Rds: &apiv2_httpman.Rds{
   313  				ConfigSource: &apiv2_core.ConfigSource{
   314  					ConfigSourceSpecifier: &apiv2_core.ConfigSource_Ads{
   315  						Ads: &apiv2_core.AggregatedConfigSource{},
   316  					},
   317  				},
   318  			},
   319  		},
   320  	}
   321  	hcmAny, err := anypb.New(hcm)
   322  	if err != nil {
   323  		return nil, err
   324  	}
   325  
   326  	l := &apiv2.Listener{
   327  		Name: name,
   328  		FilterChains: []*apiv2_listener.FilterChain{
   329  			{
   330  				Filters: []*apiv2_listener.Filter{
   331  					{
   332  						Name:       ecp_wellknown.HTTPConnectionManager,
   333  						ConfigType: &apiv2_listener.Filter_TypedConfig{TypedConfig: hcmAny},
   334  					},
   335  				},
   336  			},
   337  		},
   338  	}
   339  
   340  	return &gateway.CompiledConfig{
   341  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   342  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   343  	}, nil
   344  }
   345  
   346  func TestDispatcherAssemblyWithoutRds(t *testing.T) {
   347  	t.Parallel()
   348  	ctx := dlog.NewTestContext(t, false)
   349  	disp := gateway.NewDispatcher()
   350  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithoutRds))
   351  	require.NoError(t, err)
   352  	foo := makeFoo("default", "foo", "bar")
   353  	assert.NoError(t, disp.Upsert(foo))
   354  	l := disp.GetListener(ctx, "bar")
   355  	require.NotNil(t, l)
   356  	assert.Equal(t, "bar", l.Name)
   357  	r := disp.GetRouteConfiguration(ctx, "bar")
   358  	require.Nil(t, r)
   359  }
   360  
   361  func compile_FooWithoutRds(f *Foo) (*gateway.CompiledConfig, error) {
   362  	name := f.Spec.Value
   363  
   364  	hcm := &apiv2_httpman.HttpConnectionManager{
   365  		StatPrefix: name,
   366  		HttpFilters: []*apiv2_httpman.HttpFilter{
   367  			{Name: ecp_wellknown.CORS},
   368  			{Name: ecp_wellknown.Router},
   369  		},
   370  	}
   371  	hcmAny, err := anypb.New(hcm)
   372  	if err != nil {
   373  		return nil, err
   374  	}
   375  
   376  	l := &apiv2.Listener{
   377  		Name: name,
   378  		FilterChains: []*apiv2_listener.FilterChain{
   379  			{
   380  				Filters: []*apiv2_listener.Filter{
   381  					{
   382  						Name: ecp_wellknown.RateLimit,
   383  					},
   384  					{
   385  						Name:       ecp_wellknown.HTTPConnectionManager,
   386  						ConfigType: &apiv2_listener.Filter_TypedConfig{TypedConfig: hcmAny},
   387  					},
   388  				},
   389  			},
   390  		},
   391  	}
   392  
   393  	return &gateway.CompiledConfig{
   394  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   395  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   396  	}, nil
   397  }
   398  
   399  func TestDispatcherAssemblyEndpointDefaulting(t *testing.T) {
   400  	t.Parallel()
   401  	ctx := dlog.NewTestContext(t, false)
   402  	disp := gateway.NewDispatcher()
   403  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithClusterRefs))
   404  	require.NoError(t, err)
   405  	foo := makeFoo("default", "foo", "bar")
   406  	err = disp.Upsert(foo)
   407  	require.NoError(t, err)
   408  	_, snap := disp.GetSnapshot(ctx)
   409  	found := false
   410  	for _, r := range snap.Resources[ecp_cache_types.Endpoint].Items {
   411  		cla := r.Resource.(*apiv2.ClusterLoadAssignment)
   412  		if cla.ClusterName == "foo" && len(cla.Endpoints) == 0 {
   413  			found = true
   414  		}
   415  	}
   416  	if !found {
   417  		assert.Fail(t, "no defaulted cluster load assignment")
   418  	}
   419  }
   420  
   421  func wrapFooCompiler(inner func(*Foo) (*gateway.CompiledConfig, error)) func(kates.Object) (*gateway.CompiledConfig, error) {
   422  	return func(untyped kates.Object) (*gateway.CompiledConfig, error) {
   423  		return inner(untyped.(*Foo))
   424  	}
   425  }
   426  
   427  func compile_FooWithClusterRefs(f *Foo) (*gateway.CompiledConfig, error) {
   428  	return &gateway.CompiledConfig{
   429  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   430  		Routes: []*gateway.CompiledRoute{{
   431  			ClusterRefs: []*gateway.ClusterRef{{Name: "foo"}},
   432  		}},
   433  	}, nil
   434  }
   435  
   436  func TestDispatcherAssemblyEndpointWatches(t *testing.T) {
   437  	t.Parallel()
   438  	ctx := dlog.NewTestContext(t, false)
   439  	disp := gateway.NewDispatcher()
   440  	err := disp.Register("Foo", wrapFooCompiler(compile_FooEndpointWatches))
   441  	require.NoError(t, err)
   442  	foo := makeFoo("default", "foo", "bar")
   443  	err = disp.Upsert(foo)
   444  	require.NoError(t, err)
   445  	disp.GetSnapshot(ctx)
   446  	assert.True(t, disp.IsWatched("foo-ns", "foo"))
   447  }
   448  
   449  func compile_FooEndpointWatches(f *Foo) (*gateway.CompiledConfig, error) {
   450  	return &gateway.CompiledConfig{
   451  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   452  		Routes: []*gateway.CompiledRoute{{
   453  			CompiledItem: gateway.CompiledItem{Source: gateway.SourceFromResource(f), Namespace: "foo-ns"},
   454  			ClusterRefs:  []*gateway.ClusterRef{{Name: "foo"}},
   455  		}},
   456  	}, nil
   457  }
   458  

View as plain text