1 package agent_test
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io/ioutil"
10 "os"
11 "path/filepath"
12 "strings"
13 "testing"
14 "time"
15
16 "github.com/stretchr/testify/assert"
17 "github.com/stretchr/testify/require"
18
19 "github.com/datawire/ambassador/v2/pkg/api/agent"
20 "github.com/datawire/ambassador/v2/pkg/dtest"
21 "github.com/datawire/ambassador/v2/pkg/k8s"
22 "github.com/datawire/ambassador/v2/pkg/kates"
23 "github.com/datawire/ambassador/v2/pkg/kubeapply"
24 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
25 "github.com/datawire/dlib/dexec"
26 "github.com/datawire/dlib/dlog"
27 )
28
29
30
31
32
33
34 func TestAgentE2E(t *testing.T) {
35 ctx := dlog.NewTestContext(t, false)
36 kubeconfig := dtest.KubeVersionConfig(ctx, dtest.Kube22)
37 cli, err := kates.NewClient(kates.ClientConfig{Kubeconfig: kubeconfig})
38 require.NoError(t, err)
39
40
41
42 setup(t, ctx, kubeconfig, cli)
43
44
45 time.Sleep(time.Second * 3)
46
47 defer deleteArgoResources(t, ctx, kubeconfig)
48 hasArgo := false
49 reportSnapshot, ambSnapshot := getAgentComSnapshots(t, ctx, kubeconfig, cli, hasArgo)
50
51
52 assert.NotEmpty(t, reportSnapshot.Identity.ClusterId)
53 assert.NotEmpty(t, reportSnapshot.Identity.Hostname)
54 assert.NotEmpty(t, reportSnapshot.RawSnapshot)
55 assert.NotEmpty(t, reportSnapshot.ApiVersion)
56 assert.NotEmpty(t, reportSnapshot.SnapshotTs)
57 assert.Equal(t, reportSnapshot.ApiVersion, snapshotTypes.ApiVersion)
58
59 assert.NotEmpty(t, ambSnapshot.Kubernetes)
60
61
62 assert.NotEmpty(t, ambSnapshot.Kubernetes.Services, "No services in snapshot")
63 assert.NotEmpty(t, ambSnapshot.Kubernetes.Mappings, "No mappings in snapshot")
64
65
66 assert.NotEmpty(t, ambSnapshot.Kubernetes.Pods, "No pods found in snapshot")
67 assert.Empty(t, ambSnapshot.Kubernetes.ArgoRollouts, "rollouts found in snapshot")
68 assert.Empty(t, ambSnapshot.Kubernetes.ArgoApplications, "applications found in snapshot")
69
70 applyArgoResources(t, ctx, kubeconfig, cli)
71 hasArgo = true
72 reportSnapshot, ambSnapshot = getAgentComSnapshots(t, ctx, kubeconfig, cli, hasArgo)
73 assert.NotEmpty(t, ambSnapshot.Kubernetes.ArgoRollouts, "No argo rollouts found in snapshot")
74 assert.NotEmpty(t, ambSnapshot.Kubernetes.ArgoApplications, "No argo applications found in snapshot")
75 }
76
77 func getAgentComSnapshots(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client, waitArgo bool) (*agent.Snapshot, *snapshotTypes.Snapshot) {
78 found := false
79 reportSnapshot := &agent.Snapshot{}
80 ambSnapshot := &snapshotTypes.Snapshot{}
81
82
83
84
85
86
87
88 for i := 0; i < 15; i++ {
89 podName, err := getFakeAgentComPodName(ctx, cli)
90 assert.NoError(t, err)
91
92 localSnapshot := fmt.Sprintf("%s/snapshot.json", t.TempDir())
93 time.Sleep(time.Second * time.Duration(i))
94 if err := dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "cp", podName+":/tmp/snapshot.json", localSnapshot).Run(); err != nil {
95 t.Logf("Error running kubectl cp: %+v", err)
96 continue
97 }
98 if _, err := os.Stat(localSnapshot); os.IsNotExist(err) {
99 t.Log("Could not copy file from agentcom, retrying...")
100 continue
101 }
102 snapbytes, err := ioutil.ReadFile(localSnapshot)
103 if err != nil {
104 t.Logf("Error reading snapshot file: %+v", err)
105 continue
106 }
107 found = true
108
109 err = json.Unmarshal(snapbytes, reportSnapshot)
110 if err != nil {
111 t.Fatal("Could not unmarshal report snapshot")
112 }
113
114 err = json.Unmarshal(reportSnapshot.RawSnapshot, ambSnapshot)
115 if err != nil {
116 t.Fatal("Could not unmarshal ambassador snapshot")
117 }
118 if !snapshotIsSane(ambSnapshot, t, waitArgo) {
119 continue
120 }
121 break
122 }
123 require.True(t, found, "Could not cp file from agentcom")
124 return reportSnapshot, ambSnapshot
125 }
126
127 func snapshotIsSane(ambSnapshot *snapshotTypes.Snapshot, t *testing.T, hasArgo bool) bool {
128 if ambSnapshot.Kubernetes == nil {
129 t.Log("K8s snapshot empty, retrying")
130 return false
131 }
132 if len(ambSnapshot.Kubernetes.Services) == 0 {
133 t.Log("K8s snapshot services empty, retrying")
134 return false
135 }
136 if len(ambSnapshot.Kubernetes.Mappings) == 0 {
137 t.Log("K8s snapshot mappings empty, retrying")
138 return false
139 }
140 if len(ambSnapshot.Kubernetes.Pods) == 0 {
141 t.Log("K8s snapshot pods empty, retrying")
142 return false
143 }
144 if hasArgo && len(ambSnapshot.Kubernetes.ArgoRollouts) == 0 {
145 t.Log("K8s snapshot argo rollouts empty, retrying")
146 return false
147 }
148 if hasArgo && len(ambSnapshot.Kubernetes.ArgoApplications) == 0 {
149 t.Log("K8s snapshot argo applications empty, retrying")
150 return false
151 }
152 if !hasArgo && len(ambSnapshot.Kubernetes.ArgoRollouts) != 0 {
153 t.Log("K8s snapshot argo rollouts should be empty, retrying")
154 return false
155 }
156 if !hasArgo && len(ambSnapshot.Kubernetes.ArgoApplications) != 0 {
157 t.Log("K8s snapshot argo applications should be empty, retrying")
158 return false
159 }
160
161 return true
162 }
163 func applyArgoResources(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
164 kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")
165 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-rollouts-crd.yaml"))
166 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-application-crd.yaml"))
167 time.Sleep(3 * time.Second)
168 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-rollouts.yaml"))
169 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/argo-application.yaml"))
170 }
171
172 func needsDockerBuilds(ctx context.Context, var2file map[string]string) error {
173 var targets []string
174 for varname, filename := range var2file {
175 if os.Getenv(varname) == "" {
176 targets = append(targets, filename)
177 }
178 }
179 if len(targets) == 0 {
180 return nil
181 }
182 if os.Getenv("DEV_REGISTRY") == "" {
183 registry := dtest.DockerRegistry(ctx)
184 os.Setenv("DEV_REGISTRY", registry)
185 os.Setenv("DTEST_REGISTRY", registry)
186 }
187 cmdline := append([]string{"make", "-C", "../.."}, targets...)
188 if err := dexec.CommandContext(ctx, cmdline[0], cmdline[1:]...).Run(); err != nil {
189 return err
190 }
191 for varname, filename := range var2file {
192 if os.Getenv(varname) == "" {
193 dat, err := ioutil.ReadFile(filepath.Join("../..", filename))
194 if err != nil {
195 return err
196 }
197 lines := strings.Split(strings.TrimSpace(string(dat)), "\n")
198 if len(lines) < 2 {
199 return fmt.Errorf("malformed docker.mk tagfile %q", filename)
200 }
201 if err := os.Setenv(varname, lines[1]); err != nil {
202 return err
203 }
204 }
205 }
206 return nil
207 }
208
209 func yamlFilename(t *testing.T, inFilename, image string) string {
210 dat, err := ioutil.ReadFile(inFilename)
211 require.NoError(t, err)
212 dat = bytes.ReplaceAll(dat, []byte("$imageRepo$:$version$"), []byte(image))
213 outFilename := filepath.Join(t.TempDir(), strings.TrimSuffix(filepath.Base(inFilename), ".in"))
214 require.NoError(t, ioutil.WriteFile(outFilename, dat, 0644))
215 return outFilename
216 }
217
218 func setup(t *testing.T, ctx context.Context, kubeconfig string, cli *kates.Client) {
219 require.NoError(t, needsDockerBuilds(ctx, map[string]string{
220 "AMBASSADOR_DOCKER_IMAGE": "docker/emissary.docker.push.remote",
221 "KAT_SERVER_DOCKER_IMAGE": "docker/kat-server.docker.push.remote",
222 }))
223
224 image := os.Getenv("AMBASSADOR_DOCKER_IMAGE")
225 require.NotEmpty(t, image)
226
227 crdFile := yamlFilename(t, "../../manifests/emissary/emissary-crds.yaml.in", image)
228 aesFile := yamlFilename(t, "../../manifests/emissary/emissary-emissaryns.yaml.in", image)
229
230 kubeinfo := k8s.NewKubeInfo(kubeconfig, "", "")
231
232 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, crdFile))
233 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/namespace.yaml"))
234 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, 2*time.Minute, true, false, aesFile))
235 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, 2*time.Minute, true, false, "./testdata/fake-agentcom.yaml"))
236
237 dep := &kates.Deployment{
238 TypeMeta: kates.TypeMeta{
239 Kind: "Deployment",
240 },
241 ObjectMeta: kates.ObjectMeta{
242 Name: "emissary-ingress-agent",
243 Namespace: "emissary",
244 },
245 }
246
247 patch, err := json.Marshal(map[string]interface{}{
248 "spec": map[string]interface{}{
249 "template": map[string]interface{}{
250 "spec": map[string]interface{}{
251 "containers": []interface{}{
252 map[string]interface{}{
253 "env": []interface{}{
254 map[string]interface{}{
255 "name": "RPC_CONNECTION_ADDRESS",
256 "value": "http://agentcom-server.default:8080/",
257 },
258 },
259 "name": "agent",
260 },
261 },
262 },
263 },
264 },
265 })
266 require.NoError(t, err)
267 require.NoError(t, cli.Patch(ctx, dep, kates.StrategicMergePatchType, []byte(patch), dep))
268
269 time.Sleep(3 * time.Second)
270 require.NoError(t, kubeapply.Kubeapply(ctx, kubeinfo, time.Minute, true, false, "./testdata/sample-config.yaml"))
271 }
272
273 func deleteArgoResources(t *testing.T, ctx context.Context, kubeconfig string) {
274
275 cmd := dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "delete", "crd", "--ignore-not-found=true", "rollouts.argoproj.io")
276 out, err := cmd.CombinedOutput()
277 t.Log(fmt.Sprintf("Kubectl delete crd rollouts output: %s", out))
278 if err != nil {
279 t.Errorf("Error running kubectl delete crd rollouts: %s", err)
280 }
281 cmd = dexec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeconfig, "delete", "crd", "--ignore-not-found=true", "applications.argoproj.io")
282 out, err = cmd.CombinedOutput()
283 t.Log(fmt.Sprintf("Kubectl delete crd applications output: %s", out))
284 if err != nil {
285 t.Errorf("Error running kubectl delete crd applications: %s", err)
286 }
287 }
288
289 func getFakeAgentComPodName(ctx context.Context, cli *kates.Client) (string, error) {
290 query := kates.Query{
291 Kind: "Pod",
292 LabelSelector: "app=agentcom-server",
293 Namespace: "default",
294 }
295 pods := []*kates.Pod{}
296 err := cli.List(ctx, query, &pods)
297 if err != nil {
298 return "", err
299 }
300 if len(pods) < 1 {
301 return "", errors.New("No pods found with label app=agentcom-server")
302 }
303 return pods[0].ObjectMeta.Name, nil
304 }
305
View as plain text