...

Source file src/github.com/datawire/ambassador/v2/cmd/agent/agent_test.go

Documentation: github.com/datawire/ambassador/v2/cmd/agent

     1  package agent_test
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"io/ioutil"
    10  	"os"
    11  	"path/filepath"
    12  	"strings"
    13  	"testing"
    14  	"time"
    15  
    16  	"github.com/stretchr/testify/assert"
    17  	"github.com/stretchr/testify/require"
    18  
    19  	"github.com/datawire/ambassador/v2/pkg/api/agent"
    20  	"github.com/datawire/ambassador/v2/pkg/dtest"
    21  	"github.com/datawire/ambassador/v2/pkg/k8s"
    22  	"github.com/datawire/ambassador/v2/pkg/kates"
    23  	"github.com/datawire/ambassador/v2/pkg/kubeapply"
    24  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    25  	"github.com/datawire/dlib/dexec"
    26  	"github.com/datawire/dlib/dlog"
    27  )
    28  
    29  // This test is supposed to be a very lightweight end to end test.
    30  // We're essentially testing that the k8s yaml configuration allows the agent to report on all the
    31  // things the cloud app needs. We do this with dtest, which spins up a k3ds cluster by default, or
    32  // you can point it at your own cluster by running `go test` with env var `DTEST_KUBECONFIG=$HOME/.kube/config`
    33  // More complicated business logic tests live in ambassador.git/pkg/agent
    34  func TestAgentE2E(t *testing.T) {
    35  	ctx := dlog.NewTestContext(t, false)
    36  	kubeconfig := dtest.KubeVersionConfig(ctx, dtest.Kube22)
    37  	cli, err := kates.NewClient(kates.ClientConfig{Kubeconfig: kubeconfig})
    38  	require.NoError(t, err)
    39  	// applies all k8s yaml to dtest cluter
    40  	// ambassador, ambassador-agent, rbac, crds, and a fake agentcom that implements the grpc
    41  	// server for the agent
    42  	setup(t, ctx, kubeconfig, cli)
    43  
    44  	// eh lets make sure the agent came up
    45  	time.Sleep(time.Second * 3)
    46  
    47  	defer deleteArgoResources(t, ctx, kubeconfig)
    48  	hasArgo := false
    49  	reportSnapshot, ambSnapshot := getAgentComSnapshots(t, ctx, kubeconfig, cli, hasArgo)
    50  
    51  	// Do actual assertions here. kind of lazy way to retry, but it should work
    52  	assert.NotEmpty(t, reportSnapshot.Identity.ClusterId)
    53  	assert.NotEmpty(t, reportSnapshot.Identity.Hostname)
    54  	assert.NotEmpty(t, reportSnapshot.RawSnapshot)
    55  	assert.NotEmpty(t, reportSnapshot.ApiVersion)
    56  	assert.NotEmpty(t, reportSnapshot.SnapshotTs)
    57  	assert.Equal(t, reportSnapshot.ApiVersion, snapshotTypes.ApiVersion)
    58  
    59  	assert.NotEmpty(t, ambSnapshot.Kubernetes)
    60  
    61  	// just make sure the stuff we really need for the service catalog is in there
    62  	assert.NotEmpty(t, ambSnapshot.Kubernetes.Services, "No services in snapshot")
    63  	assert.NotEmpty(t, ambSnapshot.Kubernetes.Mappings, "No mappings in snapshot")
    64  
    65  	// pods not being empty basically ensures that the rbac in the yaml is correct
    66  	assert.NotEmpty(t, ambSnapshot.Kubernetes.Pods, "No pods found in snapshot")
    67  	assert.Empty(t, ambSnapshot.Kubernetes.ArgoRollouts, "rollouts found in snapshot")
    68  	assert.Empty(t, ambSnapshot.Kubernetes.ArgoApplications, "applications found in snapshot")
    69  
    70  	applyArgoResources(t, ctx, kubeconfig, cli)
    71  	hasArgo = true
    72  	reportSnapshot, ambSnapshot = getAgentComSnapshots(t, ctx, kubeconfig, cli, hasArgo)
    73  	assert.NotEmpty(t, ambSnapshot.Kubernetes.ArgoRollouts, "No argo rollouts found in snapshot")
    74  	assert.NotEmpty(t, ambSnapshot.Kubernetes.ArgoApplications, "No argo applications found in snapshot")
    75  }
    76  
    77  func getAgentComSnapshots(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client, waitArgo bool) (*agent.Snapshot, *snapshotTypes.Snapshot) {
    78  	found := false
    79  	reportSnapshot := &agent.Snapshot{}
    80  	ambSnapshot := &snapshotTypes.Snapshot{}
    81  
    82  	// now we're going to go copy the snapshot.json file from our fake agentcom
    83  	// when the agentcom gets a snapshot from the agent, it'll store it at /tmp/snapshot.json
    84  	// we do this in a loop because it might take ambassador and the agent a sec to get into the
    85  	// state we're asserting. this is okay, this test is just to make sure that the agent RBAC
    86  	// is correct and that the agent can talk to the ambassador-agent service.
    87  	// any tests that do any more complicated assertions should live in ambassador.git/pkg/agent
    88  	for i := 0; i < 15; i++ {
    89  		podName, err := getFakeAgentComPodName(ctx, cli)
    90  		assert.NoError(t, err)
    91  
    92  		localSnapshot := fmt.Sprintf("%s/snapshot.json", t.TempDir())
    93  		time.Sleep(time.Second * time.Duration(i))
    94  		if err := dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "cp", podName+":/tmp/snapshot.json", localSnapshot).Run(); err != nil {
    95  			t.Logf("Error running kubectl cp: %+v", err)
    96  			continue
    97  		}
    98  		if _, err := os.Stat(localSnapshot); os.IsNotExist(err) {
    99  			t.Log("Could not copy file from agentcom, retrying...")
   100  			continue
   101  		}
   102  		snapbytes, err := ioutil.ReadFile(localSnapshot)
   103  		if err != nil {
   104  			t.Logf("Error reading snapshot file: %+v", err)
   105  			continue
   106  		}
   107  		found = true
   108  
   109  		err = json.Unmarshal(snapbytes, reportSnapshot)
   110  		if err != nil {
   111  			t.Fatal("Could not unmarshal report snapshot")
   112  		}
   113  
   114  		err = json.Unmarshal(reportSnapshot.RawSnapshot, ambSnapshot)
   115  		if err != nil {
   116  			t.Fatal("Could not unmarshal ambassador snapshot")
   117  		}
   118  		if !snapshotIsSane(ambSnapshot, t, waitArgo) {
   119  			continue
   120  		}
   121  		break
   122  	}
   123  	require.True(t, found, "Could not cp file from agentcom")
   124  	return reportSnapshot, ambSnapshot
   125  }
   126  
   127  func snapshotIsSane(ambSnapshot *snapshotTypes.Snapshot, t *testing.T, hasArgo bool) bool {
   128  	if ambSnapshot.Kubernetes == nil {
   129  		t.Log("K8s snapshot empty, retrying")
   130  		return false
   131  	}
   132  	if len(ambSnapshot.Kubernetes.Services) == 0 {
   133  		t.Log("K8s snapshot services empty, retrying")
   134  		return false
   135  	}
   136  	if len(ambSnapshot.Kubernetes.Mappings) == 0 {
   137  		t.Log("K8s snapshot mappings empty, retrying")
   138  		return false
   139  	}
   140  	if len(ambSnapshot.Kubernetes.Pods) == 0 {
   141  		t.Log("K8s snapshot pods empty, retrying")
   142  		return false
   143  	}
   144  	if hasArgo && len(ambSnapshot.Kubernetes.ArgoRollouts) == 0 {
   145  		t.Log("K8s snapshot argo rollouts empty, retrying")
   146  		return false
   147  	}
   148  	if hasArgo && len(ambSnapshot.Kubernetes.ArgoApplications) == 0 {
   149  		t.Log("K8s snapshot argo applications empty, retrying")
   150  		return false
   151  	}
   152  	if !hasArgo && len(ambSnapshot.Kubernetes.ArgoRollouts) != 0 {
   153  		t.Log("K8s snapshot argo rollouts should be empty, retrying")
   154  		return false
   155  	}
   156  	if !hasArgo && len(ambSnapshot.Kubernetes.ArgoApplications) != 0 {
   157  		t.Log("K8s snapshot argo applications should be empty, retrying")
   158  		return false
   159  	}
   160  
   161  	return true
   162  }
   163  func applyArgoResources(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
   164  	kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")
   165  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-rollouts-crd.yaml"))
   166  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-application-crd.yaml"))
   167  	time.Sleep(3 * time.Second)
   168  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-rollouts.yaml"))
   169  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-application.yaml"))
   170  }
   171  
   172  func needsDockerBuilds(ctx context.Context, var2file map[string]string) error {
   173  	var targets []string
   174  	for varname, filename := range var2file {
   175  		if os.Getenv(varname) == "" {
   176  			targets = append(targets, filename)
   177  		}
   178  	}
   179  	if len(targets) == 0 {
   180  		return nil
   181  	}
   182  	if os.Getenv("DEV_REGISTRY") == "" {
   183  		registry := dtest.DockerRegistry(ctx)
   184  		os.Setenv("DEV_REGISTRY", registry)
   185  		os.Setenv("DTEST_REGISTRY", registry)
   186  	}
   187  	cmdline := append([]string{"make", "-C", "../.."}, targets...)
   188  	if err := dexec.CommandContext(ctx, cmdline[0], cmdline[1:]...).Run(); err != nil {
   189  		return err
   190  	}
   191  	for varname, filename := range var2file {
   192  		if os.Getenv(varname) == "" {
   193  			dat, err := ioutil.ReadFile(filepath.Join("../..", filename))
   194  			if err != nil {
   195  				return err
   196  			}
   197  			lines := strings.Split(strings.TrimSpace(string(dat)), "\n")
   198  			if len(lines) < 2 {
   199  				return fmt.Errorf("malformed docker.mk tagfile %q", filename)
   200  			}
   201  			if err := os.Setenv(varname, lines[1]); err != nil {
   202  				return err
   203  			}
   204  		}
   205  	}
   206  	return nil
   207  }
   208  
   209  func yamlFilename(t *testing.T, inFilename, image string) string {
   210  	dat, err := ioutil.ReadFile(inFilename)
   211  	require.NoError(t, err)
   212  	dat = bytes.ReplaceAll(dat, []byte("$imageRepo$:$version$"), []byte(image))
   213  	outFilename := filepath.Join(t.TempDir(), strings.TrimSuffix(filepath.Base(inFilename), ".in"))
   214  	require.NoError(t, ioutil.WriteFile(outFilename, dat, 0644))
   215  	return outFilename
   216  }
   217  
   218  func setup(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
   219  	require.NoError(t, needsDockerBuilds(ctx, map[string]string{
   220  		"AMBASSADOR_DOCKER_IMAGE": "docker/emissary.docker.push.remote",
   221  		"KAT_SERVER_DOCKER_IMAGE": "docker/kat-server.docker.push.remote",
   222  	}))
   223  
   224  	image := os.Getenv("AMBASSADOR_DOCKER_IMAGE")
   225  	require.NotEmpty(t, image)
   226  
   227  	crdFile := yamlFilename(t, "../../manifests/emissary/emissary-crds.yaml.in", image)
   228  	aesFile := yamlFilename(t, "../../manifests/emissary/emissary-emissaryns.yaml.in", image)
   229  
   230  	kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")
   231  
   232  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, crdFile))
   233  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/namespace.yaml"))
   234  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, 2*time.Minute, true, false, aesFile))
   235  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, 2*time.Minute, true, false, "./testdata/fake-agentcom.yaml"))
   236  
   237  	dep := &kates.Deployment{
   238  		TypeMeta: kates.TypeMeta{
   239  			Kind: "Deployment",
   240  		},
   241  		ObjectMeta: kates.ObjectMeta{
   242  			Name:      "emissary-ingress-agent",
   243  			Namespace: "emissary",
   244  		},
   245  	}
   246  
   247  	patch, err := json.Marshal(map[string]interface{}{
   248  		"spec": map[string]interface{}{
   249  			"template": map[string]interface{}{
   250  				"spec": map[string]interface{}{
   251  					"containers": []interface{}{
   252  						map[string]interface{}{
   253  							"env": []interface{}{
   254  								map[string]interface{}{
   255  									"name":  "RPC_CONNECTION_ADDRESS",
   256  									"value": "http://agentcom-server.default:8080/",
   257  								},
   258  							},
   259  							"name": "agent",
   260  						},
   261  					},
   262  				},
   263  			},
   264  		},
   265  	})
   266  	require.NoError(t, err)
   267  	require.NoError(t, cli.Patch(ctx, dep, kates.StrategicMergePatchType, []byte(patch), dep))
   268  
   269  	time.Sleep(3 * time.Second)
   270  	require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/sample-config.yaml"))
   271  }
   272  
   273  func deleteArgoResources(t *testing.T, ctx context.Context, kubeconfig string) {
   274  	// cleaning up argo crds so the e2e test can be deterministic
   275  	cmd := dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "delete", "crd", "--ignore-not-found=true", "rollouts.argoproj.io")
   276  	out, err := cmd.CombinedOutput()
   277  	t.Log(fmt.Sprintf("Kubectl delete crd rollouts output: %s", out))
   278  	if err != nil {
   279  		t.Errorf("Error running kubectl delete crd rollouts: %s", err)
   280  	}
   281  	cmd = dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "delete", "crd", "--ignore-not-found=true", "applications.argoproj.io")
   282  	out, err = cmd.CombinedOutput()
   283  	t.Log(fmt.Sprintf("Kubectl delete crd applications output: %s", out))
   284  	if err != nil {
   285  		t.Errorf("Error running kubectl delete crd applications: %s", err)
   286  	}
   287  }
   288  
   289  func getFakeAgentComPodName(ctx context.Context, cli *kates.Client) (string, error) {
   290  	query := kates.Query{
   291  		Kind:          "Pod",
   292  		LabelSelector: "app=agentcom-server",
   293  		Namespace:     "default",
   294  	}
   295  	pods := []*kates.Pod{}
   296  	err := cli.List(ctx, query, &pods)
   297  	if err != nil {
   298  		return "", err
   299  	}
   300  	if len(pods) < 1 {
   301  		return "", errors.New("No pods found with label app=agentcom-server")
   302  	}
   303  	return pods[0].ObjectMeta.Name, nil
   304  }
   305  

View as plain text