...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/kates/accumulator_test.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/kates

     1  package kates
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/assert"
    11  	"github.com/stretchr/testify/require"
    12  )
    13  
    14  type Snap struct {
    15  	ConfigMaps []*ConfigMap
    16  }
    17  
    18  // Make sure that we don't prematurely signal the first changed event. The first notification should
    19  // wait until we have done a complete List()ing of all existing resources.
    20  func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
    21  	// Create a set of 10 configmaps to give us some resources to watch.
    22  	ctx, cli := testClient(t, nil)
    23  	var cms [10]*ConfigMap
    24  	for i := 0; i < 10; i++ {
    25  		cm := &ConfigMap{
    26  			TypeMeta: TypeMeta{
    27  				Kind: "ConfigMap",
    28  			},
    29  			ObjectMeta: ObjectMeta{
    30  				Name: fmt.Sprintf("test-bootstrap-%d", i),
    31  				Labels: map[string]string{
    32  					"test": "test-bootstrap",
    33  				},
    34  			},
    35  		}
    36  		err := cli.Upsert(ctx, cm, cm, &cm)
    37  		require.NoError(t, err)
    38  		cms[i] = cm
    39  	}
    40  
    41  	// Use a separate client for watching so we can bypass any caching.
    42  	_, cli2 := testClient(t, nil)
    43  	// Configure this to slow down dispatch add events. This will dramatically increase the chance
    44  	// of the edge case we are trying to test.
    45  	cli2.watchAdded = func(old *Unstructured, new *Unstructured) {
    46  		time.Sleep(1 * time.Second)
    47  	}
    48  	acc, err := cli2.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-bootstrap"})
    49  	require.NoError(t, err)
    50  
    51  	snap := &Snap{}
    52  	for {
    53  		<-acc.Changed()
    54  		updated, err := acc.Update(ctx, snap)
    55  		require.NoError(t, err)
    56  		if updated {
    57  			break
    58  		}
    59  	}
    60  
    61  	// When we are here the first notification will have happened, and since there were 10
    62  	// ConfigMaps prior to starting te Watch, all 10 of those ConfigMaps should be present in the
    63  	// first update.
    64  	assert.Equal(t, 10, len(snap.ConfigMaps))
    65  
    66  	t.Cleanup(func() {
    67  		for _, cm := range cms {
    68  			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
    69  				t.Error(err)
    70  			}
    71  		}
    72  	})
    73  }
    74  
    75  // Make sure we still notify on bootstrap if there are no resources that satisfy a Watch.
    76  func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
    77  	ctx, cli := testClient(t, nil)
    78  
    79  	// Create a watch with a nonexistent label filter to gaurantee no resources will satisfy the watch.
    80  	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "nonexistent-label"})
    81  	require.NoError(t, err)
    82  
    83  	snap := &Snap{}
    84  	for {
    85  		<-acc.Changed()
    86  		updated, err := acc.Update(ctx, snap)
    87  		require.NoError(t, err)
    88  		if updated {
    89  			break
    90  		}
    91  	}
    92  
    93  	// When we are here the first notification will have happened, and since there were no resources
    94  	// that satisfy the selector, the ConfigMaps field should be empty.
    95  	assert.Equal(t, 0, len(snap.ConfigMaps))
    96  }
    97  
    98  // Make sure we coalesce raw changes before sending an update when a batch of resources
    99  // are created/modified in quick succession.
   100  func TestBatchChangesBeforeNotify(t *testing.T) {
   101  	ctx, cli := testClient(t, nil)
   102  	// Set a long enough interval to make sure all changes are batched before sending.
   103  	err := cli.MaxAccumulatorInterval(10 * time.Second)
   104  	require.NoError(t, err)
   105  	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
   106  	require.NoError(t, err)
   107  
   108  	snap := &Snap{}
   109  
   110  	// Listen for changes from the Accumulator. Here it will listen for only 2 updates
   111  	// The first update should be the one sent during bootstrap. No resources should have changed
   112  	// in this update. The second update should contain resource changes.
   113  	<-acc.Changed()
   114  	updated, err := acc.Update(ctx, snap)
   115  	require.NoError(t, err)
   116  	if !updated {
   117  		t.Error("Expected snapshot to be successfully updated after receiving first change event")
   118  	}
   119  	assert.Equal(t, 0, len(snap.ConfigMaps))
   120  
   121  	// Use a separate client to create resources to avoid any potential uses of the cache
   122  	_, cli2 := testClient(t, nil)
   123  
   124  	// Create a set of 10 Configmaps after the Accumulator is watching to simulate getting
   125  	// a bunch of resources at once mid-watch.
   126  	var cms [10]*ConfigMap
   127  	for i := 0; i < 10; i++ {
   128  		cm := &ConfigMap{
   129  			TypeMeta: TypeMeta{
   130  				Kind: "ConfigMap",
   131  			},
   132  			ObjectMeta: ObjectMeta{
   133  				Name: fmt.Sprintf("test-batch-%d", i),
   134  				Labels: map[string]string{
   135  					"test": "test-batch",
   136  				},
   137  			},
   138  		}
   139  		err := cli2.Upsert(ctx, cm, cm, &cm)
   140  		require.NoError(t, err)
   141  		cms[i] = cm
   142  	}
   143  
   144  	<-acc.Changed()
   145  	updated, err = acc.Update(ctx, snap)
   146  	require.NoError(t, err)
   147  	if !updated {
   148  		t.Error("Expected snapshot to be successfully updated after receiving second change event")
   149  	}
   150  
   151  	// After receiving 2 updates from the Accumulator, we should have 10 ConfigMaps
   152  	// in our Snapshot due to the Accumulator coalescing changes before sending an update.
   153  	assert.Equal(t, 10, len(snap.ConfigMaps))
   154  
   155  	t.Cleanup(func() {
   156  		for _, cm := range cms {
   157  			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
   158  				t.Error(err)
   159  			}
   160  		}
   161  	})
   162  }
   163  
   164  // Make sure we send an update after the window period expires when we keep
   165  // sending changes less than the batch interval. This is to test against an edge case where a
   166  // a change event is never triggered due to constant changes.
   167  func TestNotifyNotInfinitelyBlocked(t *testing.T) {
   168  	ctx, cli := testClient(t, nil)
   169  	err := cli.MaxAccumulatorInterval(5 * time.Second)
   170  	require.NoError(t, err)
   171  	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch-max"})
   172  	require.NoError(t, err)
   173  
   174  	snap := &Snap{}
   175  
   176  	<-acc.Changed()
   177  	updated, err := acc.Update(ctx, snap)
   178  	require.NoError(t, err)
   179  	if !updated {
   180  		t.Error("Expected snapshot to be successfully updated after receiving first change event")
   181  	}
   182  	assert.Equal(t, 0, len(snap.ConfigMaps))
   183  
   184  	var cms []*ConfigMap
   185  	ctx2, cli2 := testClient(t, nil)
   186  	ctx2, cancel := context.WithCancel(ctx2)
   187  	var wg sync.WaitGroup
   188  	// Create a new Configmap every 2 seconds < 5 second interval to simulate a constant changes
   189  	go func() {
   190  		wg.Add(1)
   191  		defer wg.Done()
   192  		var i int
   193  		ticker := time.NewTicker(2 * time.Second)
   194  		for {
   195  			select {
   196  			case <-ticker.C:
   197  				cm := &ConfigMap{
   198  					TypeMeta: TypeMeta{
   199  						Kind: "ConfigMap",
   200  					},
   201  					ObjectMeta: ObjectMeta{
   202  						Name: fmt.Sprintf("test-batch-%d", i),
   203  						Labels: map[string]string{
   204  							"test": "test-batch-max",
   205  						},
   206  					},
   207  				}
   208  				err := cli2.Upsert(ctx, cm, cm, &cm)
   209  				require.NoError(t, err)
   210  				cms = append(cms, cm)
   211  				i++
   212  			case <-ctx2.Done():
   213  				return
   214  			}
   215  		}
   216  	}()
   217  
   218  	// Watch for second change. Actually validating this is tricky. Idiosyncratic timing differences
   219  	// can cause the number of Configmaps in the change event to change across test runs resulting in a
   220  	// flakey test. We're just concerned that we got _a_ change when constant updates are being made
   221  	// less than the batch window interval so that we're not infinitely blocked. So we're just going to
   222  	// check that the snapshot is non-empty after we get the change. If we don't
   223  	// get a change after some time then we fail the test.
   224  	select {
   225  	case <-acc.Changed():
   226  		updated, err = acc.Update(ctx, snap)
   227  		require.NoError(t, err)
   228  		if !updated {
   229  			t.Error("Expected snapshot to be successfully updated after receiving second change event")
   230  		}
   231  		assert.Greater(t, len(snap.ConfigMaps), 0)
   232  		cancel()
   233  		wg.Wait()
   234  	case <-time.After(10 * time.Second):
   235  		cancel()
   236  		wg.Wait()
   237  		t.Error("Timeout after 10s listening for second change. It's possible it's infinitely blocked")
   238  	}
   239  
   240  	t.Cleanup(func() {
   241  		for _, cm := range cms {
   242  			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
   243  				t.Error(err)
   244  			}
   245  		}
   246  	})
   247  }
   248  
   249  // Make sure we get single updates when changes are submitted after the batch interval has expired.
   250  func TestNotifyOnUpdate(t *testing.T) {
   251  	ctx, cli := testClient(t, nil)
   252  	err := cli.MaxAccumulatorInterval(2 * time.Second)
   253  	require.NoError(t, err)
   254  	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-isolated"})
   255  	require.NoError(t, err)
   256  
   257  	snap := &Snap{}
   258  
   259  	waitForChange := func() {
   260  		<-acc.Changed()
   261  		updated, err := acc.Update(ctx, snap)
   262  		require.NoError(t, err)
   263  		if !updated {
   264  			t.Error("Expected snapshot to be successfully updated after receiving change event")
   265  		}
   266  	}
   267  
   268  	waitForChange()
   269  	assert.Equal(t, 0, len(snap.ConfigMaps))
   270  
   271  	var cms [2]*ConfigMap
   272  
   273  	cm := &ConfigMap{
   274  		TypeMeta: TypeMeta{
   275  			Kind: "ConfigMap",
   276  		},
   277  		ObjectMeta: ObjectMeta{
   278  			Name: "test-isolated-1",
   279  			Labels: map[string]string{
   280  				"test": "test-isolated",
   281  			},
   282  		},
   283  	}
   284  	err = cli.Upsert(ctx, cm, cm, &cm)
   285  	require.NoError(t, err)
   286  	cms[0] = cm
   287  
   288  	waitForChange()
   289  	assert.Equal(t, 1, len(snap.ConfigMaps))
   290  
   291  	// Send the next change after the 2 second batch interval
   292  	time.Sleep(3)
   293  
   294  	cm = &ConfigMap{
   295  		TypeMeta: TypeMeta{
   296  			Kind: "ConfigMap",
   297  		},
   298  		ObjectMeta: ObjectMeta{
   299  			Name: "test-isolated-2",
   300  			Labels: map[string]string{
   301  				"test": "test-isolated",
   302  			},
   303  		},
   304  	}
   305  	err = cli.Upsert(ctx, cm, cm, &cm)
   306  	require.NoError(t, err)
   307  	cms[1] = cm
   308  
   309  	waitForChange()
   310  	assert.Equal(t, 2, len(snap.ConfigMaps))
   311  
   312  	t.Cleanup(func() {
   313  		for _, cm := range cms {
   314  			if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
   315  				t.Error(err)
   316  			}
   317  		}
   318  	})
   319  }
   320  

View as plain text