...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3/simple_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3

     1  // Code generated by create_version. DO NOT EDIT.
     2  // Copyright 2018 Envoyproxy Authors
     3  //
     4  //   Licensed under the Apache License, Version 2.0 (the "License");
     5  //   you may not use this file except in compliance with the License.
     6  //   You may obtain a copy of the License at
     7  //
     8  //       http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  //   Unless required by applicable law or agreed to in writing, software
    11  //   distributed under the License is distributed on an "AS IS" BASIS,
    12  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  //   See the License for the specific language governing permissions and
    14  //   limitations under the License.
    15  
    16  package cache_test
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"reflect"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	core "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
    27  	discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
    28  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    29  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
    30  	rsrc "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v3"
    31  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/test/resource/v3"
    32  )
    33  
    34  type group struct{}
    35  
    36  const (
    37  	key = "node"
    38  )
    39  
    40  func (group) ID(node *core.Node) string {
    41  	if node != nil {
    42  		return node.Id
    43  	}
    44  	return key
    45  }
    46  
    47  var (
    48  	version  = "x"
    49  	version2 = "y"
    50  
    51  	snapshot = cache.NewSnapshot(version,
    52  		[]types.Resource{testEndpoint},
    53  		[]types.Resource{testCluster},
    54  		[]types.Resource{testRoute},
    55  		[]types.Resource{testListener},
    56  		[]types.Resource{testRuntime},
    57  		[]types.Resource{testSecret[0]})
    58  
    59  	ttl       = 2 * time.Second
    60  	heartbeat = time.Second
    61  
    62  	snapshotWithTtl = cache.NewSnapshotWithTtls(version,
    63  		[]types.ResourceWithTtl{{Resource: testEndpoint, Ttl: &ttl}},
    64  		[]types.ResourceWithTtl{{Resource: testCluster}},
    65  		[]types.ResourceWithTtl{{Resource: testRoute}},
    66  		[]types.ResourceWithTtl{{Resource: testListener}},
    67  		[]types.ResourceWithTtl{{Resource: testRuntime}},
    68  		[]types.ResourceWithTtl{{Resource: testSecret[0]}})
    69  
    70  	names = map[string][]string{
    71  		rsrc.EndpointType: {clusterName},
    72  		rsrc.ClusterType:  nil,
    73  		rsrc.RouteType:    {routeName},
    74  		rsrc.ListenerType: nil,
    75  		rsrc.RuntimeType:  nil,
    76  	}
    77  
    78  	testTypes = []string{
    79  		rsrc.EndpointType,
    80  		rsrc.ClusterType,
    81  		rsrc.RouteType,
    82  		rsrc.ListenerType,
    83  		rsrc.RuntimeType,
    84  	}
    85  )
    86  
    87  type logger struct {
    88  	t *testing.T
    89  }
    90  
    91  func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    92  func (log logger) Infof(format string, args ...interface{})  { log.t.Logf(format, args...) }
    93  func (log logger) Warnf(format string, args ...interface{})  { log.t.Logf(format, args...) }
    94  func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    95  
    96  func TestSnapshotCacheWithTtl(t *testing.T) {
    97  	ctx, cancel := context.WithCancel(context.Background())
    98  	defer cancel()
    99  	c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second)
   100  
   101  	if _, err := c.GetSnapshot(key); err == nil {
   102  		t.Errorf("unexpected snapshot found for key %q", key)
   103  	}
   104  
   105  	if err := c.SetSnapshot(key, snapshotWithTtl); err != nil {
   106  		t.Fatal(err)
   107  	}
   108  
   109  	snap, err := c.GetSnapshot(key)
   110  	if err != nil {
   111  		t.Fatal(err)
   112  	}
   113  	if !reflect.DeepEqual(snap, snapshotWithTtl) {
   114  		t.Errorf("expect snapshot: %v, got: %v", snapshotWithTtl, snap)
   115  	}
   116  
   117  	wg := sync.WaitGroup{}
   118  	// All the resources should respond immediately when version is not up to date.
   119  	for _, typ := range testTypes {
   120  		wg.Add(1)
   121  		t.Run(typ, func(t *testing.T) {
   122  			defer wg.Done()
   123  			value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
   124  			select {
   125  			case out := <-value:
   126  				if gotVersion, _ := out.GetVersion(); gotVersion != version {
   127  					t.Errorf("got version %q, want %q", gotVersion, version)
   128  				}
   129  				if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
   130  					t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResourcesAndTtl(typ))
   131  				}
   132  			case <-time.After(2 * time.Second):
   133  				t.Errorf("failed to receive snapshot response")
   134  			}
   135  		})
   136  	}
   137  	wg.Wait()
   138  
   139  	// Once everything is up to date, only the TTL'd resource should send out updates.
   140  	wg = sync.WaitGroup{}
   141  	updatesByType := map[string]int{}
   142  	for _, typ := range testTypes {
   143  		wg.Add(1)
   144  		go func(typ string) {
   145  			defer wg.Done()
   146  
   147  			end := time.After(5 * time.Second)
   148  			for {
   149  				value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
   150  
   151  				select {
   152  				case out := <-value:
   153  					if gotVersion, _ := out.GetVersion(); gotVersion != version {
   154  						t.Errorf("got version %q, want %q", gotVersion, version)
   155  					}
   156  					if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
   157  						t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ))
   158  					}
   159  
   160  					if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
   161  						t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ))
   162  					}
   163  
   164  					updatesByType[typ]++
   165  				case <-end:
   166  					cancel()
   167  					return
   168  				}
   169  			}
   170  		}(typ)
   171  	}
   172  
   173  	wg.Wait()
   174  
   175  	if len(updatesByType) != 1 {
   176  		t.Errorf("expected to only receive updates for TTL'd type, got %v", updatesByType)
   177  	}
   178  	// Avoid an exact match on number of triggers to avoid this being flaky.
   179  	if updatesByType[rsrc.EndpointType] < 2 {
   180  		t.Errorf("expected at least two TTL updates for endpoints, got %d", updatesByType[rsrc.EndpointType])
   181  	}
   182  }
   183  
   184  func TestSnapshotCache(t *testing.T) {
   185  	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
   186  
   187  	if _, err := c.GetSnapshot(key); err == nil {
   188  		t.Errorf("unexpected snapshot found for key %q", key)
   189  	}
   190  
   191  	if err := c.SetSnapshot(key, snapshot); err != nil {
   192  		t.Fatal(err)
   193  	}
   194  
   195  	snap, err := c.GetSnapshot(key)
   196  	if err != nil {
   197  		t.Fatal(err)
   198  	}
   199  	if !reflect.DeepEqual(snap, snapshot) {
   200  		t.Errorf("expect snapshot: %v, got: %v", snapshot, snap)
   201  	}
   202  
   203  	// try to get endpoints with incorrect list of names
   204  	// should not receive response
   205  	value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}})
   206  	select {
   207  	case out := <-value:
   208  		t.Errorf("watch for endpoints and mismatched names => got %v, want none", out)
   209  	case <-time.After(time.Second / 4):
   210  	}
   211  
   212  	for _, typ := range testTypes {
   213  		t.Run(typ, func(t *testing.T) {
   214  			value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
   215  			select {
   216  			case out := <-value:
   217  				if gotVersion, _ := out.GetVersion(); gotVersion != version {
   218  					t.Errorf("got version %q, want %q", gotVersion, version)
   219  				}
   220  				if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
   221  					t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
   222  				}
   223  			case <-time.After(time.Second):
   224  				t.Fatal("failed to receive snapshot response")
   225  			}
   226  		})
   227  	}
   228  }
   229  
   230  func TestSnapshotCacheFetch(t *testing.T) {
   231  	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
   232  	if err := c.SetSnapshot(key, snapshot); err != nil {
   233  		t.Fatal(err)
   234  	}
   235  
   236  	for _, typ := range testTypes {
   237  		t.Run(typ, func(t *testing.T) {
   238  			resp, err := c.Fetch(context.Background(), &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
   239  			if err != nil || resp == nil {
   240  				t.Fatal("unexpected error or null response")
   241  			}
   242  			if gotVersion, _ := resp.GetVersion(); gotVersion != version {
   243  				t.Errorf("got version %q, want %q", gotVersion, version)
   244  			}
   245  		})
   246  	}
   247  
   248  	// no response for missing snapshot
   249  	if resp, err := c.Fetch(context.Background(),
   250  		&discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, Node: &core.Node{Id: "oof"}}); resp != nil || err == nil {
   251  		t.Errorf("missing snapshot: response is not nil %v", resp)
   252  	}
   253  
   254  	// no response for latest version
   255  	if resp, err := c.Fetch(context.Background(),
   256  		&discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, VersionInfo: version}); resp != nil || err == nil {
   257  		t.Errorf("latest version: response is not nil %v", resp)
   258  	}
   259  }
   260  
   261  func TestSnapshotCacheWatch(t *testing.T) {
   262  	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
   263  	watches := make(map[string]chan cache.Response)
   264  	for _, typ := range testTypes {
   265  		watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
   266  	}
   267  	if err := c.SetSnapshot(key, snapshot); err != nil {
   268  		t.Fatal(err)
   269  	}
   270  	for _, typ := range testTypes {
   271  		t.Run(typ, func(t *testing.T) {
   272  			select {
   273  			case out := <-watches[typ]:
   274  				if gotVersion, _ := out.GetVersion(); gotVersion != version {
   275  					t.Errorf("got version %q, want %q", gotVersion, version)
   276  				}
   277  				if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
   278  					t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
   279  				}
   280  			case <-time.After(time.Second):
   281  				t.Fatal("failed to receive snapshot response")
   282  			}
   283  		})
   284  	}
   285  
   286  	// open new watches with the latest version
   287  	for _, typ := range testTypes {
   288  		watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
   289  	}
   290  	if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) {
   291  		t.Errorf("watches should be created for the latest version: %d", count)
   292  	}
   293  
   294  	// set partially-versioned snapshot
   295  	snapshot2 := snapshot
   296  	snapshot2.Resources[types.Endpoint] = cache.NewResources(version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
   297  	if err := c.SetSnapshot(key, snapshot2); err != nil {
   298  		t.Fatal(err)
   299  	}
   300  	if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes)-1 {
   301  		t.Errorf("watches should be preserved for all but one: %d", count)
   302  	}
   303  
   304  	// validate response for endpoints
   305  	select {
   306  	case out := <-watches[rsrc.EndpointType]:
   307  		if gotVersion, _ := out.GetVersion(); gotVersion != version2 {
   308  			t.Errorf("got version %q, want %q", gotVersion, version2)
   309  		}
   310  		if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) {
   311  			t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
   312  		}
   313  	case <-time.After(time.Second):
   314  		t.Fatal("failed to receive snapshot response")
   315  	}
   316  }
   317  
   318  func TestConcurrentSetWatch(t *testing.T) {
   319  	c := cache.NewSnapshotCache(false, group{}, logger{t: t})
   320  	for i := 0; i < 50; i++ {
   321  		func(i int) {
   322  			t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) {
   323  				t.Parallel()
   324  				id := fmt.Sprintf("%d", i%2)
   325  				var cancel func()
   326  				if i < 25 {
   327  					snap := cache.Snapshot{}
   328  					snap.Resources[types.Endpoint] = cache.NewResources(fmt.Sprintf("v%d", i), []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
   329  					c.SetSnapshot(id, snap)
   330  				} else {
   331  					if cancel != nil {
   332  						cancel()
   333  					}
   334  					_, cancel = c.CreateWatch(&discovery.DiscoveryRequest{
   335  						Node:    &core.Node{Id: id},
   336  						TypeUrl: rsrc.EndpointType,
   337  					})
   338  				}
   339  			})
   340  		}(i)
   341  	}
   342  }
   343  
   344  func TestSnapshotCacheWatchCancel(t *testing.T) {
   345  	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
   346  	for _, typ := range testTypes {
   347  		_, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
   348  		cancel()
   349  	}
   350  	// should be status info for the node
   351  	if keys := c.GetStatusKeys(); len(keys) == 0 {
   352  		t.Error("got 0, want status info for the node")
   353  	}
   354  
   355  	for _, typ := range testTypes {
   356  		if count := c.GetStatusInfo(key).GetNumWatches(); count > 0 {
   357  			t.Errorf("watches should be released for %s", typ)
   358  		}
   359  	}
   360  
   361  	if empty := c.GetStatusInfo("missing"); empty != nil {
   362  		t.Errorf("should not return a status for unknown key: got %#v", empty)
   363  	}
   364  }
   365  
   366  func TestSnapshotClear(t *testing.T) {
   367  	c := cache.NewSnapshotCache(true, group{}, logger{t: t})
   368  	if err := c.SetSnapshot(key, snapshot); err != nil {
   369  		t.Fatal(err)
   370  	}
   371  	c.ClearSnapshot(key)
   372  	if empty := c.GetStatusInfo(key); empty != nil {
   373  		t.Errorf("cache should be cleared")
   374  	}
   375  	if keys := c.GetStatusKeys(); len(keys) != 0 {
   376  		t.Errorf("keys should be empty")
   377  	}
   378  }
   379  

View as plain text