...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/agent_internal_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"math/rand"
     8  	"net/http"
     9  	"net/http/httptest"
    10  	"net/url"
    11  	"testing"
    12  	"time"
    13  
    14  	"github.com/stretchr/testify/assert"
    15  
    16  	"google.golang.org/protobuf/types/known/durationpb"
    17  
    18  	v1 "k8s.io/api/core/v1"
    19  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    20  
    21  	"github.com/datawire/ambassador/v2/pkg/api/agent"
    22  	"github.com/datawire/ambassador/v2/pkg/kates"
    23  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    24  	"github.com/datawire/dlib/dlog"
    25  )
    26  
    27  // Take a json formatted string and transform it to kates.Unstructured
    28  // for easy formatting of Snapshot.Invalid members
    29  func getUnstructured(objStr string) *kates.Unstructured {
    30  	var obj map[string]interface{}
    31  	_ = json.Unmarshal([]byte(objStr), &obj)
    32  	unstructured := &kates.Unstructured{}
    33  	unstructured.SetUnstructuredContent(obj)
    34  	return unstructured
    35  }
    36  
    37  const letterBytes = "abcdefghijklmnopqrstuvwxyz"
    38  
    39  func getRandomAmbassadorID() string {
    40  	b := make([]byte, 10)
    41  	for i := range b {
    42  		b[i] = letterBytes[rand.Intn(len(letterBytes))]
    43  	}
    44  	return string(b)
    45  }
    46  
    47  func TestHandleAPIKeyConfigChange(t *testing.T) {
    48  	t.Parallel()
    49  	objMeta := metav1.ObjectMeta{
    50  		Name:      "coolname",
    51  		Namespace: "coolnamespace",
    52  	}
    53  	testcases := []struct {
    54  		testName       string
    55  		agent          *Agent
    56  		secrets        []kates.Secret
    57  		configMaps     []kates.ConfigMap
    58  		expectedAPIKey string
    59  	}{
    60  		{
    61  			testName: "configmap-wins",
    62  			agent: &Agent{
    63  				agentNamespace:               "coolnamespace",
    64  				agentCloudResourceConfigName: "coolname",
    65  				ambassadorAPIKey:             "",
    66  				ambassadorAPIKeyEnvVarValue:  "",
    67  			},
    68  			secrets: []kates.Secret{},
    69  			configMaps: []kates.ConfigMap{
    70  				{
    71  					ObjectMeta: objMeta,
    72  					Data: map[string]string{
    73  						"CLOUD_CONNECT_TOKEN": "beepboop",
    74  					},
    75  				},
    76  			},
    77  			expectedAPIKey: "beepboop",
    78  		},
    79  		{
    80  			testName: "secret-over-configmap",
    81  			agent: &Agent{
    82  				agentNamespace:               "coolnamespace",
    83  				agentCloudResourceConfigName: "coolname",
    84  				ambassadorAPIKey:             "",
    85  				ambassadorAPIKeyEnvVarValue:  "",
    86  			},
    87  			secrets: []kates.Secret{
    88  				{
    89  					ObjectMeta: objMeta,
    90  					Data: map[string][]byte{
    91  						"CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
    92  					},
    93  				},
    94  			},
    95  			configMaps: []kates.ConfigMap{
    96  				{
    97  					ObjectMeta: objMeta,
    98  					Data: map[string]string{
    99  						"CLOUD_CONNECT_TOKEN": "beepboop",
   100  					},
   101  				},
   102  			},
   103  			expectedAPIKey: "secretvalue",
   104  		},
   105  		{
   106  			testName: "from-secret",
   107  			agent: &Agent{
   108  				agentNamespace:               "coolnamespace",
   109  				agentCloudResourceConfigName: "coolname",
   110  				ambassadorAPIKey:             "",
   111  				ambassadorAPIKeyEnvVarValue:  "",
   112  			},
   113  			secrets: []kates.Secret{
   114  				{
   115  					ObjectMeta: objMeta,
   116  					Data: map[string][]byte{
   117  						"CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
   118  					},
   119  				},
   120  			},
   121  			configMaps:     []kates.ConfigMap{},
   122  			expectedAPIKey: "secretvalue",
   123  		},
   124  		{
   125  			testName: "configmap-empty-string-value",
   126  			agent: &Agent{
   127  				agentNamespace:               "coolnamespace",
   128  				agentCloudResourceConfigName: "coolname",
   129  				ambassadorAPIKey:             "someexistingvalue",
   130  				ambassadorAPIKeyEnvVarValue:  "",
   131  			},
   132  			secrets: []kates.Secret{},
   133  			configMaps: []kates.ConfigMap{
   134  				{
   135  					ObjectMeta: objMeta,
   136  					Data:       map[string]string{},
   137  				},
   138  			},
   139  			expectedAPIKey: "",
   140  		},
   141  		{
   142  			testName: "secret-empty-string-value",
   143  			agent: &Agent{
   144  				agentNamespace:               "coolnamespace",
   145  				agentCloudResourceConfigName: "coolname",
   146  				ambassadorAPIKey:             "someexistingvalue",
   147  				ambassadorAPIKeyEnvVarValue:  "",
   148  			},
   149  			secrets: []kates.Secret{
   150  				{
   151  					ObjectMeta: objMeta,
   152  					Data:       map[string][]byte{},
   153  				},
   154  			},
   155  			configMaps:     []kates.ConfigMap{},
   156  			expectedAPIKey: "",
   157  		},
   158  		{
   159  			testName: "fall-back-envvar",
   160  			agent: &Agent{
   161  				agentNamespace:               "coolnamespace",
   162  				agentCloudResourceConfigName: "coolname",
   163  				ambassadorAPIKey:             "somevaluefromsomewhereelse",
   164  				ambassadorAPIKeyEnvVarValue:  "gotfromenv",
   165  			},
   166  			expectedAPIKey: "gotfromenv",
   167  		},
   168  		{
   169  			testName: "fall-back-envvar-bad-configs",
   170  			agent: &Agent{
   171  				agentNamespace:               "notcoolnamespace",
   172  				agentCloudResourceConfigName: "notcoolname",
   173  				ambassadorAPIKey:             "somevaluefromsomewhereelse",
   174  				ambassadorAPIKeyEnvVarValue:  "gotfromenv",
   175  			},
   176  			secrets: []kates.Secret{
   177  				{
   178  					ObjectMeta: objMeta,
   179  					Data: map[string][]byte{
   180  						"CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
   181  					},
   182  				},
   183  			},
   184  			configMaps: []kates.ConfigMap{
   185  				{
   186  					ObjectMeta: objMeta,
   187  					Data: map[string]string{
   188  						"CLOUD_CONNECT_TOKEN": "secretvalue",
   189  					},
   190  				},
   191  			},
   192  			expectedAPIKey: "gotfromenv",
   193  		},
   194  	}
   195  	for _, tc := range testcases {
   196  		t.Run(tc.testName, func(t *testing.T) {
   197  			ctx := dlog.NewTestContext(t, false)
   198  
   199  			tc.agent.handleAPIKeyConfigChange(ctx, tc.secrets, tc.configMaps)
   200  
   201  			assert.Equal(t, tc.agent.ambassadorAPIKey, tc.expectedAPIKey)
   202  
   203  		})
   204  	}
   205  }
   206  
   207  func TestProcessSnapshot(t *testing.T) {
   208  	t.Parallel()
   209  	snapshotTests := []struct {
   210  		// name of test (passed to t.Run())
   211  		testName string
   212  		// snapshot to call ProcessSnapshot with
   213  		inputSnap *snapshotTypes.Snapshot
   214  		// expected return value of ProcessSnapshot
   215  		ret error
   216  		// expected value of inputSnap.reportToSend after calling ProcessSnapshot
   217  		res *agent.Snapshot
   218  		// expected value of Agent.connInfo after calling ProcessSnapshot
   219  		// in certain circumstances, ProcessSnapshot resets that info
   220  		expectedConnInfo *ConnInfo
   221  		podStore         *podStore
   222  		assertionFunc    func(*testing.T, *agent.Snapshot)
   223  		address          string
   224  	}{
   225  		{
   226  			// Totally nil inputs should not error and not panic, and should not set
   227  			// snapshot.reportToSend
   228  			testName:  "nil-snapshot",
   229  			inputSnap: nil,
   230  			ret:       nil,
   231  			res:       nil,
   232  		},
   233  		{
   234  			// If no ambassador modules exist in the snapshot, we should not try to send
   235  			// a report.
   236  			// More granular tests for this are in report_test.go
   237  			testName: "no-modules",
   238  			inputSnap: &snapshotTypes.Snapshot{
   239  				AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{},
   240  				Kubernetes:     &snapshotTypes.KubernetesSnapshot{},
   241  			},
   242  			ret: nil,
   243  			res: nil,
   244  		},
   245  		{
   246  			// if we let address be an empty string, the defaults should get set
   247  			testName: "default-connection-info",
   248  			inputSnap: &snapshotTypes.Snapshot{
   249  				AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
   250  					AmbassadorID:      "default",
   251  					ClusterID:         "dopecluster",
   252  					AmbassadorVersion: "v1.0",
   253  				},
   254  				Kubernetes: &snapshotTypes.KubernetesSnapshot{},
   255  			},
   256  			// should not error
   257  			ret: nil,
   258  			res: &agent.Snapshot{
   259  				Identity: &agent.Identity{
   260  					Version:   "",
   261  					Hostname:  "ambassador-host",
   262  					License:   "",
   263  					ClusterId: "dopecluster",
   264  					Label:     "",
   265  				},
   266  				ContentType: snapshotTypes.ContentTypeJSON,
   267  				ApiVersion:  snapshotTypes.ApiVersion,
   268  			},
   269  			expectedConnInfo: &ConnInfo{hostname: "app.getambassador.io", port: "443", secure: true},
   270  		},
   271  		{
   272  			// ProcessSnapshot should set the Agent.connInfo to the parsed url from the
   273  			// ambassador module's DCP config
   274  			testName: "module-contains-connection-info",
   275  			address:  "http://somecooladdress:1234",
   276  			inputSnap: &snapshotTypes.Snapshot{
   277  				AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
   278  					AmbassadorID:      "default",
   279  					AmbassadorVersion: "v1.1",
   280  					ClusterID:         "clusterid",
   281  				},
   282  				Kubernetes: &snapshotTypes.KubernetesSnapshot{},
   283  			},
   284  			ret: nil,
   285  			res: &agent.Snapshot{
   286  				Identity: &agent.Identity{
   287  					Version:   "",
   288  					Hostname:  "ambassador-host",
   289  					License:   "",
   290  					ClusterId: "clusterid",
   291  					Label:     "",
   292  				},
   293  				ContentType: snapshotTypes.ContentTypeJSON,
   294  				ApiVersion:  snapshotTypes.ApiVersion,
   295  			},
   296  			// this matches what's in
   297  			// `address`
   298  			expectedConnInfo: &ConnInfo{hostname: "somecooladdress", port: "1234", secure: false},
   299  		},
   300  		{
   301  			// if the agent has pods that match the service selector labels, it should
   302  			// return those pods in the snapshot
   303  			testName: "pods-in-snapshot",
   304  			inputSnap: &snapshotTypes.Snapshot{
   305  				AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
   306  					AmbassadorID:      "default",
   307  					ClusterID:         "dopecluster",
   308  					AmbassadorVersion: "v1.0",
   309  				},
   310  				Kubernetes: &snapshotTypes.KubernetesSnapshot{
   311  					Services: []*kates.Service{
   312  						{
   313  							Spec: kates.ServiceSpec{
   314  								Selector: map[string]string{"label": "matching"},
   315  							},
   316  						},
   317  						{
   318  							Spec: kates.ServiceSpec{
   319  								Selector: map[string]string{"label2": "alsomatching", "label3": "yay"},
   320  							},
   321  						},
   322  					},
   323  				},
   324  			},
   325  			podStore: NewPodStore([]*kates.Pod{
   326  				{
   327  					ObjectMeta: metav1.ObjectMeta{
   328  						Name:      "pod1",
   329  						Namespace: "ns",
   330  						Labels:    map[string]string{"label": "matching", "tag": "1.0"},
   331  					},
   332  					Status: v1.PodStatus{
   333  						Phase: v1.PodRunning,
   334  					},
   335  				},
   336  				{
   337  					ObjectMeta: metav1.ObjectMeta{
   338  						Name:      "pod2",
   339  						Namespace: "ns",
   340  						Labels:    map[string]string{"label2": "alsomatching", "tag": "1.0", "label3": "yay"},
   341  					},
   342  					Status: v1.PodStatus{
   343  						Phase: v1.PodFailed,
   344  					},
   345  				},
   346  				{
   347  					ObjectMeta: metav1.ObjectMeta{
   348  						Name:      "pod3",
   349  						Namespace: "ns",
   350  						Labels:    map[string]string{"label2": "alsomatching", "tag": "1.0"},
   351  					},
   352  					Status: v1.PodStatus{
   353  						Phase: v1.PodSucceeded,
   354  					},
   355  				},
   356  			}),
   357  			// should not error
   358  			ret: nil,
   359  			res: &agent.Snapshot{
   360  				Identity: &agent.Identity{
   361  					Version:   "",
   362  					Hostname:  "ambassador-host",
   363  					License:   "",
   364  					ClusterId: "dopecluster",
   365  					Label:     "",
   366  				},
   367  				ContentType: snapshotTypes.ContentTypeJSON,
   368  				ApiVersion:  snapshotTypes.ApiVersion,
   369  			},
   370  			expectedConnInfo: &ConnInfo{hostname: "app.getambassador.io", port: "443", secure: true},
   371  			assertionFunc: func(t *testing.T, agentSnap *agent.Snapshot) {
   372  				assert.NotEmpty(t, agentSnap.RawSnapshot)
   373  				ambSnap := &snapshotTypes.Snapshot{}
   374  				err := json.Unmarshal(agentSnap.RawSnapshot, ambSnap)
   375  				assert.Nil(t, err)
   376  				assert.Equal(t, len(ambSnap.Kubernetes.Services), 2)
   377  				assert.Equal(t, len(ambSnap.Kubernetes.Pods), 2)
   378  				for _, p := range ambSnap.Kubernetes.Pods {
   379  					assert.Contains(t, []string{"pod1", "pod2"}, p.ObjectMeta.Name)
   380  				}
   381  			},
   382  		},
   383  	}
   384  
   385  	for _, testcase := range snapshotTests {
   386  		t.Run(testcase.testName, func(t *testing.T) {
   387  			a := NewAgent(nil, nil)
   388  			ctx := dlog.NewTestContext(t, false)
   389  			a.coreStore = &coreStore{podStore: testcase.podStore}
   390  			a.connAddress = testcase.address
   391  
   392  			actualRet := a.ProcessSnapshot(ctx, testcase.inputSnap, "ambassador-host")
   393  
   394  			assert.Equal(t, testcase.ret, actualRet)
   395  			if testcase.res == nil {
   396  				assert.Nil(t, a.reportToSend)
   397  			} else {
   398  				assert.NotNil(t, a.reportToSend)
   399  				assert.Equal(t, testcase.res.Identity, a.reportToSend.Identity)
   400  				assert.Equal(t, testcase.res.ContentType, a.reportToSend.ContentType)
   401  				assert.Equal(t, testcase.res.ApiVersion, a.reportToSend.ApiVersion)
   402  			}
   403  			if testcase.expectedConnInfo != nil {
   404  				assert.Equal(t, testcase.expectedConnInfo, a.connInfo)
   405  			}
   406  			if testcase.assertionFunc != nil {
   407  				testcase.assertionFunc(t, a.reportToSend)
   408  			}
   409  		})
   410  	}
   411  }
   412  
   413  type mockAccumulator struct {
   414  	changedChan     chan struct{}
   415  	targetInterface interface{}
   416  }
   417  
   418  func (m *mockAccumulator) Changed() <-chan struct{} {
   419  	return m.changedChan
   420  }
   421  
   422  func (m *mockAccumulator) FilteredUpdate(_ context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error) {
   423  	rawtarget, err := json.Marshal(m.targetInterface)
   424  	if err != nil {
   425  		return false, err
   426  	}
   427  	if err := json.Unmarshal(rawtarget, target); err != nil {
   428  		return false, err
   429  	}
   430  	return true, nil
   431  }
   432  
   433  // Set up a watch and send a MinReportPeriod directive to the directive channel
   434  // Make sure that Agent.MinReportPeriod is set to this new value
   435  func TestWatchReportPeriodDirective(t *testing.T) {
   436  	t.Parallel()
   437  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   438  
   439  	a := NewAgent(nil, nil)
   440  	watchDone := make(chan error)
   441  
   442  	directiveChan := make(chan *agent.Directive)
   443  	a.newDirective = directiveChan
   444  	cfgDuration, err := time.ParseDuration("1ms")
   445  	assert.Nil(t, err)
   446  	// initial report period is 1 second
   447  	a.minReportPeriod = cfgDuration
   448  	// we expect it to be set to 5 seconds
   449  	expectedDuration, err := time.ParseDuration("50s10ns")
   450  	assert.Nil(t, err)
   451  
   452  	podAcc := &mockAccumulator{
   453  		changedChan: make(chan struct{}),
   454  	}
   455  	configAcc := &mockAccumulator{
   456  		changedChan: make(chan struct{}),
   457  	}
   458  	rolloutCallback := make(chan *GenericCallback)
   459  	appCallback := make(chan *GenericCallback)
   460  
   461  	go func() {
   462  		err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
   463  		watchDone <- err
   464  	}()
   465  	dur := durationpb.Duration{
   466  		Seconds: 50,
   467  		Nanos:   10,
   468  	}
   469  
   470  	// send directive thru the channel
   471  	directive := &agent.Directive{
   472  		ID:              "myid123",
   473  		MinReportPeriod: &dur,
   474  	}
   475  	directiveChan <- directive
   476  
   477  	// since we're async let's just sleep for a sec
   478  	time.Sleep(1)
   479  
   480  	// stop the watch
   481  	cancel()
   482  
   483  	select {
   484  	case err := <-watchDone:
   485  		assert.Nil(t, err)
   486  	case <-time.After(10 * time.Second):
   487  		t.Fatal("Timed out waiting for watch to finish after cancelling context")
   488  	}
   489  	// make sure that the agent's min report period is what we expect
   490  	assert.Equal(t, expectedDuration, a.minReportPeriod)
   491  	assert.False(t, a.reportRunning.Value())
   492  }
   493  
   494  // Start a watch and send a nil then empty directive through the channel
   495  // make sure nothing errors or panics
   496  func TestWatchEmptyDirectives(t *testing.T) {
   497  	t.Parallel()
   498  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   499  
   500  	a := NewAgent(nil, nil)
   501  	id := agent.Identity{}
   502  	a.agentID = &id
   503  	watchDone := make(chan error)
   504  	directiveChan := make(chan *agent.Directive)
   505  	a.newDirective = directiveChan
   506  
   507  	podAcc := &mockAccumulator{
   508  		changedChan: make(chan struct{}),
   509  	}
   510  	configAcc := &mockAccumulator{
   511  		changedChan: make(chan struct{}),
   512  	}
   513  	rolloutCallback := make(chan *GenericCallback)
   514  	appCallback := make(chan *GenericCallback)
   515  	go func() {
   516  		err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
   517  		watchDone <- err
   518  	}()
   519  
   520  	// sending a direcitve with nothing set should not error
   521  	directive := &agent.Directive{}
   522  	directiveChan <- directive
   523  	select {
   524  	case err := <-watchDone:
   525  		eString := "No error"
   526  		if err != nil {
   527  			eString = err.Error()
   528  		}
   529  		t.Fatalf("Sending empty directive stopped the watch and shouldn't have. Error: %s", eString)
   530  	case <-time.After(2 * time.Second):
   531  	}
   532  
   533  	// sending nil also shouldn't crash things
   534  	directiveChan <- nil
   535  	select {
   536  	case err := <-watchDone:
   537  		eString := "No error"
   538  		if err != nil {
   539  			eString = err.Error()
   540  		}
   541  		t.Fatalf("Sending empty directive stopped the watch and shouldn't have. Error: %s", eString)
   542  	case <-time.After(2 * time.Second):
   543  	}
   544  
   545  	cancel()
   546  
   547  	select {
   548  	case err := <-watchDone:
   549  		assert.Nil(t, err)
   550  	case <-time.After(10 * time.Second):
   551  		t.Fatal("Timed out waiting for watch to finish after cancelling context")
   552  	}
   553  }
   554  
   555  // Setup a watch
   556  // send a directive to tell the agent to stop sending reports to the agent comm.
   557  // Then, send a snapshot through the channel and ensure that it doesn't get sent to the agent com
   558  func TestWatchStopReportingDirective(t *testing.T) {
   559  	t.Parallel()
   560  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   561  
   562  	a := NewAgent(nil, nil)
   563  	id := agent.Identity{}
   564  	a.agentID = &id
   565  	watchDone := make(chan error)
   566  	directiveChan := make(chan *agent.Directive)
   567  	a.newDirective = directiveChan
   568  
   569  	// setup our mock client
   570  	client := &MockClient{}
   571  	c := &RPCComm{
   572  		conn:       client,
   573  		client:     client,
   574  		rptWake:    make(chan struct{}, 1),
   575  		retCancel:  cancel,
   576  		agentID:    &id,
   577  		directives: directiveChan,
   578  	}
   579  	a.comm = c
   580  	a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
   581  	podAcc := &mockAccumulator{
   582  		changedChan: make(chan struct{}),
   583  	}
   584  	configAcc := &mockAccumulator{
   585  		changedChan: make(chan struct{}),
   586  	}
   587  	rolloutCallback := make(chan *GenericCallback)
   588  	appCallback := make(chan *GenericCallback)
   589  
   590  	// start watch
   591  	go func() {
   592  		err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
   593  		watchDone <- err
   594  	}()
   595  
   596  	// send directive to stop reporting
   597  	directive := &agent.Directive{
   598  		ID:            "1234",
   599  		StopReporting: true,
   600  	}
   601  	directiveChan <- directive
   602  	// since we're async just wait a sec
   603  	time.Sleep(time.Second * 3)
   604  
   605  	// cancel the watch
   606  	cancel()
   607  
   608  	select {
   609  	case err := <-watchDone:
   610  		assert.Nil(t, err)
   611  	case <-time.After(10 * time.Second):
   612  		t.Fatal("Timed out waiting for watch to finish after cancelling context")
   613  	}
   614  	// make sure that reportingStopped is still set
   615  	assert.True(t, a.reportingStopped)
   616  	// assert that no snapshots were sent
   617  	assert.Equal(t, len(client.GetSnapshots()), 0, "No snapshots should have been sent to the client")
   618  	assert.False(t, a.reportRunning.Value())
   619  }
   620  
   621  // Start a watch. Configure the mock client to error when Report() is called
   622  // Send a snapshot through the channel, and make sure the error propogates thru the agent.reportComplete
   623  // channel, and that the error doesn't make things sad.
   624  func TestWatchErrorSendingSnapshot(t *testing.T) {
   625  	t.Parallel()
   626  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   627  	ambId := getRandomAmbassadorID()
   628  	a := NewAgent(nil, nil)
   629  	a.reportingStopped = false
   630  	a.reportRunning.Set(false)
   631  	// set to 3 seconds so we can reliably assert that reportRunning is true later
   632  	minReport, err := time.ParseDuration("3s")
   633  	assert.Nil(t, err)
   634  	a.minReportPeriod = minReport
   635  	id := agent.Identity{}
   636  	a.agentID = &id
   637  	a.ambassadorAPIKey = "mycoolapikey"
   638  	a.ambassadorAPIKeyEnvVarValue = a.ambassadorAPIKey
   639  	a.agentCloudResourceConfigName = "bogusvalue"
   640  	// needs to match `address` from moduleConfigRaw below
   641  	a.connAddress = "http://localhost:8080"
   642  	a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
   643  	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   644  		// setup the snapshot we'll send
   645  		snapshot := snapshotTypes.Snapshot{
   646  			AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
   647  				AmbassadorID: ambId,
   648  				ClusterID:    "reallylongthing",
   649  			},
   650  			Kubernetes: &snapshotTypes.KubernetesSnapshot{},
   651  		}
   652  		enSnapshot, err := json.Marshal(&snapshot)
   653  		if !assert.NoError(t, err) {
   654  			return
   655  		}
   656  		_, err = w.Write(enSnapshot)
   657  		assert.NoError(t, err)
   658  
   659  	}))
   660  	defer ts.Close()
   661  	mockError := errors.New("MockClient: Error sending report")
   662  
   663  	client := &MockClient{
   664  		// force an error
   665  		reportFunc: func(ctx context.Context, in *agent.Snapshot) (*agent.SnapshotResponse, error) {
   666  			return nil, mockError
   667  		},
   668  	}
   669  	c := &RPCComm{
   670  		conn:       client,
   671  		client:     client,
   672  		rptWake:    make(chan struct{}, 1),
   673  		retCancel:  cancel,
   674  		agentID:    &id,
   675  		directives: make(chan *agent.Directive, 1),
   676  	}
   677  	a.comm = c
   678  
   679  	watchDone := make(chan error)
   680  	podAcc := &mockAccumulator{
   681  		changedChan: make(chan struct{}),
   682  	}
   683  	configAcc := &mockAccumulator{
   684  		changedChan: make(chan struct{}),
   685  	}
   686  	rolloutCallback := make(chan *GenericCallback)
   687  	appCallback := make(chan *GenericCallback)
   688  
   689  	// start the watch
   690  	go func() {
   691  		err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
   692  		watchDone <- err
   693  	}()
   694  
   695  	// assert that report completes
   696  	select {
   697  	case err := <-a.reportComplete:
   698  		// make sure that we got an error and that error is the same one we configured the
   699  		// mock client to send
   700  		assert.ErrorIs(t, err, mockError)
   701  		assert.False(t, a.reportRunning.Value())
   702  		cancel()
   703  	case err := <-watchDone:
   704  		if err != nil {
   705  			t.Fatalf("Watch ended early with error %s", err.Error())
   706  		} else {
   707  			t.Fatal("Watch ended early with no error.")
   708  		}
   709  	case <-time.After(10 * time.Second):
   710  		cancel()
   711  		t.Fatal("Timed out waiting for report to complete.")
   712  	}
   713  	select {
   714  	case err := <-watchDone:
   715  		assert.Nil(t, err)
   716  	case <-time.After(10 * time.Second):
   717  		t.Fatal("Timed out waiting for watch to end")
   718  	}
   719  }
   720  
   721  // Start a watch. Setup a mock client to capture what we would have sent to the agent com
   722  // Send a snapshot with some data in it thru the channel
   723  // Make sure the Snapshot.KubernetesSecrets and Snapshot.Invalid get scrubbed of sensitive data and
   724  // we send a SnapshotTs that makes sense (so the agent com can throw out older snapshots)
   725  func TestWatchWithSnapshot(t *testing.T) {
   726  	t.Parallel()
   727  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   728  	clusterID := "coolcluster"
   729  	ambId := getRandomAmbassadorID()
   730  	a := NewAgent(nil, nil)
   731  	a.reportingStopped = false
   732  	a.reportRunning.Set(false)
   733  
   734  	id := agent.Identity{}
   735  	// set to 0 seconds so we can reliably assert that report running is false later
   736  	minReport, err := time.ParseDuration("0s")
   737  	assert.Nil(t, err)
   738  	a.minReportPeriod = minReport
   739  	a.agentID = &id
   740  	// needs to matched parsed ish below
   741  	a.connAddress = "http://localhost:8080/"
   742  	a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
   743  	apiKey := "coolapikey"
   744  	a.ambassadorAPIKey = apiKey
   745  	a.ambassadorAPIKeyEnvVarValue = apiKey
   746  	a.agentCloudResourceConfigName = "bogusvalue"
   747  	snapshot := &snapshotTypes.Snapshot{
   748  		Invalid: []*kates.Unstructured{
   749  			// everything that's not errors or metadata here needs to get scrubbed
   750  			getUnstructured(`
   751  {
   752  "kind":"WeirdKind",
   753  "apiVersion":"v1",
   754  "metadata": {
   755  "name":"hi",
   756  "namespace":"default"
   757  },
   758  "errors": "someerrors",
   759  "wat":"dontshowthis"
   760  }`),
   761  		},
   762  		Kubernetes: &snapshotTypes.KubernetesSnapshot{
   763  			Secrets: []*kates.Secret{
   764  				{
   765  					TypeMeta: metav1.TypeMeta{
   766  						Kind:       "Secret",
   767  						APIVersion: "v1",
   768  					},
   769  					ObjectMeta: metav1.ObjectMeta{
   770  						Name:      "secret-1",
   771  						Namespace: "ns",
   772  						// make sure this gets unset
   773  						Annotations: map[string]string{"also": "unset"},
   774  					},
   775  					Type: "Opaque",
   776  					Data: map[string][]byte{
   777  						// make sure these values get scrubbed
   778  						"data1": []byte("d293YXNlY3JldA=="),
   779  						"data2": []byte("d293YW5vdGhlcm9uZQ=="),
   780  					},
   781  				},
   782  			},
   783  		},
   784  		AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
   785  			AmbassadorID:      ambId,
   786  			ClusterID:         clusterID,
   787  			AmbassadorVersion: "v1.0",
   788  		},
   789  	}
   790  	// send a snapshot thru the channel
   791  	// keep track of when we did that for assertions
   792  	var snapshotSentTime time.Time
   793  	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   794  		enSnapshot, err := json.Marshal(&snapshot)
   795  		if !assert.NoError(t, err) {
   796  			return
   797  		}
   798  		_, err = w.Write(enSnapshot)
   799  		assert.NoError(t, err)
   800  		snapshotSentTime = time.Now()
   801  	}))
   802  	defer ts.Close()
   803  
   804  	// setup the mock client
   805  	client := &MockClient{}
   806  	c := &RPCComm{
   807  		conn:       client,
   808  		client:     client,
   809  		rptWake:    make(chan struct{}, 1),
   810  		retCancel:  cancel,
   811  		agentID:    &id,
   812  		directives: make(chan *agent.Directive, 1),
   813  	}
   814  	a.comm = c
   815  
   816  	watchDone := make(chan error)
   817  	podAcc := &mockAccumulator{
   818  		changedChan: make(chan struct{}),
   819  		targetInterface: CoreSnapshot{
   820  			Pods: []*kates.Pod{
   821  				{
   822  					TypeMeta: metav1.TypeMeta{
   823  						Kind:       "Pod",
   824  						APIVersion: "v1",
   825  					},
   826  					ObjectMeta: metav1.ObjectMeta{
   827  						Name:      "some-pod",
   828  						Namespace: "default",
   829  					},
   830  					Status: v1.PodStatus{
   831  						Phase: v1.PodRunning,
   832  					},
   833  				},
   834  			},
   835  			Endpoints: []*kates.Endpoints{
   836  				{
   837  					TypeMeta: metav1.TypeMeta{
   838  						Kind:       "Endpoints",
   839  						APIVersion: "v1",
   840  					},
   841  					ObjectMeta: metav1.ObjectMeta{
   842  						Name:      "some-endpoint",
   843  						Namespace: "default",
   844  					},
   845  				},
   846  			},
   847  			Deployments: []*kates.Deployment{
   848  				{
   849  					TypeMeta: metav1.TypeMeta{
   850  						Kind:       "Deployment",
   851  						APIVersion: "apps/v1",
   852  					},
   853  					ObjectMeta: metav1.ObjectMeta{
   854  						Name:      "some-deployment",
   855  						Namespace: "default",
   856  					},
   857  				},
   858  			},
   859  			ConfigMaps: []*kates.ConfigMap{
   860  				{
   861  					TypeMeta: metav1.TypeMeta{
   862  						Kind:       "ConfigMap",
   863  						APIVersion: "",
   864  					},
   865  					ObjectMeta: metav1.ObjectMeta{
   866  						Name:      "some-config-map",
   867  						Namespace: "default",
   868  					},
   869  				},
   870  			},
   871  		},
   872  	}
   873  	configAcc := &mockAccumulator{
   874  		changedChan: make(chan struct{}),
   875  	}
   876  	rolloutCallback := make(chan *GenericCallback)
   877  	appCallback := make(chan *GenericCallback)
   878  
   879  	// start the watch
   880  	go func() {
   881  		err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
   882  		watchDone <- err
   883  	}()
   884  
   885  	// assert that we send a couple of reports.
   886  	// we just want to make sure we don't get stuck after sending one report
   887  	// each report will be the same because the snapshot server we setup for this test is just
   888  	// returning static content
   889  	reportsSent := 0
   890  	for reportsSent < 2 {
   891  		podAcc.changedChan <- struct{}{}
   892  		select {
   893  		case err := <-a.reportComplete:
   894  			assert.Nil(t, err)
   895  			reportsSent += 1
   896  		case err := <-watchDone:
   897  			t.Fatalf("Watch ended early with error %s", err.Error())
   898  		case <-time.After(10 * time.Second):
   899  			cancel()
   900  			t.Fatal("Timed out waiting for report to complete.")
   901  		}
   902  	}
   903  	cancel()
   904  
   905  	// stop the watch and make sure it finishes without an error
   906  	select {
   907  	case err := <-watchDone:
   908  		// make sure the watch finishes without a problem
   909  		assert.Nil(t, err)
   910  	case <-time.After(10 * time.Second):
   911  		t.Fatal("Timed out waiting for watch to finish after cancelling context")
   912  	}
   913  	sentSnaps := client.GetSnapshots()
   914  
   915  	// Make sure that the client got a snapshot to send
   916  	assert.NotNil(t, sentSnaps, "No snapshots sent")
   917  	assert.GreaterOrEqual(t, len(sentSnaps), 1, "Should have sent at least 1 snapshot")
   918  	lastMeta := client.GetLastMetadata()
   919  	assert.NotNil(t, lastMeta)
   920  	md := lastMeta.Get("x-ambassador-api-key")
   921  	assert.NotEmpty(t, md)
   922  	assert.Equal(t, md[0], apiKey)
   923  
   924  	/////// Make sure the raw snapshot that got sent looks like we expect
   925  	sentSnapshot := sentSnaps[1]
   926  	var actualSnapshot snapshotTypes.Snapshot
   927  	err = json.Unmarshal(sentSnapshot.RawSnapshot, &actualSnapshot)
   928  	assert.Nil(t, err)
   929  
   930  	// Assert invalid things got scrubbed
   931  	assert.Equal(t, len(actualSnapshot.Invalid), 1)
   932  	expectedInvalid := getUnstructured(`
   933  {
   934  "kind":"WeirdKind",
   935  "apiVersion":"v1",
   936  "metadata": {
   937  "name":"hi",
   938  "namespace":"default"
   939  },
   940  "errors":"someerrors"
   941  }`)
   942  	assert.Equal(t, actualSnapshot.Invalid[0], expectedInvalid)
   943  
   944  	// make sure the secret values got scrubbed
   945  	assert.NotNil(t, actualSnapshot.Kubernetes)
   946  	assert.Equal(t, len(actualSnapshot.Kubernetes.Secrets), 1)
   947  	assert.Equal(t, len(actualSnapshot.Kubernetes.Secrets[0].ObjectMeta.Annotations), 0)
   948  	assert.Equal(t, "secret-1", actualSnapshot.Kubernetes.Secrets[0].Name)
   949  	assert.Equal(t, "ns", actualSnapshot.Kubernetes.Secrets[0].Namespace)
   950  	secretData := actualSnapshot.Kubernetes.Secrets[0].Data
   951  	assert.NotEqual(t, []byte("d293YXNlY3JldA=="), secretData["data1"])
   952  	assert.NotEqual(t, []byte("d293YW5vdGhlcm9uZQ=="), secretData["data2"])
   953  
   954  	// check that the other resources we watch make it into the snapshot
   955  	assert.Equal(t, len(actualSnapshot.Kubernetes.Endpoints), 1)
   956  	assert.Equal(t, len(actualSnapshot.Kubernetes.Pods), 1)
   957  	assert.Equal(t, len(actualSnapshot.Kubernetes.ConfigMaps), 1)
   958  	assert.Equal(t, len(actualSnapshot.Kubernetes.Deployments), 1)
   959  
   960  	/////// Make sure that the timestamp we sent makes sense
   961  	assert.NotNil(t, sentSnapshot.SnapshotTs)
   962  	snapshotTime := sentSnapshot.SnapshotTs.AsTime()
   963  	assert.WithinDuration(t, snapshotSentTime, snapshotTime, 5*time.Second)
   964  
   965  	/////// assert API version and content type
   966  	assert.Equal(t, snapshotTypes.ApiVersion, sentSnapshot.ApiVersion)
   967  	assert.Equal(t, snapshotTypes.ContentTypeJSON, sentSnapshot.ContentType)
   968  
   969  	/////// Identity assertions
   970  	actualIdentity := sentSnapshot.Identity
   971  	assert.NotNil(t, actualIdentity)
   972  	assert.Equal(t, "", actualIdentity.AccountId)
   973  	assert.NotNil(t, actualIdentity.Version)
   974  	assert.Equal(t, "", actualIdentity.Version)
   975  	assert.Equal(t, clusterID, actualIdentity.ClusterId)
   976  	parsedURL, err := url.Parse(ts.URL)
   977  	assert.Nil(t, err)
   978  	assert.Equal(t, actualIdentity.Hostname, parsedURL.Hostname())
   979  }
   980  
   981  // Setup a watch.
   982  // Send a snapshot with no cluster id
   983  // Make sure we don't try to send anything and that nothing errors or panics
   984  func TestWatchEmptySnapshot(t *testing.T) {
   985  	t.Parallel()
   986  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   987  
   988  	a := NewAgent(nil, nil)
   989  	minReport, err := time.ParseDuration("1ms")
   990  	assert.Nil(t, err)
   991  	a.minReportPeriod = minReport
   992  	watchDone := make(chan error)
   993  
   994  	snapshotRequested := make(chan bool)
   995  	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   996  		ambId := getRandomAmbassadorID()
   997  		// setup the snapshot we'll send
   998  		snapshot := snapshotTypes.Snapshot{
   999  			AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
  1000  				AmbassadorID: ambId,
  1001  			},
  1002  		}
  1003  		enSnapshot, err := json.Marshal(&snapshot)
  1004  		if err != nil {
  1005  			t.Fatal("error marshalling snapshot")
  1006  		}
  1007  
  1008  		_, _ = w.Write(enSnapshot)
  1009  		select {
  1010  		case snapshotRequested <- true:
  1011  		default:
  1012  		}
  1013  	}))
  1014  	defer ts.Close()
  1015  	podAcc := &mockAccumulator{
  1016  		changedChan: make(chan struct{}),
  1017  	}
  1018  	configAcc := &mockAccumulator{
  1019  		changedChan: make(chan struct{}),
  1020  	}
  1021  	rolloutCallback := make(chan *GenericCallback)
  1022  	appCallback := make(chan *GenericCallback)
  1023  	go func() {
  1024  		err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
  1025  		watchDone <- err
  1026  	}()
  1027  	select {
  1028  	case <-snapshotRequested:
  1029  		cancel()
  1030  	case <-time.After(10 * time.Second):
  1031  		t.Fatalf("Timed out waiting for agent to request snapshot")
  1032  		cancel()
  1033  	}
  1034  
  1035  	select {
  1036  	case err := <-watchDone:
  1037  		assert.Nil(t, err)
  1038  	case <-time.After(5 * time.Second):
  1039  		t.Fatal("Watch did not end")
  1040  	}
  1041  	assert.False(t, a.reportRunning.Value())
  1042  }
  1043  

View as plain text