...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/gateway/dispatcher_test.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/gateway

     1  package gateway_test
     2  
     3  import (
     4  	// standard library
     5  	"errors"
     6  	"fmt"
     7  	"testing"
     8  
     9  	// third-party libraries
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  	"google.golang.org/protobuf/types/known/anypb"
    13  
    14  	// envoy api v3
    15  	v3core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
    16  	v3endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
    17  	v3listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/listener/v3"
    18  	v3httpman "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/extensions/filters/network/http_connection_manager/v3"
    19  
    20  	// envoy control plane
    21  	ecp_cache_types "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
    22  	ecp_wellknown "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/wellknown"
    23  
    24  	// first-party libraries
    25  	"github.com/datawire/dlib/dlog"
    26  	"github.com/emissary-ingress/emissary/v3/pkg/gateway"
    27  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    28  )
    29  
    30  func assertErrorContains(t *testing.T, err error, msg string) {
    31  	t.Helper()
    32  	if assert.Error(t, err) {
    33  		assert.Contains(t, err.Error(), msg)
    34  	}
    35  }
    36  
    37  func TestDispatcherRegister(t *testing.T) {
    38  	t.Parallel()
    39  	ctx := dlog.NewTestContext(t, false)
    40  	disp := gateway.NewDispatcher()
    41  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    42  	require.NoError(t, err)
    43  	foo := makeFoo("default", "foo", "bar")
    44  	assert.NoError(t, disp.Upsert(foo))
    45  	l := disp.GetListener(ctx, "bar")
    46  	require.NotNil(t, l)
    47  	assert.Equal(t, "bar", l.Name)
    48  }
    49  
    50  func TestDispatcherDuplicateRegister(t *testing.T) {
    51  	t.Parallel()
    52  	disp := gateway.NewDispatcher()
    53  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    54  	require.NoError(t, err)
    55  	err = disp.Register("Foo", wrapFooCompiler(compile_Foo))
    56  	assertErrorContains(t, err, "duplicate")
    57  }
    58  
    59  func TestIsRegistered(t *testing.T) {
    60  	t.Parallel()
    61  	disp := gateway.NewDispatcher()
    62  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    63  	require.NoError(t, err)
    64  	assert.True(t, disp.IsRegistered("Foo"))
    65  	assert.False(t, disp.IsRegistered("Bar"))
    66  }
    67  
    68  func TestDispatcherFaultIsolation1(t *testing.T) {
    69  	t.Parallel()
    70  	disp := gateway.NewDispatcher()
    71  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    72  	require.NoError(t, err)
    73  	foo := makeFoo("default", "foo", "bang")
    74  	foo.Spec.PanicArg = errors.New("bang bang!")
    75  	err = disp.Upsert(foo)
    76  	assertErrorContains(t, err, "error processing")
    77  }
    78  
    79  func TestDispatcherFaultIsolation2(t *testing.T) {
    80  	t.Parallel()
    81  	disp := gateway.NewDispatcher()
    82  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
    83  	require.NoError(t, err)
    84  	foo := makeFoo("default", "foo", "bang")
    85  	foo.Spec.PanicArg = errors.New("bang bang!")
    86  	err = disp.Upsert(foo)
    87  	assertErrorContains(t, err, "error processing")
    88  }
    89  
    90  func TestDispatcherTransformError(t *testing.T) {
    91  	t.Parallel()
    92  	disp := gateway.NewDispatcher()
    93  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithErrors))
    94  	require.NoError(t, err)
    95  	foo := makeFoo("default", "foo", "bar")
    96  	err = disp.Upsert(foo)
    97  	require.NoError(t, err)
    98  
    99  	errors := disp.GetErrors()
   100  	require.Len(t, errors, 6)
   101  	assert.Equal(t, "Foo foo.default", errors[0].Source.Location())
   102  	assert.Equal(t, "this is an error", errors[0].Error)
   103  
   104  	assert.Equal(t, "listener 1 in Foo foo.default", errors[1].Source.Location())
   105  	assert.Equal(t, "this is a listener error", errors[1].Error)
   106  
   107  	assert.Equal(t, "route in Foo foo.default", errors[2].Source.Location())
   108  	assert.Equal(t, "this is a route error", errors[2].Error)
   109  
   110  	assert.Equal(t, "clusterRef in Foo foo.default", errors[3].Source.Location())
   111  	assert.Equal(t, "this is a clusterRef error", errors[3].Error)
   112  
   113  	assert.Equal(t, "cluster in Foo foo.default", errors[4].Source.Location())
   114  	assert.Equal(t, "this is a cluster error", errors[4].Error)
   115  
   116  	assert.Equal(t, "load assignment in Foo foo.default", errors[5].Source.Location())
   117  	assert.Equal(t, "this is a load assignment error", errors[5].Error)
   118  }
   119  
   120  func compile_FooWithErrors(f *Foo) (*gateway.CompiledConfig, error) {
   121  	src := gateway.SourceFromResource(f)
   122  	return &gateway.CompiledConfig{
   123  		CompiledItem: gateway.NewCompiledItemError(src, "this is an error"),
   124  		Listeners: []*gateway.CompiledListener{
   125  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("listener %d in %s", 1, src),
   126  				"this is a listener error")},
   127  		},
   128  		Routes: []*gateway.CompiledRoute{
   129  			{
   130  				CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("route in %s", src), "this is a route error"),
   131  				ClusterRefs:  []*gateway.ClusterRef{{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("clusterRef in %s", src), "this is a clusterRef error")}},
   132  			},
   133  		},
   134  		Clusters: []*gateway.CompiledCluster{
   135  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("cluster in %s", src),
   136  				"this is a cluster error")},
   137  		},
   138  		LoadAssignments: []*gateway.CompiledLoadAssignment{
   139  			{CompiledItem: gateway.NewCompiledItemError(gateway.Sourcef("load assignment in %s", src),
   140  				"this is a load assignment error")},
   141  		},
   142  	}, nil
   143  }
   144  
   145  func TestDispatcherNoTransform(t *testing.T) {
   146  	t.Parallel()
   147  	disp := gateway.NewDispatcher()
   148  	foo := makeFoo("default", "foo", "bar")
   149  	err := disp.Upsert(foo)
   150  	assertErrorContains(t, err, "no transform for kind")
   151  }
   152  
   153  func TestDispatcherDelete(t *testing.T) {
   154  	t.Parallel()
   155  	ctx := dlog.NewTestContext(t, false)
   156  	disp := gateway.NewDispatcher()
   157  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
   158  	require.NoError(t, err)
   159  	foo := makeFoo("default", "foo", "bar")
   160  	assert.NoError(t, disp.Upsert(foo))
   161  	l := disp.GetListener(ctx, "bar")
   162  	require.NotNil(t, l)
   163  	assert.Equal(t, "bar", l.Name)
   164  	disp.Delete(foo)
   165  	l = disp.GetListener(ctx, "bar")
   166  	require.Nil(t, l)
   167  }
   168  
   169  func TestDispatcherDeleteKey(t *testing.T) {
   170  	t.Parallel()
   171  	ctx := dlog.NewTestContext(t, false)
   172  	disp := gateway.NewDispatcher()
   173  	err := disp.Register("Foo", wrapFooCompiler(compile_Foo))
   174  	require.NoError(t, err)
   175  	foo := makeFoo("default", "foo", "bar")
   176  	assert.NoError(t, disp.Upsert(foo))
   177  	l := disp.GetListener(ctx, "bar")
   178  	require.NotNil(t, l)
   179  	assert.Equal(t, "bar", l.Name)
   180  	disp.DeleteKey("Foo", "default", "foo")
   181  	l = disp.GetListener(ctx, "bar")
   182  	require.Nil(t, l)
   183  }
   184  
   185  func compile_Foo(f *Foo) (*gateway.CompiledConfig, error) {
   186  	if f.Spec.Value == "bang" {
   187  		return nil, f.Spec.PanicArg
   188  	}
   189  	return &gateway.CompiledConfig{
   190  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   191  		Listeners: []*gateway.CompiledListener{
   192  			{
   193  				Listener: &v3listener.Listener{Name: f.Spec.Value},
   194  			},
   195  		},
   196  	}, nil
   197  }
   198  
   199  func TestDispatcherUpsertYamlErr(t *testing.T) {
   200  	t.Parallel()
   201  	disp := gateway.NewDispatcher()
   202  	err := disp.UpsertYaml("{")
   203  	assertErrorContains(t, err, "error converting")
   204  	err = disp.UpsertYaml(`
   205  ---
   206  kind: Gatewayyyy
   207  apiVersion: networking.x-k8s.io/v1alpha1
   208  metadata:
   209    name: my-gateway
   210  spec:
   211    listeners:
   212    - protocol: HTTP
   213      port: 8080
   214  `)
   215  	assertErrorContains(t, err, "no transform for kind")
   216  }
   217  
   218  func TestDispatcherAssemblyWithRouteConfg(t *testing.T) {
   219  	t.Parallel()
   220  	ctx := dlog.NewTestContext(t, false)
   221  	disp := gateway.NewDispatcher()
   222  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithRouteConfigName))
   223  	require.NoError(t, err)
   224  	foo := makeFoo("default", "foo", "bar")
   225  	assert.NoError(t, disp.Upsert(foo))
   226  	l := disp.GetListener(ctx, "bar")
   227  	require.NotNil(t, l)
   228  	assert.Equal(t, "bar", l.Name)
   229  	r := disp.GetRouteConfiguration(ctx, "bar-routeconfig")
   230  	require.NotNil(t, r)
   231  	assert.Equal(t, "bar-routeconfig", r.Name)
   232  }
   233  
   234  func compile_FooWithRouteConfigName(f *Foo) (*gateway.CompiledConfig, error) {
   235  	if f.Spec.Value == "bang" {
   236  		return nil, f.Spec.PanicArg
   237  	}
   238  
   239  	name := f.Spec.Value
   240  	rcName := fmt.Sprintf("%s-routeconfig", name)
   241  
   242  	hcm := &v3httpman.HttpConnectionManager{
   243  		StatPrefix: name,
   244  		HttpFilters: []*v3httpman.HttpFilter{
   245  			{Name: ecp_wellknown.CORS},
   246  			{Name: ecp_wellknown.Router},
   247  		},
   248  		RouteSpecifier: &v3httpman.HttpConnectionManager_Rds{
   249  			Rds: &v3httpman.Rds{
   250  				ConfigSource: &v3core.ConfigSource{
   251  					ConfigSourceSpecifier: &v3core.ConfigSource_Ads{
   252  						Ads: &v3core.AggregatedConfigSource{},
   253  					},
   254  				},
   255  				RouteConfigName: rcName,
   256  			},
   257  		},
   258  	}
   259  	hcmAny, err := anypb.New(hcm)
   260  	if err != nil {
   261  		return nil, err
   262  	}
   263  
   264  	l := &v3listener.Listener{
   265  		Name: name,
   266  		FilterChains: []*v3listener.FilterChain{
   267  			{
   268  				Filters: []*v3listener.Filter{
   269  					{
   270  						Name:       ecp_wellknown.HTTPConnectionManager,
   271  						ConfigType: &v3listener.Filter_TypedConfig{TypedConfig: hcmAny},
   272  					},
   273  				},
   274  			},
   275  		},
   276  	}
   277  
   278  	return &gateway.CompiledConfig{
   279  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   280  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   281  	}, nil
   282  }
   283  
   284  func TestDispatcherAssemblyWithEmptyRouteConfigName(t *testing.T) {
   285  	t.Parallel()
   286  
   287  	ctx := dlog.NewTestContext(t, false)
   288  
   289  	disp := gateway.NewDispatcher()
   290  
   291  	err := disp.Register("Foo", wrapFooCompiler(compileFooWithEmptyRouteConfigName))
   292  	require.NoError(t, err)
   293  
   294  	foo := makeFoo("default", "foo", "bar")
   295  	err = disp.Upsert(foo)
   296  	assert.NoError(t, err)
   297  
   298  	// due to inconsistent SanptShot the listener returned should be nil
   299  	listener := disp.GetListener(ctx, "bar")
   300  	require.Nil(t, listener)
   301  }
   302  
   303  // compileFooWithEmptyRouteConfigName generates invalid RDS route configuration due to the
   304  // RouteConfigname being empty. This will lead to an inconsistent snapshot, so calls
   305  // to GetSnapShot will return a nil snapshot
   306  func compileFooWithEmptyRouteConfigName(f *Foo) (*gateway.CompiledConfig, error) {
   307  	name := f.Spec.Value
   308  
   309  	hcm := &v3httpman.HttpConnectionManager{
   310  		StatPrefix: name,
   311  		HttpFilters: []*v3httpman.HttpFilter{
   312  			{Name: ecp_wellknown.CORS},
   313  			{Name: ecp_wellknown.Router},
   314  		},
   315  		RouteSpecifier: &v3httpman.HttpConnectionManager_Rds{
   316  			Rds: &v3httpman.Rds{
   317  				// explicitly adding RDS Config with no name to trigger snapshot Consistency to fail
   318  				ConfigSource: &v3core.ConfigSource{
   319  					ConfigSourceSpecifier: &v3core.ConfigSource_Ads{
   320  						Ads: &v3core.AggregatedConfigSource{},
   321  					},
   322  				},
   323  			},
   324  		},
   325  	}
   326  	hcmAny, err := anypb.New(hcm)
   327  	if err != nil {
   328  		return nil, err
   329  	}
   330  
   331  	l := &v3listener.Listener{
   332  		Name: name,
   333  		FilterChains: []*v3listener.FilterChain{
   334  			{
   335  				Filters: []*v3listener.Filter{
   336  					{
   337  						Name:       ecp_wellknown.HTTPConnectionManager,
   338  						ConfigType: &v3listener.Filter_TypedConfig{TypedConfig: hcmAny},
   339  					},
   340  				},
   341  			},
   342  		},
   343  	}
   344  
   345  	return &gateway.CompiledConfig{
   346  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   347  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   348  	}, nil
   349  }
   350  
   351  func TestDispatcherAssemblyWithoutRds(t *testing.T) {
   352  	t.Parallel()
   353  	ctx := dlog.NewTestContext(t, false)
   354  	disp := gateway.NewDispatcher()
   355  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithoutRds))
   356  	require.NoError(t, err)
   357  	foo := makeFoo("default", "foo", "bar")
   358  	assert.NoError(t, disp.Upsert(foo))
   359  	l := disp.GetListener(ctx, "bar")
   360  	require.NotNil(t, l)
   361  	assert.Equal(t, "bar", l.Name)
   362  	r := disp.GetRouteConfiguration(ctx, "bar")
   363  	require.Nil(t, r)
   364  }
   365  
   366  func compile_FooWithoutRds(f *Foo) (*gateway.CompiledConfig, error) {
   367  	name := f.Spec.Value
   368  
   369  	hcm := &v3httpman.HttpConnectionManager{
   370  		StatPrefix: name,
   371  		HttpFilters: []*v3httpman.HttpFilter{
   372  			{Name: ecp_wellknown.CORS},
   373  			{Name: ecp_wellknown.Router},
   374  		},
   375  	}
   376  	hcmAny, err := anypb.New(hcm)
   377  	if err != nil {
   378  		return nil, err
   379  	}
   380  
   381  	l := &v3listener.Listener{
   382  		Name: name,
   383  		FilterChains: []*v3listener.FilterChain{
   384  			{
   385  				Filters: []*v3listener.Filter{
   386  					{
   387  						Name: ecp_wellknown.RateLimit,
   388  					},
   389  					{
   390  						Name:       ecp_wellknown.HTTPConnectionManager,
   391  						ConfigType: &v3listener.Filter_TypedConfig{TypedConfig: hcmAny},
   392  					},
   393  				},
   394  			},
   395  		},
   396  	}
   397  
   398  	return &gateway.CompiledConfig{
   399  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   400  		Listeners:    []*gateway.CompiledListener{{Listener: l}},
   401  	}, nil
   402  }
   403  
   404  func TestDispatcherAssemblyEndpointDefaulting(t *testing.T) {
   405  	t.Parallel()
   406  	ctx := dlog.NewTestContext(t, false)
   407  	disp := gateway.NewDispatcher()
   408  	err := disp.Register("Foo", wrapFooCompiler(compile_FooWithClusterRefs))
   409  	require.NoError(t, err)
   410  	foo := makeFoo("default", "foo", "bar")
   411  	err = disp.Upsert(foo)
   412  	require.NoError(t, err)
   413  
   414  	_, snapshot := disp.GetSnapshot(ctx)
   415  	if snapshot == nil {
   416  		assert.Fail(t, "unable to get a valid consistent snapshot")
   417  		return // ensure that linter is happy due to possible nil pointer dereference below
   418  	}
   419  
   420  	found := false
   421  	for _, r := range snapshot.Resources[ecp_cache_types.Endpoint].Items {
   422  		cla := r.Resource.(*v3endpoint.ClusterLoadAssignment)
   423  		if cla.ClusterName == "foo" && len(cla.Endpoints) == 0 {
   424  			found = true
   425  		}
   426  	}
   427  	if !found {
   428  		assert.Fail(t, "no defaulted cluster load assignment")
   429  	}
   430  }
   431  
   432  func wrapFooCompiler(inner func(*Foo) (*gateway.CompiledConfig, error)) func(kates.Object) (*gateway.CompiledConfig, error) {
   433  	return func(untyped kates.Object) (*gateway.CompiledConfig, error) {
   434  		return inner(untyped.(*Foo))
   435  	}
   436  }
   437  
   438  func compile_FooWithClusterRefs(f *Foo) (*gateway.CompiledConfig, error) {
   439  	return &gateway.CompiledConfig{
   440  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   441  		Routes: []*gateway.CompiledRoute{{
   442  			ClusterRefs: []*gateway.ClusterRef{{Name: "foo"}},
   443  		}},
   444  	}, nil
   445  }
   446  
   447  func TestDispatcherAssemblyEndpointWatches(t *testing.T) {
   448  	t.Parallel()
   449  	ctx := dlog.NewTestContext(t, false)
   450  	disp := gateway.NewDispatcher()
   451  	err := disp.Register("Foo", wrapFooCompiler(compile_FooEndpointWatches))
   452  	require.NoError(t, err)
   453  	foo := makeFoo("default", "foo", "bar")
   454  	err = disp.Upsert(foo)
   455  	require.NoError(t, err)
   456  	disp.GetSnapshot(ctx)
   457  	assert.True(t, disp.IsWatched("foo-ns", "foo"))
   458  }
   459  
   460  func compile_FooEndpointWatches(f *Foo) (*gateway.CompiledConfig, error) {
   461  	return &gateway.CompiledConfig{
   462  		CompiledItem: gateway.NewCompiledItem(gateway.SourceFromResource(f)),
   463  		Routes: []*gateway.CompiledRoute{{
   464  			CompiledItem: gateway.CompiledItem{Source: gateway.SourceFromResource(f), Namespace: "foo-ns"},
   465  			ClusterRefs:  []*gateway.ClusterRef{{Name: "foo"}},
   466  		}},
   467  	}, nil
   468  }
   469  

View as plain text