...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3/linear_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3

     1  // Code generated by create_version. DO NOT EDIT.
     2  // Copyright 2020 Envoyproxy Authors
     3  //
     4  //   Licensed under the Apache License, Version 2.0 (the "License");
     5  //   you may not use this file except in compliance with the License.
     6  //   You may obtain a copy of the License at
     7  //
     8  //       http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  //   Unless required by applicable law or agreed to in writing, software
    11  //   distributed under the License is distributed on an "AS IS" BASIS,
    12  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  //   See the License for the specific language governing permissions and
    14  //   limitations under the License.
    15  
    16  package cache
    17  
    18  import (
    19  	"fmt"
    20  	"testing"
    21  
    22  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    23  	wrappers "github.com/golang/protobuf/ptypes/wrappers"
    24  )
    25  
    26  const (
    27  	testType = "google.protobuf.StringValue"
    28  )
    29  
    30  func testResource(s string) types.Resource {
    31  	return &wrappers.StringValue{Value: s}
    32  }
    33  
    34  func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
    35  	t.Helper()
    36  	r := <-ch
    37  	if r.GetRequest().TypeUrl != testType {
    38  		t.Errorf("unexpected empty request type URL: %q", r.GetRequest().TypeUrl)
    39  	}
    40  	out, err := r.GetDiscoveryResponse()
    41  	if err != nil {
    42  		t.Fatal(err)
    43  	}
    44  	if out.VersionInfo == "" {
    45  		t.Error("unexpected response empty version")
    46  	}
    47  	if n := len(out.Resources); n != num {
    48  		t.Errorf("unexpected number of responses: got %d, want %d", n, num)
    49  	}
    50  	if version != "" && out.VersionInfo != version {
    51  		t.Errorf("unexpected version: got %q, want %q", out.VersionInfo, version)
    52  	}
    53  	if out.TypeUrl != testType {
    54  		t.Errorf("unexpected type URL: %q", out.TypeUrl)
    55  	}
    56  }
    57  
    58  func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) {
    59  	t.Helper()
    60  	if i := c.NumWatches(name); i != count {
    61  		t.Errorf("unexpected number of watches for %q: got %d, want %d", name, i, count)
    62  	}
    63  }
    64  
    65  func mustBlock(t *testing.T, w <-chan Response) {
    66  	select {
    67  	case <-w:
    68  		t.Error("watch must block")
    69  	default:
    70  	}
    71  }
    72  
    73  func TestLinearInitialResources(t *testing.T) {
    74  	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
    75  	w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType})
    76  	verifyResponse(t, w, "0", 1)
    77  	w, _ = c.CreateWatch(&Request{TypeUrl: testType})
    78  	verifyResponse(t, w, "0", 2)
    79  }
    80  
    81  func TestLinearCornerCases(t *testing.T) {
    82  	c := NewLinearCache(testType)
    83  	err := c.UpdateResource("a", nil)
    84  	if err == nil {
    85  		t.Error("expected error on nil resource")
    86  	}
    87  	// create an incorrect type URL request
    88  	w, _ := c.CreateWatch(&Request{TypeUrl: "test"})
    89  	select {
    90  	case _, more := <-w:
    91  		if more {
    92  			t.Error("should be closed by the producer")
    93  		}
    94  	default:
    95  		t.Error("channel should be closed")
    96  	}
    97  }
    98  
    99  func TestLinearBasic(t *testing.T) {
   100  	c := NewLinearCache(testType)
   101  
   102  	// Create watches before a resource is ready
   103  	w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   104  	mustBlock(t, w1)
   105  	w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
   106  	mustBlock(t, w)
   107  	checkWatchCount(t, c, "a", 2)
   108  	checkWatchCount(t, c, "b", 1)
   109  	c.UpdateResource("a", testResource("a"))
   110  	checkWatchCount(t, c, "a", 0)
   111  	checkWatchCount(t, c, "b", 0)
   112  	verifyResponse(t, w1, "1", 1)
   113  	verifyResponse(t, w, "1", 1)
   114  
   115  	// Request again, should get same response
   116  	w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   117  	checkWatchCount(t, c, "a", 0)
   118  	verifyResponse(t, w, "1", 1)
   119  	w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
   120  	checkWatchCount(t, c, "a", 0)
   121  	verifyResponse(t, w, "1", 1)
   122  
   123  	// Add another element and update the first, response should be different
   124  	c.UpdateResource("b", testResource("b"))
   125  	c.UpdateResource("a", testResource("aa"))
   126  	w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   127  	verifyResponse(t, w, "3", 1)
   128  	w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
   129  	verifyResponse(t, w, "3", 2)
   130  }
   131  
   132  func TestLinearVersionPrefix(t *testing.T) {
   133  	c := NewLinearCache(testType, WithVersionPrefix("instance1-"))
   134  
   135  	w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   136  	verifyResponse(t, w, "instance1-0", 0)
   137  
   138  	c.UpdateResource("a", testResource("a"))
   139  	w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   140  	verifyResponse(t, w, "instance1-1", 1)
   141  
   142  	w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"})
   143  	mustBlock(t, w)
   144  	checkWatchCount(t, c, "a", 1)
   145  }
   146  
   147  func TestLinearDeletion(t *testing.T) {
   148  	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
   149  	w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
   150  	mustBlock(t, w)
   151  	checkWatchCount(t, c, "a", 1)
   152  	c.DeleteResource("a")
   153  	verifyResponse(t, w, "1", 0)
   154  	checkWatchCount(t, c, "a", 0)
   155  	w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
   156  	verifyResponse(t, w, "1", 1)
   157  	checkWatchCount(t, c, "b", 0)
   158  	c.DeleteResource("b")
   159  	w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
   160  	verifyResponse(t, w, "2", 0)
   161  	checkWatchCount(t, c, "b", 0)
   162  }
   163  
   164  func TestLinearWatchTwo(t *testing.T) {
   165  	c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
   166  	w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
   167  	mustBlock(t, w)
   168  	w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
   169  	mustBlock(t, w1)
   170  	c.UpdateResource("a", testResource("aa"))
   171  	// should only get the modified resource
   172  	verifyResponse(t, w, "1", 1)
   173  	verifyResponse(t, w1, "1", 2)
   174  }
   175  
   176  func TestLinearCancel(t *testing.T) {
   177  	c := NewLinearCache(testType)
   178  	c.UpdateResource("a", testResource("a"))
   179  
   180  	// cancel watch-all
   181  	w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
   182  	mustBlock(t, w)
   183  	checkWatchCount(t, c, "a", 1)
   184  	cancel()
   185  	checkWatchCount(t, c, "a", 0)
   186  
   187  	// cancel watch for "a"
   188  	w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
   189  	mustBlock(t, w)
   190  	checkWatchCount(t, c, "a", 1)
   191  	cancel()
   192  	checkWatchCount(t, c, "a", 0)
   193  
   194  	// open four watches for "a" and "b" and two for all, cancel one of each, make sure the second one is unaffected
   195  	w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
   196  	w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
   197  	w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
   198  	w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
   199  	mustBlock(t, w)
   200  	mustBlock(t, w2)
   201  	mustBlock(t, w3)
   202  	mustBlock(t, w4)
   203  	checkWatchCount(t, c, "a", 3)
   204  	checkWatchCount(t, c, "b", 3)
   205  	cancel()
   206  	checkWatchCount(t, c, "a", 2)
   207  	checkWatchCount(t, c, "b", 3)
   208  	cancel3()
   209  	checkWatchCount(t, c, "a", 1)
   210  	checkWatchCount(t, c, "b", 2)
   211  	cancel2()
   212  	cancel4()
   213  	checkWatchCount(t, c, "a", 0)
   214  	checkWatchCount(t, c, "b", 0)
   215  }
   216  
   217  func TestLinearConcurrentSetWatch(t *testing.T) {
   218  	c := NewLinearCache(testType)
   219  	n := 50
   220  	for i := 0; i < 2*n; i++ {
   221  		func(i int) {
   222  			t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) {
   223  				t.Parallel()
   224  				id := fmt.Sprintf("%d", i)
   225  				if i%2 == 0 {
   226  					t.Logf("update resource %q", id)
   227  					c.UpdateResource(id, testResource(id))
   228  				} else {
   229  					id2 := fmt.Sprintf("%d", i-1)
   230  					t.Logf("request resources %q and %q", id, id2)
   231  					value, _ := c.CreateWatch(&Request{
   232  						// Only expect one to become stale
   233  						ResourceNames: []string{id, id2},
   234  						VersionInfo:   "0",
   235  						TypeUrl:       testType,
   236  					})
   237  					// wait until all updates apply
   238  					verifyResponse(t, value, "", 1)
   239  				}
   240  			})
   241  		}(i)
   242  	}
   243  }
   244  

View as plain text