...

Source file src/github.com/datawire/ambassador/v2/pkg/kates/client_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/kates

     1  package kates
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"strconv"
     8  	"strings"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/stretchr/testify/assert"
    14  	"github.com/stretchr/testify/require"
    15  	"k8s.io/apimachinery/pkg/types"
    16  
    17  	"github.com/datawire/dlib/dlog"
    18  	dtest_k3s "github.com/datawire/dtest"
    19  )
    20  
    21  func testClient(t *testing.T, ctx context.Context) (context.Context, *Client) {
    22  	if ctx == nil {
    23  		ctx = dlog.NewTestContext(t, false)
    24  	}
    25  	cli, err := NewClient(ClientConfig{Kubeconfig: dtest_k3s.KubeVersionConfig(ctx, dtest_k3s.Kube22)})
    26  	require.NoError(t, err)
    27  	return ctx, cli
    28  }
    29  
    30  func TestCRUD(t *testing.T) {
    31  	ctx, cli := testClient(t, nil)
    32  
    33  	cm := &ConfigMap{
    34  		TypeMeta: TypeMeta{
    35  			Kind: "ConfigMap",
    36  		},
    37  		ObjectMeta: ObjectMeta{
    38  			Name: "test-crud-configmap",
    39  		},
    40  	}
    41  
    42  	assert.Equal(t, cm.GetResourceVersion(), "")
    43  
    44  	err := cli.Get(ctx, cm, nil)
    45  	assert.Error(t, err, "expecting not found error")
    46  	if !IsNotFound(err) {
    47  		t.Error(err)
    48  		return
    49  	}
    50  
    51  	created := &ConfigMap{}
    52  	err = cli.Create(ctx, cm, created)
    53  	assert.NoError(t, err)
    54  	assert.NotEqual(t, created.GetResourceVersion(), "")
    55  
    56  	created.Labels = map[string]string{"foo": "bar"}
    57  	updated := &ConfigMap{}
    58  	err = cli.Update(ctx, created, updated)
    59  	assert.NoError(t, err)
    60  
    61  	gotten := &ConfigMap{}
    62  	err = cli.Get(ctx, cm, gotten)
    63  	assert.NoError(t, err)
    64  	assert.Equal(t, gotten.GetName(), cm.GetName())
    65  	assert.Equal(t, gotten.Labels["foo"], "bar")
    66  
    67  	err = cli.Delete(ctx, cm, nil)
    68  	assert.NoError(t, err)
    69  
    70  	err = cli.Get(ctx, cm, nil)
    71  	assert.Error(t, err, "expecting not found error")
    72  	assert.True(t, IsNotFound(err), "expecting not found error")
    73  }
    74  
    75  func TestUpsert(t *testing.T) {
    76  	ctx, cli := testClient(t, nil)
    77  
    78  	cm := &ConfigMap{
    79  		TypeMeta: TypeMeta{
    80  			Kind: "ConfigMap",
    81  		},
    82  		ObjectMeta: ObjectMeta{
    83  			Name: "test-upsert-configmap",
    84  			Labels: map[string]string{
    85  				"foo": "bar",
    86  			},
    87  		},
    88  	}
    89  
    90  	defer func() {
    91  		assert.NoError(t, cli.Delete(ctx, cm, nil))
    92  	}()
    93  
    94  	err := cli.Upsert(ctx, cm, cm, cm)
    95  	assert.NoError(t, err)
    96  	assert.NotEqual(t, "", cm.GetResourceVersion())
    97  
    98  	src := &ConfigMap{
    99  		TypeMeta: TypeMeta{
   100  			Kind: "ConfigMap",
   101  		},
   102  		ObjectMeta: ObjectMeta{
   103  			Name: "test-upsert-configmap",
   104  			Labels: map[string]string{
   105  				"foo": "baz",
   106  			},
   107  		},
   108  	}
   109  
   110  	err = cli.Upsert(ctx, cm, src, cm)
   111  	assert.NoError(t, err)
   112  	assert.Equal(t, "baz", cm.Labels["foo"])
   113  }
   114  
   115  func TestPatch(t *testing.T) {
   116  	ctx, cli := testClient(t, nil)
   117  
   118  	cm := &ConfigMap{
   119  		TypeMeta: TypeMeta{
   120  			Kind: "ConfigMap",
   121  		},
   122  		ObjectMeta: ObjectMeta{
   123  			Name: "test-patch-configmap",
   124  			Labels: map[string]string{
   125  				"foo": "bar",
   126  			},
   127  		},
   128  	}
   129  
   130  	err := cli.Create(ctx, cm, cm)
   131  	assert.NoError(t, err)
   132  
   133  	defer func() {
   134  		assert.NoError(t, cli.Delete(ctx, cm, nil))
   135  	}()
   136  
   137  	err = cli.Patch(ctx, cm, StrategicMergePatchType, []byte(`{"metadata": {"annotations": {"moo": "arf"}}}`), cm)
   138  	assert.NoError(t, err)
   139  	assert.Equal(t, "arf", cm.GetAnnotations()["moo"])
   140  }
   141  
   142  func TestList(t *testing.T) {
   143  	ctx, cli := testClient(t, nil)
   144  
   145  	namespaces := make([]*Namespace, 0)
   146  
   147  	err := cli.List(ctx, Query{Kind: "namespaces"}, &namespaces)
   148  	assert.NoError(t, err)
   149  
   150  	// we know there should be at least the default namespace and
   151  	// the kube-system namespace
   152  	assert.True(t, len(namespaces) > 0)
   153  
   154  	found := false
   155  	for _, ns := range namespaces {
   156  		if ns.GetName() == "default" {
   157  			found = true
   158  			break
   159  		}
   160  	}
   161  
   162  	assert.True(t, found)
   163  }
   164  
   165  func TestListSelector(t *testing.T) {
   166  	ctx, cli := testClient(t, nil)
   167  
   168  	myns := &Namespace{
   169  		TypeMeta: TypeMeta{
   170  			Kind: "namespace",
   171  		},
   172  		ObjectMeta: ObjectMeta{
   173  			Name: "test-list-selector-namespace",
   174  			Labels: map[string]string{
   175  				"foo": "bar",
   176  			},
   177  		},
   178  	}
   179  
   180  	err := cli.Create(ctx, myns, myns)
   181  	assert.NoError(t, err)
   182  
   183  	namespaces := make([]*Namespace, 0)
   184  
   185  	err = cli.List(ctx, Query{Kind: "namespaces", LabelSelector: "foo=bar"}, &namespaces)
   186  	assert.NoError(t, err)
   187  
   188  	assert.Equal(t, len(namespaces), 1)
   189  
   190  	if len(namespaces) == 1 {
   191  		assert.Equal(t, namespaces[0].GetName(), myns.GetName())
   192  	}
   193  
   194  	err = cli.Delete(ctx, myns, myns)
   195  	assert.NoError(t, err)
   196  }
   197  
   198  func TestShortcut(t *testing.T) {
   199  	ctx, cli := testClient(t, nil)
   200  
   201  	cm := &ConfigMap{
   202  		TypeMeta: TypeMeta{
   203  			Kind: "cm",
   204  		},
   205  		ObjectMeta: ObjectMeta{
   206  			Name: "test-shortcut-configmap",
   207  		},
   208  	}
   209  
   210  	created := &ConfigMap{}
   211  	err := cli.Create(ctx, cm, created)
   212  	assert.NoError(t, err)
   213  
   214  	err = cli.Delete(ctx, created, nil)
   215  	assert.NoError(t, err)
   216  }
   217  
