1 package agent
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "math/rand"
8 "net/http"
9 "net/http/httptest"
10 "net/url"
11 "testing"
12 "time"
13
14 "github.com/stretchr/testify/assert"
15
16 "google.golang.org/protobuf/types/known/durationpb"
17
18 v1 "k8s.io/api/core/v1"
19 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
21 "github.com/datawire/ambassador/v2/pkg/api/agent"
22 "github.com/datawire/ambassador/v2/pkg/kates"
23 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
24 "github.com/datawire/dlib/dlog"
25 )
26
27
28
29 func getUnstructured(objStr string) *kates.Unstructured {
30 var obj map[string]interface{}
31 _ = json.Unmarshal([]byte(objStr), &obj)
32 unstructured := &kates.Unstructured{}
33 unstructured.SetUnstructuredContent(obj)
34 return unstructured
35 }
36
37 const letterBytes = "abcdefghijklmnopqrstuvwxyz"
38
39 func getRandomAmbassadorID() string {
40 b := make([]byte, 10)
41 for i := range b {
42 b[i] = letterBytes[rand.Intn(len(letterBytes))]
43 }
44 return string(b)
45 }
46
47 func TestHandleAPIKeyConfigChange(t *testing.T) {
48 t.Parallel()
49 objMeta := metav1.ObjectMeta{
50 Name: "coolname",
51 Namespace: "coolnamespace",
52 }
53 testcases := []struct {
54 testName string
55 agent *Agent
56 secrets []kates.Secret
57 configMaps []kates.ConfigMap
58 expectedAPIKey string
59 }{
60 {
61 testName: "configmap-wins",
62 agent: &Agent{
63 agentNamespace: "coolnamespace",
64 agentCloudResourceConfigName: "coolname",
65 ambassadorAPIKey: "",
66 ambassadorAPIKeyEnvVarValue: "",
67 },
68 secrets: []kates.Secret{},
69 configMaps: []kates.ConfigMap{
70 {
71 ObjectMeta: objMeta,
72 Data: map[string]string{
73 "CLOUD_CONNECT_TOKEN": "beepboop",
74 },
75 },
76 },
77 expectedAPIKey: "beepboop",
78 },
79 {
80 testName: "secret-over-configmap",
81 agent: &Agent{
82 agentNamespace: "coolnamespace",
83 agentCloudResourceConfigName: "coolname",
84 ambassadorAPIKey: "",
85 ambassadorAPIKeyEnvVarValue: "",
86 },
87 secrets: []kates.Secret{
88 {
89 ObjectMeta: objMeta,
90 Data: map[string][]byte{
91 "CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
92 },
93 },
94 },
95 configMaps: []kates.ConfigMap{
96 {
97 ObjectMeta: objMeta,
98 Data: map[string]string{
99 "CLOUD_CONNECT_TOKEN": "beepboop",
100 },
101 },
102 },
103 expectedAPIKey: "secretvalue",
104 },
105 {
106 testName: "from-secret",
107 agent: &Agent{
108 agentNamespace: "coolnamespace",
109 agentCloudResourceConfigName: "coolname",
110 ambassadorAPIKey: "",
111 ambassadorAPIKeyEnvVarValue: "",
112 },
113 secrets: []kates.Secret{
114 {
115 ObjectMeta: objMeta,
116 Data: map[string][]byte{
117 "CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
118 },
119 },
120 },
121 configMaps: []kates.ConfigMap{},
122 expectedAPIKey: "secretvalue",
123 },
124 {
125 testName: "configmap-empty-string-value",
126 agent: &Agent{
127 agentNamespace: "coolnamespace",
128 agentCloudResourceConfigName: "coolname",
129 ambassadorAPIKey: "someexistingvalue",
130 ambassadorAPIKeyEnvVarValue: "",
131 },
132 secrets: []kates.Secret{},
133 configMaps: []kates.ConfigMap{
134 {
135 ObjectMeta: objMeta,
136 Data: map[string]string{},
137 },
138 },
139 expectedAPIKey: "",
140 },
141 {
142 testName: "secret-empty-string-value",
143 agent: &Agent{
144 agentNamespace: "coolnamespace",
145 agentCloudResourceConfigName: "coolname",
146 ambassadorAPIKey: "someexistingvalue",
147 ambassadorAPIKeyEnvVarValue: "",
148 },
149 secrets: []kates.Secret{
150 {
151 ObjectMeta: objMeta,
152 Data: map[string][]byte{},
153 },
154 },
155 configMaps: []kates.ConfigMap{},
156 expectedAPIKey: "",
157 },
158 {
159 testName: "fall-back-envvar",
160 agent: &Agent{
161 agentNamespace: "coolnamespace",
162 agentCloudResourceConfigName: "coolname",
163 ambassadorAPIKey: "somevaluefromsomewhereelse",
164 ambassadorAPIKeyEnvVarValue: "gotfromenv",
165 },
166 expectedAPIKey: "gotfromenv",
167 },
168 {
169 testName: "fall-back-envvar-bad-configs",
170 agent: &Agent{
171 agentNamespace: "notcoolnamespace",
172 agentCloudResourceConfigName: "notcoolname",
173 ambassadorAPIKey: "somevaluefromsomewhereelse",
174 ambassadorAPIKeyEnvVarValue: "gotfromenv",
175 },
176 secrets: []kates.Secret{
177 {
178 ObjectMeta: objMeta,
179 Data: map[string][]byte{
180 "CLOUD_CONNECT_TOKEN": []byte("secretvalue"),
181 },
182 },
183 },
184 configMaps: []kates.ConfigMap{
185 {
186 ObjectMeta: objMeta,
187 Data: map[string]string{
188 "CLOUD_CONNECT_TOKEN": "secretvalue",
189 },
190 },
191 },
192 expectedAPIKey: "gotfromenv",
193 },
194 }
195 for _, tc := range testcases {
196 t.Run(tc.testName, func(t *testing.T) {
197 ctx := dlog.NewTestContext(t, false)
198
199 tc.agent.handleAPIKeyConfigChange(ctx, tc.secrets, tc.configMaps)
200
201 assert.Equal(t, tc.agent.ambassadorAPIKey, tc.expectedAPIKey)
202
203 })
204 }
205 }
206
207 func TestProcessSnapshot(t *testing.T) {
208 t.Parallel()
209 snapshotTests := []struct {
210
211 testName string
212
213 inputSnap *snapshotTypes.Snapshot
214
215 ret error
216
217 res *agent.Snapshot
218
219
220 expectedConnInfo *ConnInfo
221 podStore *podStore
222 assertionFunc func(*testing.T, *agent.Snapshot)
223 address string
224 }{
225 {
226
227
228 testName: "nil-snapshot",
229 inputSnap: nil,
230 ret: nil,
231 res: nil,
232 },
233 {
234
235
236
237 testName: "no-modules",
238 inputSnap: &snapshotTypes.Snapshot{
239 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{},
240 Kubernetes: &snapshotTypes.KubernetesSnapshot{},
241 },
242 ret: nil,
243 res: nil,
244 },
245 {
246
247 testName: "default-connection-info",
248 inputSnap: &snapshotTypes.Snapshot{
249 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
250 AmbassadorID: "default",
251 ClusterID: "dopecluster",
252 AmbassadorVersion: "v1.0",
253 },
254 Kubernetes: &snapshotTypes.KubernetesSnapshot{},
255 },
256
257 ret: nil,
258 res: &agent.Snapshot{
259 Identity: &agent.Identity{
260 Version: "",
261 Hostname: "ambassador-host",
262 License: "",
263 ClusterId: "dopecluster",
264 Label: "",
265 },
266 ContentType: snapshotTypes.ContentTypeJSON,
267 ApiVersion: snapshotTypes.ApiVersion,
268 },
269 expectedConnInfo: &ConnInfo{hostname: "app.getambassador.io", port: "443", secure: true},
270 },
271 {
272
273
274 testName: "module-contains-connection-info",
275 address: "http://somecooladdress:1234",
276 inputSnap: &snapshotTypes.Snapshot{
277 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
278 AmbassadorID: "default",
279 AmbassadorVersion: "v1.1",
280 ClusterID: "clusterid",
281 },
282 Kubernetes: &snapshotTypes.KubernetesSnapshot{},
283 },
284 ret: nil,
285 res: &agent.Snapshot{
286 Identity: &agent.Identity{
287 Version: "",
288 Hostname: "ambassador-host",
289 License: "",
290 ClusterId: "clusterid",
291 Label: "",
292 },
293 ContentType: snapshotTypes.ContentTypeJSON,
294 ApiVersion: snapshotTypes.ApiVersion,
295 },
296
297
298 expectedConnInfo: &ConnInfo{hostname: "somecooladdress", port: "1234", secure: false},
299 },
300 {
301
302
303 testName: "pods-in-snapshot",
304 inputSnap: &snapshotTypes.Snapshot{
305 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
306 AmbassadorID: "default",
307 ClusterID: "dopecluster",
308 AmbassadorVersion: "v1.0",
309 },
310 Kubernetes: &snapshotTypes.KubernetesSnapshot{
311 Services: []*kates.Service{
312 {
313 Spec: kates.ServiceSpec{
314 Selector: map[string]string{"label": "matching"},
315 },
316 },
317 {
318 Spec: kates.ServiceSpec{
319 Selector: map[string]string{"label2": "alsomatching", "label3": "yay"},
320 },
321 },
322 },
323 },
324 },
325 podStore: NewPodStore([]*kates.Pod{
326 {
327 ObjectMeta: metav1.ObjectMeta{
328 Name: "pod1",
329 Namespace: "ns",
330 Labels: map[string]string{"label": "matching", "tag": "1.0"},
331 },
332 Status: v1.PodStatus{
333 Phase: v1.PodRunning,
334 },
335 },
336 {
337 ObjectMeta: metav1.ObjectMeta{
338 Name: "pod2",
339 Namespace: "ns",
340 Labels: map[string]string{"label2": "alsomatching", "tag": "1.0", "label3": "yay"},
341 },
342 Status: v1.PodStatus{
343 Phase: v1.PodFailed,
344 },
345 },
346 {
347 ObjectMeta: metav1.ObjectMeta{
348 Name: "pod3",
349 Namespace: "ns",
350 Labels: map[string]string{"label2": "alsomatching", "tag": "1.0"},
351 },
352 Status: v1.PodStatus{
353 Phase: v1.PodSucceeded,
354 },
355 },
356 }),
357
358 ret: nil,
359 res: &agent.Snapshot{
360 Identity: &agent.Identity{
361 Version: "",
362 Hostname: "ambassador-host",
363 License: "",
364 ClusterId: "dopecluster",
365 Label: "",
366 },
367 ContentType: snapshotTypes.ContentTypeJSON,
368 ApiVersion: snapshotTypes.ApiVersion,
369 },
370 expectedConnInfo: &ConnInfo{hostname: "app.getambassador.io", port: "443", secure: true},
371 assertionFunc: func(t *testing.T, agentSnap *agent.Snapshot) {
372 assert.NotEmpty(t, agentSnap.RawSnapshot)
373 ambSnap := &snapshotTypes.Snapshot{}
374 err := json.Unmarshal(agentSnap.RawSnapshot, ambSnap)
375 assert.Nil(t, err)
376 assert.Equal(t, len(ambSnap.Kubernetes.Services), 2)
377 assert.Equal(t, len(ambSnap.Kubernetes.Pods), 2)
378 for _, p := range ambSnap.Kubernetes.Pods {
379 assert.Contains(t, []string{"pod1", "pod2"}, p.ObjectMeta.Name)
380 }
381 },
382 },
383 }
384
385 for _, testcase := range snapshotTests {
386 t.Run(testcase.testName, func(t *testing.T) {
387 a := NewAgent(nil, nil)
388 ctx := dlog.NewTestContext(t, false)
389 a.coreStore = &coreStore{podStore: testcase.podStore}
390 a.connAddress = testcase.address
391
392 actualRet := a.ProcessSnapshot(ctx, testcase.inputSnap, "ambassador-host")
393
394 assert.Equal(t, testcase.ret, actualRet)
395 if testcase.res == nil {
396 assert.Nil(t, a.reportToSend)
397 } else {
398 assert.NotNil(t, a.reportToSend)
399 assert.Equal(t, testcase.res.Identity, a.reportToSend.Identity)
400 assert.Equal(t, testcase.res.ContentType, a.reportToSend.ContentType)
401 assert.Equal(t, testcase.res.ApiVersion, a.reportToSend.ApiVersion)
402 }
403 if testcase.expectedConnInfo != nil {
404 assert.Equal(t, testcase.expectedConnInfo, a.connInfo)
405 }
406 if testcase.assertionFunc != nil {
407 testcase.assertionFunc(t, a.reportToSend)
408 }
409 })
410 }
411 }
412
413 type mockAccumulator struct {
414 changedChan chan struct{}
415 targetInterface interface{}
416 }
417
418 func (m *mockAccumulator) Changed() <-chan struct{} {
419 return m.changedChan
420 }
421
422 func (m *mockAccumulator) FilteredUpdate(_ context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error) {
423 rawtarget, err := json.Marshal(m.targetInterface)
424 if err != nil {
425 return false, err
426 }
427 if err := json.Unmarshal(rawtarget, target); err != nil {
428 return false, err
429 }
430 return true, nil
431 }
432
433
434
435 func TestWatchReportPeriodDirective(t *testing.T) {
436 t.Parallel()
437 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
438
439 a := NewAgent(nil, nil)
440 watchDone := make(chan error)
441
442 directiveChan := make(chan *agent.Directive)
443 a.newDirective = directiveChan
444 cfgDuration, err := time.ParseDuration("1ms")
445 assert.Nil(t, err)
446
447 a.minReportPeriod = cfgDuration
448
449 expectedDuration, err := time.ParseDuration("50s10ns")
450 assert.Nil(t, err)
451
452 podAcc := &mockAccumulator{
453 changedChan: make(chan struct{}),
454 }
455 configAcc := &mockAccumulator{
456 changedChan: make(chan struct{}),
457 }
458 rolloutCallback := make(chan *GenericCallback)
459 appCallback := make(chan *GenericCallback)
460
461 go func() {
462 err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
463 watchDone <- err
464 }()
465 dur := durationpb.Duration{
466 Seconds: 50,
467 Nanos: 10,
468 }
469
470
471 directive := &agent.Directive{
472 ID: "myid123",
473 MinReportPeriod: &dur,
474 }
475 directiveChan <- directive
476
477
478 time.Sleep(1)
479
480
481 cancel()
482
483 select {
484 case err := <-watchDone:
485 assert.Nil(t, err)
486 case <-time.After(10 * time.Second):
487 t.Fatal("Timed out waiting for watch to finish after cancelling context")
488 }
489
490 assert.Equal(t, expectedDuration, a.minReportPeriod)
491 assert.False(t, a.reportRunning.Value())
492 }
493
494
495
496 func TestWatchEmptyDirectives(t *testing.T) {
497 t.Parallel()
498 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
499
500 a := NewAgent(nil, nil)
501 id := agent.Identity{}
502 a.agentID = &id
503 watchDone := make(chan error)
504 directiveChan := make(chan *agent.Directive)
505 a.newDirective = directiveChan
506
507 podAcc := &mockAccumulator{
508 changedChan: make(chan struct{}),
509 }
510 configAcc := &mockAccumulator{
511 changedChan: make(chan struct{}),
512 }
513 rolloutCallback := make(chan *GenericCallback)
514 appCallback := make(chan *GenericCallback)
515 go func() {
516 err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
517 watchDone <- err
518 }()
519
520
521 directive := &agent.Directive{}
522 directiveChan <- directive
523 select {
524 case err := <-watchDone:
525 eString := "No error"
526 if err != nil {
527 eString = err.Error()
528 }
529 t.Fatalf("Sending empty directive stopped the watch and shouldn't have. Error: %s", eString)
530 case <-time.After(2 * time.Second):
531 }
532
533
534 directiveChan <- nil
535 select {
536 case err := <-watchDone:
537 eString := "No error"
538 if err != nil {
539 eString = err.Error()
540 }
541 t.Fatalf("Sending empty directive stopped the watch and shouldn't have. Error: %s", eString)
542 case <-time.After(2 * time.Second):
543 }
544
545 cancel()
546
547 select {
548 case err := <-watchDone:
549 assert.Nil(t, err)
550 case <-time.After(10 * time.Second):
551 t.Fatal("Timed out waiting for watch to finish after cancelling context")
552 }
553 }
554
555
556
557
558 func TestWatchStopReportingDirective(t *testing.T) {
559 t.Parallel()
560 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
561
562 a := NewAgent(nil, nil)
563 id := agent.Identity{}
564 a.agentID = &id
565 watchDone := make(chan error)
566 directiveChan := make(chan *agent.Directive)
567 a.newDirective = directiveChan
568
569
570 client := &MockClient{}
571 c := &RPCComm{
572 conn: client,
573 client: client,
574 rptWake: make(chan struct{}, 1),
575 retCancel: cancel,
576 agentID: &id,
577 directives: directiveChan,
578 }
579 a.comm = c
580 a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
581 podAcc := &mockAccumulator{
582 changedChan: make(chan struct{}),
583 }
584 configAcc := &mockAccumulator{
585 changedChan: make(chan struct{}),
586 }
587 rolloutCallback := make(chan *GenericCallback)
588 appCallback := make(chan *GenericCallback)
589
590
591 go func() {
592 err := a.watch(ctx, "http://localhost:9697", configAcc, podAcc, rolloutCallback, appCallback)
593 watchDone <- err
594 }()
595
596
597 directive := &agent.Directive{
598 ID: "1234",
599 StopReporting: true,
600 }
601 directiveChan <- directive
602
603 time.Sleep(time.Second * 3)
604
605
606 cancel()
607
608 select {
609 case err := <-watchDone:
610 assert.Nil(t, err)
611 case <-time.After(10 * time.Second):
612 t.Fatal("Timed out waiting for watch to finish after cancelling context")
613 }
614
615 assert.True(t, a.reportingStopped)
616
617 assert.Equal(t, len(client.GetSnapshots()), 0, "No snapshots should have been sent to the client")
618 assert.False(t, a.reportRunning.Value())
619 }
620
621
622
623
624 func TestWatchErrorSendingSnapshot(t *testing.T) {
625 t.Parallel()
626 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
627 ambId := getRandomAmbassadorID()
628 a := NewAgent(nil, nil)
629 a.reportingStopped = false
630 a.reportRunning.Set(false)
631
632 minReport, err := time.ParseDuration("3s")
633 assert.Nil(t, err)
634 a.minReportPeriod = minReport
635 id := agent.Identity{}
636 a.agentID = &id
637 a.ambassadorAPIKey = "mycoolapikey"
638 a.ambassadorAPIKeyEnvVarValue = a.ambassadorAPIKey
639 a.agentCloudResourceConfigName = "bogusvalue"
640
641 a.connAddress = "http://localhost:8080"
642 a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
643 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
644
645 snapshot := snapshotTypes.Snapshot{
646 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
647 AmbassadorID: ambId,
648 ClusterID: "reallylongthing",
649 },
650 Kubernetes: &snapshotTypes.KubernetesSnapshot{},
651 }
652 enSnapshot, err := json.Marshal(&snapshot)
653 if !assert.NoError(t, err) {
654 return
655 }
656 _, err = w.Write(enSnapshot)
657 assert.NoError(t, err)
658
659 }))
660 defer ts.Close()
661 mockError := errors.New("MockClient: Error sending report")
662
663 client := &MockClient{
664
665 reportFunc: func(ctx context.Context, in *agent.Snapshot) (*agent.SnapshotResponse, error) {
666 return nil, mockError
667 },
668 }
669 c := &RPCComm{
670 conn: client,
671 client: client,
672 rptWake: make(chan struct{}, 1),
673 retCancel: cancel,
674 agentID: &id,
675 directives: make(chan *agent.Directive, 1),
676 }
677 a.comm = c
678
679 watchDone := make(chan error)
680 podAcc := &mockAccumulator{
681 changedChan: make(chan struct{}),
682 }
683 configAcc := &mockAccumulator{
684 changedChan: make(chan struct{}),
685 }
686 rolloutCallback := make(chan *GenericCallback)
687 appCallback := make(chan *GenericCallback)
688
689
690 go func() {
691 err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
692 watchDone <- err
693 }()
694
695
696 select {
697 case err := <-a.reportComplete:
698
699
700 assert.ErrorIs(t, err, mockError)
701 assert.False(t, a.reportRunning.Value())
702 cancel()
703 case err := <-watchDone:
704 if err != nil {
705 t.Fatalf("Watch ended early with error %s", err.Error())
706 } else {
707 t.Fatal("Watch ended early with no error.")
708 }
709 case <-time.After(10 * time.Second):
710 cancel()
711 t.Fatal("Timed out waiting for report to complete.")
712 }
713 select {
714 case err := <-watchDone:
715 assert.Nil(t, err)
716 case <-time.After(10 * time.Second):
717 t.Fatal("Timed out waiting for watch to end")
718 }
719 }
720
721
722
723
724
725 func TestWatchWithSnapshot(t *testing.T) {
726 t.Parallel()
727 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
728 clusterID := "coolcluster"
729 ambId := getRandomAmbassadorID()
730 a := NewAgent(nil, nil)
731 a.reportingStopped = false
732 a.reportRunning.Set(false)
733
734 id := agent.Identity{}
735
736 minReport, err := time.ParseDuration("0s")
737 assert.Nil(t, err)
738 a.minReportPeriod = minReport
739 a.agentID = &id
740
741 a.connAddress = "http://localhost:8080/"
742 a.connInfo = &ConnInfo{hostname: "localhost", port: "8080", secure: false}
743 apiKey := "coolapikey"
744 a.ambassadorAPIKey = apiKey
745 a.ambassadorAPIKeyEnvVarValue = apiKey
746 a.agentCloudResourceConfigName = "bogusvalue"
747 snapshot := &snapshotTypes.Snapshot{
748 Invalid: []*kates.Unstructured{
749
750 getUnstructured(`
751 {
752 "kind":"WeirdKind",
753 "apiVersion":"v1",
754 "metadata": {
755 "name":"hi",
756 "namespace":"default"
757 },
758 "errors": "someerrors",
759 "wat":"dontshowthis"
760 }`),
761 },
762 Kubernetes: &snapshotTypes.KubernetesSnapshot{
763 Secrets: []*kates.Secret{
764 {
765 TypeMeta: metav1.TypeMeta{
766 Kind: "Secret",
767 APIVersion: "v1",
768 },
769 ObjectMeta: metav1.ObjectMeta{
770 Name: "secret-1",
771 Namespace: "ns",
772
773 Annotations: map[string]string{"also": "unset"},
774 },
775 Type: "Opaque",
776 Data: map[string][]byte{
777
778 "data1": []byte("d293YXNlY3JldA=="),
779 "data2": []byte("d293YW5vdGhlcm9uZQ=="),
780 },
781 },
782 },
783 },
784 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
785 AmbassadorID: ambId,
786 ClusterID: clusterID,
787 AmbassadorVersion: "v1.0",
788 },
789 }
790
791
792 var snapshotSentTime time.Time
793 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
794 enSnapshot, err := json.Marshal(&snapshot)
795 if !assert.NoError(t, err) {
796 return
797 }
798 _, err = w.Write(enSnapshot)
799 assert.NoError(t, err)
800 snapshotSentTime = time.Now()
801 }))
802 defer ts.Close()
803
804
805 client := &MockClient{}
806 c := &RPCComm{
807 conn: client,
808 client: client,
809 rptWake: make(chan struct{}, 1),
810 retCancel: cancel,
811 agentID: &id,
812 directives: make(chan *agent.Directive, 1),
813 }
814 a.comm = c
815
816 watchDone := make(chan error)
817 podAcc := &mockAccumulator{
818 changedChan: make(chan struct{}),
819 targetInterface: CoreSnapshot{
820 Pods: []*kates.Pod{
821 {
822 TypeMeta: metav1.TypeMeta{
823 Kind: "Pod",
824 APIVersion: "v1",
825 },
826 ObjectMeta: metav1.ObjectMeta{
827 Name: "some-pod",
828 Namespace: "default",
829 },
830 Status: v1.PodStatus{
831 Phase: v1.PodRunning,
832 },
833 },
834 },
835 Endpoints: []*kates.Endpoints{
836 {
837 TypeMeta: metav1.TypeMeta{
838 Kind: "Endpoints",
839 APIVersion: "v1",
840 },
841 ObjectMeta: metav1.ObjectMeta{
842 Name: "some-endpoint",
843 Namespace: "default",
844 },
845 },
846 },
847 Deployments: []*kates.Deployment{
848 {
849 TypeMeta: metav1.TypeMeta{
850 Kind: "Deployment",
851 APIVersion: "apps/v1",
852 },
853 ObjectMeta: metav1.ObjectMeta{
854 Name: "some-deployment",
855 Namespace: "default",
856 },
857 },
858 },
859 ConfigMaps: []*kates.ConfigMap{
860 {
861 TypeMeta: metav1.TypeMeta{
862 Kind: "ConfigMap",
863 APIVersion: "",
864 },
865 ObjectMeta: metav1.ObjectMeta{
866 Name: "some-config-map",
867 Namespace: "default",
868 },
869 },
870 },
871 },
872 }
873 configAcc := &mockAccumulator{
874 changedChan: make(chan struct{}),
875 }
876 rolloutCallback := make(chan *GenericCallback)
877 appCallback := make(chan *GenericCallback)
878
879
880 go func() {
881 err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
882 watchDone <- err
883 }()
884
885
886
887
888
889 reportsSent := 0
890 for reportsSent < 2 {
891 podAcc.changedChan <- struct{}{}
892 select {
893 case err := <-a.reportComplete:
894 assert.Nil(t, err)
895 reportsSent += 1
896 case err := <-watchDone:
897 t.Fatalf("Watch ended early with error %s", err.Error())
898 case <-time.After(10 * time.Second):
899 cancel()
900 t.Fatal("Timed out waiting for report to complete.")
901 }
902 }
903 cancel()
904
905
906 select {
907 case err := <-watchDone:
908
909 assert.Nil(t, err)
910 case <-time.After(10 * time.Second):
911 t.Fatal("Timed out waiting for watch to finish after cancelling context")
912 }
913 sentSnaps := client.GetSnapshots()
914
915
916 assert.NotNil(t, sentSnaps, "No snapshots sent")
917 assert.GreaterOrEqual(t, len(sentSnaps), 1, "Should have sent at least 1 snapshot")
918 lastMeta := client.GetLastMetadata()
919 assert.NotNil(t, lastMeta)
920 md := lastMeta.Get("x-ambassador-api-key")
921 assert.NotEmpty(t, md)
922 assert.Equal(t, md[0], apiKey)
923
924
925 sentSnapshot := sentSnaps[1]
926 var actualSnapshot snapshotTypes.Snapshot
927 err = json.Unmarshal(sentSnapshot.RawSnapshot, &actualSnapshot)
928 assert.Nil(t, err)
929
930
931 assert.Equal(t, len(actualSnapshot.Invalid), 1)
932 expectedInvalid := getUnstructured(`
933 {
934 "kind":"WeirdKind",
935 "apiVersion":"v1",
936 "metadata": {
937 "name":"hi",
938 "namespace":"default"
939 },
940 "errors":"someerrors"
941 }`)
942 assert.Equal(t, actualSnapshot.Invalid[0], expectedInvalid)
943
944
945 assert.NotNil(t, actualSnapshot.Kubernetes)
946 assert.Equal(t, len(actualSnapshot.Kubernetes.Secrets), 1)
947 assert.Equal(t, len(actualSnapshot.Kubernetes.Secrets[0].ObjectMeta.Annotations), 0)
948 assert.Equal(t, "secret-1", actualSnapshot.Kubernetes.Secrets[0].Name)
949 assert.Equal(t, "ns", actualSnapshot.Kubernetes.Secrets[0].Namespace)
950 secretData := actualSnapshot.Kubernetes.Secrets[0].Data
951 assert.NotEqual(t, []byte("d293YXNlY3JldA=="), secretData["data1"])
952 assert.NotEqual(t, []byte("d293YW5vdGhlcm9uZQ=="), secretData["data2"])
953
954
955 assert.Equal(t, len(actualSnapshot.Kubernetes.Endpoints), 1)
956 assert.Equal(t, len(actualSnapshot.Kubernetes.Pods), 1)
957 assert.Equal(t, len(actualSnapshot.Kubernetes.ConfigMaps), 1)
958 assert.Equal(t, len(actualSnapshot.Kubernetes.Deployments), 1)
959
960
961 assert.NotNil(t, sentSnapshot.SnapshotTs)
962 snapshotTime := sentSnapshot.SnapshotTs.AsTime()
963 assert.WithinDuration(t, snapshotSentTime, snapshotTime, 5*time.Second)
964
965
966 assert.Equal(t, snapshotTypes.ApiVersion, sentSnapshot.ApiVersion)
967 assert.Equal(t, snapshotTypes.ContentTypeJSON, sentSnapshot.ContentType)
968
969
970 actualIdentity := sentSnapshot.Identity
971 assert.NotNil(t, actualIdentity)
972 assert.Equal(t, "", actualIdentity.AccountId)
973 assert.NotNil(t, actualIdentity.Version)
974 assert.Equal(t, "", actualIdentity.Version)
975 assert.Equal(t, clusterID, actualIdentity.ClusterId)
976 parsedURL, err := url.Parse(ts.URL)
977 assert.Nil(t, err)
978 assert.Equal(t, actualIdentity.Hostname, parsedURL.Hostname())
979 }
980
981
982
983
984 func TestWatchEmptySnapshot(t *testing.T) {
985 t.Parallel()
986 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
987
988 a := NewAgent(nil, nil)
989 minReport, err := time.ParseDuration("1ms")
990 assert.Nil(t, err)
991 a.minReportPeriod = minReport
992 watchDone := make(chan error)
993
994 snapshotRequested := make(chan bool)
995 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
996 ambId := getRandomAmbassadorID()
997
998 snapshot := snapshotTypes.Snapshot{
999 AmbassadorMeta: &snapshotTypes.AmbassadorMetaInfo{
1000 AmbassadorID: ambId,
1001 },
1002 }
1003 enSnapshot, err := json.Marshal(&snapshot)
1004 if err != nil {
1005 t.Fatal("error marshalling snapshot")
1006 }
1007
1008 _, _ = w.Write(enSnapshot)
1009 select {
1010 case snapshotRequested <- true:
1011 default:
1012 }
1013 }))
1014 defer ts.Close()
1015 podAcc := &mockAccumulator{
1016 changedChan: make(chan struct{}),
1017 }
1018 configAcc := &mockAccumulator{
1019 changedChan: make(chan struct{}),
1020 }
1021 rolloutCallback := make(chan *GenericCallback)
1022 appCallback := make(chan *GenericCallback)
1023 go func() {
1024 err := a.watch(ctx, ts.URL, configAcc, podAcc, rolloutCallback, appCallback)
1025 watchDone <- err
1026 }()
1027 select {
1028 case <-snapshotRequested:
1029 cancel()
1030 case <-time.After(10 * time.Second):
1031 t.Fatalf("Timed out waiting for agent to request snapshot")
1032 cancel()
1033 }
1034
1035 select {
1036 case err := <-watchDone:
1037 assert.Nil(t, err)
1038 case <-time.After(5 * time.Second):
1039 t.Fatal("Watch did not end")
1040 }
1041 assert.False(t, a.reportRunning.Value())
1042 }
1043
View as plain text