// TestSnapshot is the snapshot structure the watch tests in this file pass to the accumulator's
// UpdateWithDeltas method. Its field names match the Name values of the queries those tests watch
// ("ConfigMaps" and "Secrets"); presumably that is how the accumulator picks the field to fill for
// each query -- confirm against the accumulator implementation.
type TestSnapshot struct {
	ConfigMaps []*ConfigMap
	Secrets    []*Secret
}
   222  
   223  // Currently this whole test is probabilistic and somewhat end-to-endy (it requires a kubernetes
   224  // cluster, but makes very few assumptions about it). With a bit of a refactor to the kates
   225  // implementation to allow for more mocks, this could be made into a pure unit test and not be
   226  // probabilistic at all.
   227  func TestCoherence(t *testing.T) {
   228  	ctx, cli := testClient(t, nil)
   229  	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
   230  	defer cancel()
   231  
   232  	// This simulates an api server that is very slow at notifying its watch clients of updates to
   233  	// config maps, but notifies of other resources at normal speeds. This can really happen.
   234  	cli.watchUpdated = func(_, obj *Unstructured) {
   235  		if obj.GetKind() == "ConfigMap" {
   236  			time.Sleep(5 * time.Second)
   237  		}
   238  	}
   239  
   240  	// Our snapshot will include both config maps and secrets. We will watch them from one thread
   241  	// while simultaneously updating them both as fast as we can from another thread. While doing
   242  	// this we will make assertions that the watching thread always sees the state as last updated
   243  	// by the updating thread.
   244  	cm := &ConfigMap{
   245  		TypeMeta: TypeMeta{
   246  			Kind: "ConfigMap",
   247  		},
   248  		ObjectMeta: ObjectMeta{
   249  			Name:   "test-coherence",
   250  			Labels: map[string]string{},
   251  		},
   252  	}
   253  
   254  	// By updating a secret as well as a configmap, we force the accumulator to frequently report
   255  	// that changes have occurred (since watches for secrets are not artificially slowed down),
   256  	// thereby give the watch thread the opportunity see stale configmaps.
   257  	secret := &Secret{
   258  		TypeMeta: TypeMeta{
   259  			Kind: "Secret",
   260  		},
   261  		ObjectMeta: ObjectMeta{
   262  			Name:   "test-coherence",
   263  			Labels: map[string]string{},
   264  		},
   265  	}
   266  
   267  	defer func() {
   268  		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
   269  		defer cancel()
   270  		err := cli.Delete(ctx, cm, nil)
   271  		if err != nil {
   272  			t.Log(err)
   273  		}
   274  		err = cli.Delete(ctx, secret, nil)
   275  		if err != nil {
   276  			t.Log(err)
   277  		}
   278  	}()
   279  
   280  	err := cli.Get(ctx, cm, nil)
   281  	assert.Error(t, err, "expecting not found error")
   282  	if !IsNotFound(err) {
   283  		t.Error(err)
   284  		return
   285  	}
   286  
   287  	err = cli.Get(ctx, secret, nil)
   288  	assert.Error(t, err, "expecting not found error")
   289  	if !IsNotFound(err) {
   290  		t.Error(err)
   291  		return
   292  	}
   293  
   294  	acc, err := cli.Watch(ctx,
   295  		Query{Name: "ConfigMaps", Kind: "ConfigMap"},
   296  		Query{Name: "Secrets", Kind: "Secret"})
   297  	require.NoError(t, err)
   298  	snap := &TestSnapshot{}
   299  
   300  	COUNT := 25
   301  
   302  	// The mutex protects access to the shared counters lastSentByUpsert and lastSeenByWatch, as
   303  	// well as allowing us to synchronize cli.Upsert() and acc.Update() invocations. The kates API
   304  	// does not require those invocations to be synchronized, however the design of this test does
   305  	// require that.
   306  	mutex := &sync.Mutex{}
   307  	lastSentByUpsert := 0
   308  	lastSeenByWatch := 0
   309  
   310  	done := make(chan struct{})
   311  	go func() {
   312  		defer cancel()
   313  		defer close(done)
   314  
   315  		for {
   316  			var deltas []*Delta
   317  			select {
   318  			case <-acc.Changed():
   319  				mutex.Lock()
   320  				updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
   321  				assert.NoError(t, err)
   322  				if !updated {
   323  					mutex.Unlock()
   324  					continue
   325  				}
   326  			case <-ctx.Done():
   327  				return
   328  			}
   329  
   330  			for _, delta := range deltas {
   331  				bytes, err := json.Marshal(delta)
   332  				assert.NoError(t, err)
   333  				t.Log(string(bytes))
   334  			}
   335  
   336  			func() {
   337  				defer mutex.Unlock()
   338  
   339  				var cmFromWatch *ConfigMap
   340  				for _, c := range snap.ConfigMaps {
   341  					if c.GetName() == "test-coherence" {
   342  						cmFromWatch = c
   343  						break
   344  					}
   345  				}
   346  
   347  				if lastSentByUpsert > 0 {
   348  					assert.NotNil(t, cmFromWatch)
   349  					if cmFromWatch != nil {
   350  						lbl := cmFromWatch.GetLabels()["counter"]
   351  						parts := strings.Split(lbl, "-")
   352  						require.Equal(t, 2, len(parts))
   353  						i, err := strconv.Atoi(parts[1])
   354  						require.NoError(t, err)
   355  						lastSeenByWatch = i
   356  						// This assertion is the core of this test. Despite the design of the test
   357  						// artificially delaying the updates for all configmaps while
   358  						// simultaneiously updating secrets to provide a very high probability the
   359  						// configmaps returned by the watch are stale, we will still always have an
   360  						// up-to-date view of the configmap that we have modified.
   361  						assert.Equal(t, lastSentByUpsert, lastSeenByWatch)
   362  					}
   363  				}
   364  
   365  				if lastSeenByWatch == COUNT {
   366  					cancel()
   367  				}
   368  			}()
   369  		}
   370  	}()
   371  
   372  	// Increment the counter label of the secret and configmap as quickly as we can.
   373  	for counter := 0; counter <= COUNT; counter += 1 {
   374  		mutex.Lock()
   375  		func() {
   376  			defer mutex.Unlock()
   377  			lbl := fmt.Sprintf("upsert-%d", counter)
   378  			t.Log(lbl)
   379  
   380  			labels := cm.GetLabels()
   381  			labels["counter"] = lbl
   382  			cm.SetLabels(labels)
   383  
   384  			err := cli.Upsert(ctx, cm, cm, nil)
   385  			require.NoError(t, err)
   386  
   387  			labels = secret.GetLabels()
   388  			labels["counter"] = lbl
   389  			secret.SetLabels(labels)
   390  			err = cli.Upsert(ctx, secret, secret, nil)
   391  			require.NoError(t, err)
   392  
   393  			lastSentByUpsert = counter
   394  		}()
   395  	}
   396  
   397  	<-done
   398  }
   399  
   400  func TestDeltas(t *testing.T) {
   401  	doDeltaTest(t, 0, func(_, _ *Unstructured) {})
   402  }
   403  
   404  func TestDeltasWithLocalDelay(t *testing.T) {
   405  	doDeltaTest(t, 3*time.Second, func(_, _ *Unstructured) {})
   406  }
   407  
   408  func TestDeltasWithRemoteDelay(t *testing.T) {
   409  	doDeltaTest(t, 0, func(old, new *Unstructured) {
   410  		// This will slow down updates to just the resources we are paying attention to in this test.
   411  		obj := new
   412  		if obj == nil {
   413  			obj = old
   414  		}
   415  
   416  		if strings.HasPrefix(obj.GetName(), "test-deltas") {
   417  			time.Sleep(3 * time.Second)
   418  		}
   419  	})
   420  }
   421  
   422  func doDeltaTest(t *testing.T, localDelay time.Duration, watchHook func(*Unstructured, *Unstructured)) {
   423  	_ctx, cli := testClient(t, nil)
   424  	var (
   425  		_cm1 = &ConfigMap{
   426  			TypeMeta: TypeMeta{
   427  				Kind: "ConfigMap",
   428  			},
   429  			ObjectMeta: ObjectMeta{
   430  				Name:   "test-deltas-1",
   431  				Labels: map[string]string{},
   432  			},
   433  		}
   434  		_cm2 = &ConfigMap{
   435  			TypeMeta: TypeMeta{
   436  				Kind: "ConfigMap",
   437  			},
   438  			ObjectMeta: ObjectMeta{
   439  				Name:   "test-deltas-2",
   440  				Labels: map[string]string{},
   441  			},
   442  		}
   443  	)
   444  	t.Cleanup(func() {
   445  		if err := cli.Delete(_ctx, _cm1, nil); err != nil && !IsNotFound(err) {
   446  			t.Error(err)
   447  		}
   448  		if err := cli.Delete(_ctx, _cm2, nil); err != nil && !IsNotFound(err) {
   449  			t.Error(err)
   450  		}
   451  	})
   452  
   453  	ctx, cancel := context.WithTimeout(_ctx, 30*time.Second)
   454  	defer cancel()
   455  
   456  	cli.watchAdded = watchHook
   457  	cli.watchUpdated = watchHook
   458  	cli.watchDeleted = watchHook
   459  
   460  	cm1, cm2 := _cm1, _cm2
   461  
   462  	defer func() {
   463  		ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
   464  		defer cancel()
   465  		if cm1 != nil {
   466  			err := cli.Delete(ctx, cm1, nil)
   467  			if err != nil {
   468  				t.Log(err)
   469  			}
   470  		}
   471  		err := cli.Delete(ctx, cm2, nil)
   472  		if err != nil {
   473  			t.Log(err)
   474  		}
   475  	}()
   476  
   477  	err := cli.Get(ctx, cm1, nil)
   478  	assert.Error(t, err, "expecting not found error")
   479  	if !IsNotFound(err) {
   480  		t.Error(err)
   481  		return
   482  	}
   483  
   484  	err = cli.Get(ctx, cm2, nil)
   485  	assert.Error(t, err, "expecting not found error")
   486  	if !IsNotFound(err) {
   487  		t.Error(err)
   488  		return
   489  	}
   490  
   491  	acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap"})
   492  	require.NoError(t, err)
   493  	snap := &TestSnapshot{}
   494  
   495  	err = cli.Upsert(ctx, cm1, cm1, nil)
   496  	require.NoError(t, err)
   497  	err = cli.Upsert(ctx, cm2, cm2, nil)
   498  	require.NoError(t, err)
   499  
   500  	time.Sleep(localDelay)
   501  
   502  	for {
   503  		<-acc.Changed()
   504  		var deltas []*Delta
   505  		updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
   506  		require.NoError(t, err)
   507  		if !updated {
   508  			continue
   509  		}
   510  
   511  		checkForDelta(t, ObjectAdd, "test-deltas-1", deltas)
   512  		checkForDelta(t, ObjectAdd, "test-deltas-2", deltas)
   513  		break
   514  	}
   515  
   516  	cm1.SetLabels(map[string]string{"foo": "bar"})
   517  	err = cli.Upsert(ctx, cm1, cm1, nil)
   518  	require.NoError(t, err)
   519  
   520  	for {
   521  		<-acc.Changed()
   522  		var deltas []*Delta
   523  		updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
   524  		require.NoError(t, err)
   525  		if !updated {
   526  			continue
   527  		}
   528  
   529  		checkForDelta(t, ObjectUpdate, "test-deltas-1", deltas)
   530  		checkNoDelta(t, "test-deltas-2", deltas)
   531  		break
   532  	}
   533  
   534  	err = cli.Delete(ctx, cm1, nil)
   535  	require.NoError(t, err)
   536  	cm1 = nil
   537  
   538  	time.Sleep(localDelay)
   539  
   540  	for {
   541  		<-acc.Changed()
   542  		var deltas []*Delta
   543  		updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
   544  		require.NoError(t, err)
   545  		if !updated {
   546  			continue
   547  		}
   548  
   549  		checkForDelta(t, ObjectDelete, "test-deltas-1", deltas)
   550  		checkNoDelta(t, "test-deltas-2", deltas)
   551  		break
   552  	}
   553  
   554  	cancel()
   555  }
   556  
   557  func checkForDelta(t *testing.T, dt DeltaType, name string, deltas []*Delta) {
   558  	for _, delta := range deltas {
   559  		if delta.DeltaType == dt && delta.GetName() == name {
   560  			return
   561  		}
   562  	}
   563  
   564  	assert.Fail(t, fmt.Sprintf("could not find delta %d %s", dt, name))
   565  }
   566  
   567  func checkNoDelta(t *testing.T, name string, deltas []*Delta) {
   568  	for _, delta := range deltas {
   569  		if delta.GetName() == name {
   570  			assert.Fail(t, fmt.Sprintf("found delta %s: %d", name, delta.DeltaType))
   571  			return
   572  		}
   573  	}
   574  }
   575  
   576  // This is a unit test for the patchWatch method of client. When you are watching resources and also
   577  // modifying the same set that you are watching (as is the case with a read/write controller), the
   578  // client has two sources of information for any given resource: (1) the version of the resource
   579  // reported by the watch, and (2) the version of the resource returned whenever a
   580  // Create/Update/Delete is performed. The patchWatch method updates the results of a watch to ensure
   581  // we always report back the newest version for any given resource.
   582  func TestPatchWatch(t *testing.T) {
   583  	require := require.New(t)
   584  	assert := assert.New(t)
   585  
   586  	ctx := context.Background()
   587  
   588  	cli, err := NewClient(ClientConfig{})
   589  	require.NoError(err)
   590  
   591  	// Make a field the same way newAccumulator does.
   592  	field, err := cli.newField(Query{Name: "Pods", Kind: "pods"})
   593  	require.NoError(err)
   594  
   595  	// Convenience function for making multiple versions of a given pod.
   596  	makePod := func(namespace, name string, version int) *Unstructured {
   597  		un := &Unstructured{}
   598  		un.SetGroupVersionKind(field.mapping.GroupVersionKind)
   599  		un.SetNamespace(namespace)
   600  		un.SetName(name)
   601  		un.SetUID(types.UID(fmt.Sprintf("UID:%s.%s", namespace, name)))
   602  		un.SetResourceVersion(fmt.Sprintf("%d", version))
   603  		return un
   604  	}
   605  
   606  	// The field.values map holds the version of a resource reported by watch.
   607  	//
   608  	// The cli.canonical map stores any resource that we Get/List/Create/Update/Delete.
   609  	//
   610  	// We can exercise all logic in patchWatch by populating these two maps in various permutations
   611  	// as is done below:
   612  
   613  	// Make a pod to take through the CRUD cycle.
   614  	p1 := makePod("default", "foo", 1)
   615  	p1Key := unKey(p1)
   616  
   617  	p1Newer := makePod("default", "foo", 2)
   618  	require.Equal(p1Key, unKey(p1Newer))
   619  
   620  	// Create: something in cli.canonical, nothing in field.values
   621  	cli.canonical[p1Key] = p1
   622  	delete(field.values, p1Key)
   623  	err = cli.patchWatch(ctx, field)
   624  	require.NoError(err)
   625  	assert.Equal(p1, field.values[p1Key])
   626  
   627  	// Local Update: something newer in cli.canonical, older version in field.values
   628  	cli.canonical[p1Key] = p1Newer
   629  	field.values[p1Key] = p1
   630  	err = cli.patchWatch(ctx, field)
   631  	require.NoError(err)
   632  	assert.Equal(p1Newer, field.values[p1Key])
   633  
   634  	// Remote Update: something older in cli.canonical, something newer in field.values
   635  	cli.canonical[p1Key] = p1
   636  	field.values[p1Key] = p1Newer
   637  	err = cli.patchWatch(ctx, field)
   638  	require.NoError(err)
   639  	assert.Equal(p1Newer, field.values[p1Key])
   640  	assert.NotContains(cli.canonical, p1Key)
   641  
   642  	// Delete: nil value in cli.canonical, something in field.values
   643  	cli.canonical[p1Key] = nil
   644  	field.values[p1Key] = p1Newer
   645  	err = cli.patchWatch(ctx, field)
   646  	require.NoError(err)
   647  	assert.NotContains(field.values, p1Key)
   648  }
   649  

View as plain text