1 package testutil
2
3 import (
4 "bytes"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "errors"
9 "flag"
10 "fmt"
11 "io"
12 "net/http"
13 "os"
14 "os/exec"
15 "path/filepath"
16 "runtime"
17 "strings"
18 "testing"
19 "time"
20
21 log "github.com/sirupsen/logrus"
22 corev1 "k8s.io/api/core/v1"
23 )
24
25
26 type TestHelper struct {
27 linkerd string
28 version string
29 namespace string
30 vizNamespace string
31 upgradeFromVersion string
32 clusterDomain string
33 externalIssuer bool
34 externalPrometheus bool
35 multicluster bool
36 multiclusterSrcCtx string
37 multiclusterTgtCtx string
38 uninstall bool
39 cni bool
40 calico bool
41 dualStack bool
42 nativeSidecar bool
43 defaultInboundPolicy string
44 httpClient http.Client
45 KubernetesHelper
46 helm
47 installedExtensions []string
48 }
49
// helm groups the Helm-related settings of a TestHelper.
type helm struct {
	// path is the path of the Helm binary.
	path string
	// charts is the path to linkerd2's Helm charts.
	charts string
	// multiclusterChart is the path to the multicluster Helm chart.
	multiclusterChart string
	// vizChart is the path to the viz extension Helm chart.
	vizChart string
	// vizStableChart is the path to the viz extension stable Helm chart.
	vizStableChart string
	// releaseName is the Helm release name used to install linkerd.
	releaseName string
	// multiclusterReleaseName is the Helm release name used to install
	// linkerd multicluster.
	multiclusterReleaseName string
	// upgradeFromVersion is the chart version a Helm installation is being
	// upgraded from.
	upgradeFromVersion string
}
60
61
// DeploySpec pairs the namespace of a deployment with a replica count.
// The field order matters: the replica tables in this file are written as
// positional literals.
type DeploySpec struct {
	// Namespace is the namespace the deployment lives in.
	Namespace string
	// Replicas is the replica count associated with the deployment.
	Replicas int
}
66
67
// Service identifies a service by its namespace and name.
type Service struct {
	// Namespace is the namespace of the service.
	Namespace string
	// Name is the name of the service.
	Name string
}
72
73
74
75 var LinkerdDeployReplicasEdge = map[string]DeploySpec{
76 "linkerd-destination": {"linkerd", 1},
77 "linkerd-identity": {"linkerd", 1},
78 "linkerd-proxy-injector": {"linkerd", 1},
79 }
80
81
82
83
84 var LinkerdDeployReplicasStable = LinkerdDeployReplicasEdge
85
86
87
88
89 var LinkerdVizDeployReplicas = map[string]DeploySpec{
90 "prometheus": {"linkerd-viz", 1},
91 "metrics-api": {"linkerd-viz", 1},
92 "tap": {"linkerd-viz", 1},
93 "tap-injector": {"linkerd-viz", 1},
94 "web": {"linkerd-viz", 1},
95 }
96
97
98
99 var MulticlusterDeployReplicas = map[string]DeploySpec{
100 "linkerd-gateway": {"linkerd-multicluster", 1},
101 }
102
103
104
105
106 var MulticlusterSourceReplicas = map[string]DeploySpec{
107 "linkerd-service-mirror-target": {Namespace: "linkerd-multicluster", Replicas: 1},
108 }
109
110
111
112 var ExternalVizDeployReplicas = map[string]DeploySpec{
113 "prometheus": {"external-prometheus", 1},
114 "metrics-api": {"linkerd-viz", 1},
115 "tap": {"linkerd-viz", 1},
116 "tap-injector": {"linkerd-viz", 1},
117 "web": {"linkerd-viz", 1},
118 }
119
120
121
// SourceContextKey is the key under which the multicluster source context
// is looked up.
var SourceContextKey = "source"
123
124
125
// TargetContextKey is the key under which the multicluster target context
// is looked up.
var TargetContextKey = "target"
127
128
129
130
131
132 func NewGenericTestHelper(
133 linkerd,
134 version,
135 namespace,
136 vizNamespace,
137 upgradeFromVersion,
138 clusterDomain,
139 helmPath,
140 helmCharts,
141 helmReleaseName,
142 helmMulticlusterReleaseName,
143 helmMulticlusterChart string,
144 externalIssuer,
145 externalPrometheus,
146 multicluster,
147 cni,
148 calico,
149 uninstall bool,
150 httpClient http.Client,
151 kubernetesHelper KubernetesHelper,
152 ) *TestHelper {
153 return &TestHelper{
154 linkerd: linkerd,
155 version: version,
156 namespace: namespace,
157 vizNamespace: vizNamespace,
158 upgradeFromVersion: upgradeFromVersion,
159 helm: helm{
160 path: helmPath,
161 charts: helmCharts,
162 multiclusterChart: helmMulticlusterChart,
163 multiclusterReleaseName: helmMulticlusterReleaseName,
164 releaseName: helmReleaseName,
165 upgradeFromVersion: upgradeFromVersion,
166 },
167 clusterDomain: clusterDomain,
168 externalIssuer: externalIssuer,
169 externalPrometheus: externalPrometheus,
170 uninstall: uninstall,
171 cni: cni,
172 calico: calico,
173 httpClient: httpClient,
174 multicluster: multicluster,
175 KubernetesHelper: kubernetesHelper,
176 }
177 }
178
179
180
181 func NewTestHelper() *TestHelper {
182 exit := func(code int, msg string) {
183 fmt.Fprintln(os.Stderr, msg)
184 os.Exit(code)
185 }
186
187
188 k8sContext := flag.String("k8s-context", "", "kubernetes context associated with the test cluster")
189 linkerdExec := flag.String("linkerd", "", "path to the linkerd binary to test")
190 namespace := flag.String("linkerd-namespace", "linkerd", "the namespace where linkerd is installed")
191 vizNamespace := flag.String("viz-namespace", "linkerd-viz", "the namespace where linkerd viz extension is installed")
192 multicluster := flag.Bool("multicluster", false, "when specified the multicluster install functionality is tested")
193 multiclusterSourceCtx := flag.String("multicluster-source-context", "k3d-source", "the context belonging to source cluster in multicluster test")
194 multiclusterTargetCtx := flag.String("multicluster-target-context", "k3d-target", "the context belonging to target cluster in multicluster test")
195 helmPath := flag.String("helm-path", "target/helm", "path of the Helm binary")
196 helmCharts := flag.String("helm-charts", "charts/linkerd2", "path to linkerd2's Helm charts")
197 multiclusterHelmChart := flag.String("multicluster-helm-chart", "charts/linkerd-multicluster", "path to linkerd2's multicluster Helm chart")
198 vizHelmChart := flag.String("viz-helm-chart", "charts/linkerd-viz", "path to linkerd2's viz extension Helm chart")
199 vizHelmStableChart := flag.String("viz-helm-stable-chart", "charts/linkerd-viz", "path to linkerd2's viz extension stable Helm chart")
200 helmReleaseName := flag.String("helm-release", "", "install linkerd via Helm using this release name")
201 multiclusterHelmReleaseName := flag.String("multicluster-helm-release", "", "install linkerd multicluster via Helm using this release name")
202 upgradeFromVersion := flag.String("upgrade-from-version", "", "when specified, the upgrade test uses it as the base version of the upgrade")
203 clusterDomain := flag.String("cluster-domain", "cluster.local", "when specified, the install test uses a custom cluster domain")
204 externalIssuer := flag.Bool("external-issuer", false, "when specified, the install test uses it to install linkerd with --identity-external-issuer=true")
205 externalPrometheus := flag.Bool("external-prometheus", false, "when specified, the install test uses an external prometheus")
206 runTests := flag.Bool("integration-tests", false, "must be provided to run the integration tests")
207 verbose := flag.Bool("verbose", false, "turn on debug logging")
208 upgradeHelmFromVersion := flag.String("upgrade-helm-from-version", "", "Indicate a version of the Linkerd helm chart from which the helm installation is being upgraded")
209 uninstall := flag.Bool("uninstall", false, "whether to run the 'linkerd uninstall' integration test")
210 cni := flag.Bool("cni", false, "whether to install linkerd with CNI enabled")
211 calico := flag.Bool("calico", false, "whether to install calico CNI plugin")
212 dualStack := flag.Bool("dual-stack", false, "whether to run the dual-stack tests")
213 nativeSidecar := flag.Bool("native-sidecar", false, "whether to install using native sidecar injection")
214 defaultInboundPolicy := flag.String("default-inbound-policy", "", "if non-empty, passed to --set proxy.defaultInboundPolicy at linkerd's install time")
215 flag.Parse()
216
217 if !*runTests {
218 exit(0, "integration tests not enabled: enable with -integration-tests")
219 }
220
221 if *linkerdExec == "" {
222 exit(1, "-linkerd flag is required")
223 }
224
225 linkerd, err := filepath.Abs(*linkerdExec)
226 if err != nil {
227 exit(1, fmt.Sprintf("abs: %s", err))
228 }
229
230 if *verbose {
231 log.SetLevel(log.DebugLevel)
232 } else {
233 log.SetLevel(log.PanicLevel)
234 }
235
236 testHelper := &TestHelper{
237 linkerd: linkerd,
238 namespace: *namespace,
239 vizNamespace: *vizNamespace,
240 upgradeFromVersion: *upgradeFromVersion,
241 multicluster: *multicluster,
242 multiclusterSrcCtx: *multiclusterSourceCtx,
243 multiclusterTgtCtx: *multiclusterTargetCtx,
244 helm: helm{
245 path: *helmPath,
246 charts: *helmCharts,
247 multiclusterChart: *multiclusterHelmChart,
248 vizChart: *vizHelmChart,
249 vizStableChart: *vizHelmStableChart,
250 releaseName: *helmReleaseName,
251 multiclusterReleaseName: *multiclusterHelmReleaseName,
252 upgradeFromVersion: *upgradeHelmFromVersion,
253 },
254 clusterDomain: *clusterDomain,
255 externalIssuer: *externalIssuer,
256 externalPrometheus: *externalPrometheus,
257 cni: *cni,
258 calico: *calico,
259 dualStack: *dualStack,
260 nativeSidecar: *nativeSidecar,
261 uninstall: *uninstall,
262 defaultInboundPolicy: *defaultInboundPolicy,
263 }
264
265 version, err := testHelper.LinkerdRun("version", "--client", "--short")
266 if err != nil {
267 exit(1, fmt.Sprintf("error getting linkerd version: %s", err.Error()))
268 }
269 testHelper.version = strings.TrimSpace(version)
270
271 kubernetesHelper, err := NewKubernetesHelper(*k8sContext, RetryFor)
272 if err != nil {
273 exit(1, fmt.Sprintf("error creating kubernetes helper: %s", err.Error()))
274 }
275 testHelper.KubernetesHelper = *kubernetesHelper
276
277 testHelper.httpClient = http.Client{
278 Timeout: 10 * time.Second,
279 }
280
281 return testHelper
282 }
283
284
285
286
287 func (h *TestHelper) GetVersion() string {
288 return h.version
289 }
290
291
292
293 func (h *TestHelper) GetLinkerdNamespace() string {
294 return h.namespace
295 }
296
297
298
299 func (h *TestHelper) GetVizNamespace() string {
300 return h.vizNamespace
301 }
302
303
304
305 func (h *TestHelper) GetMulticlusterNamespace() string {
306 return fmt.Sprintf("%s-multicluster", h.GetLinkerdNamespace())
307 }
308
309
310
311 func (h *TestHelper) GetMulticlusterContexts() map[string]string {
312 return map[string]string{
313 "source": h.multiclusterSrcCtx,
314 "target": h.multiclusterTgtCtx,
315 }
316 }
317
318
319
320 func (h *TestHelper) GetTestNamespace(testName string) string {
321 return h.namespace + "-" + testName
322 }
323
324
325 func (h *TestHelper) GetHelmReleaseName() string {
326 return h.helm.releaseName
327 }
328
329
330 func (h *TestHelper) GetMulticlusterHelmReleaseName() string {
331 return h.helm.multiclusterReleaseName
332 }
333
334
335 func (h *TestHelper) GetHelmCharts() string {
336 return h.helm.charts
337 }
338
339
340 func (h *TestHelper) GetMulticlusterHelmChart() string {
341 return h.helm.multiclusterChart
342 }
343
344
345 func (h *TestHelper) GetLinkerdVizHelmChart() string {
346 return h.helm.vizChart
347 }
348
349
350
351 func (h *TestHelper) GetLinkerdVizHelmStableChart() string {
352 return h.helm.vizStableChart
353 }
354
355
356 func (h *TestHelper) UpgradeHelmFromVersion() string {
357 return h.helm.upgradeFromVersion
358 }
359
360
361 func (h *TestHelper) ExternalIssuer() bool {
362 return h.externalIssuer
363 }
364
365
366 func (h *TestHelper) ExternalPrometheus() bool {
367 return h.externalPrometheus
368 }
369
370
371 func (h *TestHelper) Multicluster() bool {
372 return h.multicluster
373 }
374
375
376 func (h *TestHelper) Uninstall() bool {
377 return h.uninstall
378 }
379
380
381 func (h *TestHelper) DefaultInboundPolicy() string {
382 return h.defaultInboundPolicy
383 }
384
385
386 func (h *TestHelper) UpgradeFromVersion() string {
387 return h.upgradeFromVersion
388 }
389
390
391 func (h *TestHelper) GetClusterDomain() string {
392 return h.clusterDomain
393 }
394
395
396 func (h *TestHelper) CNI() bool {
397 return h.cni
398 }
399
400
401 func (h *TestHelper) Calico() bool {
402 return h.calico
403 }
404
405
406 func (h *TestHelper) DualStack() bool {
407 return h.dualStack
408 }
409
410
411 func (h *TestHelper) NativeSidecar() bool {
412 return h.nativeSidecar
413 }
414
415
416
417 func (h *TestHelper) AddInstalledExtension(extensionName string) {
418 h.installedExtensions = append(h.installedExtensions, extensionName)
419 }
420
421
422
423 func (h *TestHelper) GetInstalledExtensions() []string {
424 return h.installedExtensions
425 }
426
427
428 func (h *TestHelper) CreateTLSSecret(name, root, cert, key string) error {
429 secret := fmt.Sprintf(`
430 apiVersion: v1
431 data:
432 ca.crt: %s
433 tls.crt: %s
434 tls.key: %s
435 kind: Secret
436 metadata:
437 name: %s
438 type: kubernetes.io/tls`, base64.StdEncoding.EncodeToString([]byte(root)), base64.StdEncoding.EncodeToString([]byte(cert)), base64.StdEncoding.EncodeToString([]byte(key)), name)
439
440 _, err := h.KubectlApply(secret, h.GetLinkerdNamespace())
441 return err
442 }
443
444
445
446 func (h *TestHelper) CmdRun(cmd string, arg ...string) (string, error) {
447 out, stderr, err := combinedOutput("", cmd, arg...)
448 if err != nil {
449 return out, fmt.Errorf("command failed: '%s %s'\n%w\n%s", cmd, strings.Join(arg, " "), err, stderr)
450 }
451 return out, nil
452 }
453
454
455 func (h *TestHelper) LinkerdRun(arg ...string) (string, error) {
456 out, stderr, err := h.PipeToLinkerdRun("", arg...)
457 if err != nil {
458 return out, fmt.Errorf("command failed: linkerd %s\n%w\n%s", strings.Join(arg, " "), err, stderr)
459 }
460 return out, nil
461 }
462
463
464
465 func (h *TestHelper) PipeToLinkerdRun(stdin string, arg ...string) (string, string, error) {
466 withParams := append([]string{"--linkerd-namespace", h.namespace, "--context=" + h.k8sContext}, arg...)
467 return combinedOutput(stdin, h.linkerd, withParams...)
468 }
469
470
471 func (h *TestHelper) HelmRun(arg ...string) (string, string, error) {
472 return h.PipeToHelmRun("", arg...)
473 }
474
475
476
477 func (h *TestHelper) PipeToHelmRun(stdin string, arg ...string) (string, string, error) {
478 withParams := append([]string{"--kube-context=" + h.k8sContext}, arg...)
479 return combinedOutput(stdin, h.helm.path, withParams...)
480 }
481
482
483
484
485 func (h *TestHelper) LinkerdRunStream(arg ...string) (*Stream, error) {
486 withParams := append([]string{"--linkerd-namespace", h.namespace, "--context=" + h.k8sContext}, arg...)
487 cmd := exec.Command(h.linkerd, withParams...)
488
489 cmdReader, err := cmd.StdoutPipe()
490 if err != nil {
491 return nil, err
492 }
493
494 err = cmd.Start()
495 if err != nil {
496 return nil, err
497 }
498
499 time.Sleep(500 * time.Millisecond)
500
501 if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
502 return nil, fmt.Errorf("Process exited: %s", cmd.ProcessState)
503 }
504
505 return &Stream{cmd: cmd, out: cmdReader}, nil
506 }
507
508
509
510
511 func (h *TestHelper) KubectlStream(arg ...string) (*Stream, error) {
512
513 withContext := append([]string{"--namespace", h.namespace, "--context=" + h.k8sContext}, arg...)
514 cmd := exec.Command("kubectl", withContext...)
515
516 cmdReader, err := cmd.StdoutPipe()
517 if err != nil {
518 return nil, err
519 }
520
521 err = cmd.Start()
522 if err != nil {
523 return nil, err
524 }
525
526 time.Sleep(500 * time.Millisecond)
527
528 if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
529 return nil, fmt.Errorf("Process exited: %s", cmd.ProcessState)
530 }
531
532 return &Stream{cmd: cmd, out: cmdReader}, nil
533 }
534
535
536 func (h *TestHelper) HelmUpgrade(chart, releaseName string, arg ...string) (string, string, error) {
537 withParams := append([]string{
538 "upgrade",
539 releaseName,
540 "--kube-context", h.k8sContext,
541 "--namespace", h.namespace,
542 "--timeout", "60m",
543 "--wait",
544 chart,
545 }, arg...)
546 return combinedOutput("", h.helm.path, withParams...)
547 }
548
549
550 func (h *TestHelper) HelmInstall(chart, releaseName string, arg ...string) (string, string, error) {
551 withParams := append([]string{
552 "install",
553 releaseName,
554 chart,
555 "--kube-context", h.k8sContext,
556 "--namespace", h.namespace,
557 "--create-namespace",
558 "--timeout", "60m",
559 "--wait",
560 }, arg...)
561 return combinedOutput("", h.helm.path, withParams...)
562 }
563
564
565 func (h *TestHelper) HelmCmdPlain(cmd, chart, releaseName string, arg ...string) (string, string, error) {
566 withParams := append([]string{
567 cmd,
568 releaseName,
569 chart,
570 "--kube-context", h.k8sContext,
571 }, arg...)
572
573 return combinedOutput("", h.helm.path, withParams...)
574 }
575
576
577 func (h *TestHelper) HelmInstallMulticluster(chart string, arg ...string) (string, string, error) {
578 withParams := append([]string{
579 "install",
580 h.helm.multiclusterReleaseName,
581 chart,
582 "--kube-context", h.k8sContext,
583 "--namespace", h.GetMulticlusterNamespace(),
584 "--create-namespace",
585 "--set", "linkerdNamespace=" + h.GetLinkerdNamespace(),
586 }, arg...)
587 return combinedOutput("", h.helm.path, withParams...)
588 }
589
590
591 func (h *TestHelper) HelmUninstallMulticluster(chart string) (string, string, error) {
592 withParams := []string{
593 "delete",
594 h.helm.multiclusterReleaseName,
595 "--kube-context", h.k8sContext,
596 }
597 return combinedOutput("", h.helm.path, withParams...)
598 }
599
600
601
602 func (h *TestHelper) ValidateOutput(out, fixtureFile string) error {
603 expected, err := ReadFile("testdata/" + fixtureFile)
604 if err != nil {
605 return err
606 }
607
608 if out != expected {
609 return fmt.Errorf(
610 "Expected:\n%s\nActual:\n%s", expected, out)
611 }
612
613 return nil
614 }
615
616
617 func (h *TestHelper) CheckVersion(serverVersion string) error {
618 out, err := h.LinkerdRun("version")
619 if err != nil {
620 return err
621 }
622 if !strings.Contains(out, fmt.Sprintf("Client version: %s", h.version)) {
623 return fmt.Errorf("Expected client version [%s], got:\n%s", h.version, out)
624 }
625 if !strings.Contains(out, fmt.Sprintf("Server version: %s", serverVersion)) {
626 return fmt.Errorf("Expected server version [%s], got:\n%s", serverVersion, out)
627 }
628 return nil
629 }
630
631
632
633
// RetryFor calls fn immediately and then once per second until it returns
// nil or timeout has elapsed since the first failure. It returns nil as
// soon as fn succeeds; otherwise it returns the error of the most recent
// call when the timeout fires.
func RetryFor(timeout time.Duration, fn func() error) error {
	lastErr := fn()
	if lastErr == nil {
		return nil
	}

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for lastErr != nil {
		select {
		case <-deadline.C:
			return lastErr
		case <-tick.C:
			lastErr = fn()
		}
	}
	return nil
}
657
658
659
660
661
662 func (h *TestHelper) HTTPGetURL(url string) (string, error) {
663 var body string
664 err := RetryFor(time.Minute, func() error {
665 resp, err := h.httpClient.Get(url)
666 if err != nil {
667 return err
668 }
669
670 defer resp.Body.Close()
671 bytes, err := io.ReadAll(resp.Body)
672 if err != nil {
673 return fmt.Errorf("Error reading response body: %w", err)
674 }
675 body = string(bytes)
676
677 if resp.StatusCode != http.StatusOK {
678 return fmt.Errorf("GET request to %s returned status code %d with body %q", url, resp.StatusCode, body)
679 }
680
681 return nil
682 })
683
684 return body, err
685 }
686
687
688 func (h *TestHelper) WithDataPlaneNamespace(ctx context.Context, testName string, annotations map[string]string, t *testing.T, test func(t *testing.T, ns string)) {
689 prefixedNs := h.GetTestNamespace(testName)
690 if err := h.CreateDataPlaneNamespaceIfNotExists(ctx, prefixedNs, annotations); err != nil {
691 AnnotatedFatalf(t, fmt.Sprintf("failed to create %s namespace", prefixedNs),
692 "failed to create %s namespace: %s", prefixedNs, err)
693 }
694 test(t, prefixedNs)
695 if err := h.DeleteNamespaceIfExists(ctx, prefixedNs); err != nil {
696 AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", prefixedNs),
697 "failed to delete %s namespace: %s", prefixedNs, err)
698 }
699 }
700
701
702
703 func (h *TestHelper) GetReleaseChannelVersions() (map[string]string, error) {
704 url := "https://versioncheck.linkerd.io/version.json"
705 resp, err := h.httpClient.Get(url)
706 if err != nil {
707 return map[string]string{}, err
708 }
709 defer resp.Body.Close()
710
711 var versions map[string]string
712 if err := json.NewDecoder(resp.Body).Decode(&versions); err != nil {
713 return map[string]string{}, err
714 }
715
716 return versions, nil
717 }
718
719
720
721
722 func (h *TestHelper) DownloadCLIBinary(filepath, version string) error {
723 url := fmt.Sprintf("https://github.com/linkerd/linkerd2/releases/download/%[1]s/linkerd2-cli-%[1]s-%s-%s", version, runtime.GOOS, runtime.GOARCH)
724 resp, err := h.httpClient.Get(url)
725 if err != nil {
726 return err
727 }
728 defer resp.Body.Close()
729
730
731
732
733
734 out, err := os.OpenFile(filepath, os.O_RDWR|os.O_CREATE, 0500)
735 if err != nil {
736 return err
737 }
738 defer out.Close()
739
740 _, err = io.Copy(out, resp.Body)
741 return err
742 }
743
744
// ReadFile returns the entire contents of file as a string. On failure it
// returns an empty string and the read error.
func ReadFile(file string) (string, error) {
	contents, err := os.ReadFile(file)
	if err == nil {
		return string(contents), nil
	}
	return "", err
}
752
753
// combinedOutput runs the command name with the given arguments, feeding it
// stdin as standard input. Despite its name it keeps the two streams
// separate: it returns the captured stdout, the captured stderr and the
// execution error.
func combinedOutput(stdin string, name string, arg ...string) (string, string, error) {
	var stdoutBuf, stderrBuf bytes.Buffer

	cmd := exec.Command(name, arg...)
	cmd.Stdin = strings.NewReader(stdin)
	cmd.Stdout = &stdoutBuf
	cmd.Stderr = &stderrBuf

	err := cmd.Run()
	return stdoutBuf.String(), stderrBuf.String(), err
}
763
764
// RowStat holds the columns of one row of stat output as raw strings.
type RowStat struct {
	// Name is the first column of the row.
	Name string
	// Status is the status column, present only in the 9-column layout.
	Status string
	// Meshed is the meshed column.
	Meshed string
	// Success is the success column.
	Success string
	// Rps is the requests-per-second column.
	Rps string
	// P50Latency is the P50 latency column.
	P50Latency string
	// P95Latency is the P95 latency column.
	P95Latency string
	// P99Latency is the P99 latency column.
	P99Latency string
	// TCPOpenConnections is the trailing column, when the row has one.
	TCPOpenConnections string
	// UnauthorizedRPS is not populated by ParseRows.
	UnauthorizedRPS string
}
777
778
// CheckRowCount splits out into lines (ignoring one trailing newline), drops
// the first line as the header and returns the remaining rows. It returns an
// error if out has fewer than two lines or if the number of rows after the
// header differs from expectedRowCount.
func CheckRowCount(out string, expectedRowCount int) ([]string, error) {
	lines := strings.Split(strings.TrimSuffix(out, "\n"), "\n")
	if len(lines) < 2 {
		return nil, fmt.Errorf("Expected at least 2 lines in %q", out)
	}

	body := lines[1:]
	if got := len(body); got != expectedRowCount {
		return nil, fmt.Errorf("Expected %d rows in stat output but got %d in %q", expectedRowCount, got, out)
	}
	return body, nil
}
796
797
798 func ParseRows(out string, expectedRowCount, expectedColumnCount int) (map[string]*RowStat, error) {
799 rows, err := CheckRowCount(out, expectedRowCount)
800 if err != nil {
801 return nil, err
802 }
803
804 rowStats := make(map[string]*RowStat)
805 for _, row := range rows {
806 fields := strings.Fields(row)
807
808 if expectedColumnCount == 0 {
809 expectedColumnCount = 8
810 }
811 if len(fields) != expectedColumnCount {
812 return nil, fmt.Errorf(
813 "Expected %d columns in stat output but got %d in %q",
814 expectedColumnCount, len(fields), row)
815 }
816
817 rowStats[fields[0]] = &RowStat{
818 Name: fields[0],
819 }
820
821 i := 0
822 if expectedColumnCount == 9 {
823 rowStats[fields[0]].Status = fields[1]
824 i = 1
825 }
826 rowStats[fields[0]].Meshed = fields[1+i]
827 rowStats[fields[0]].Success = fields[2+i]
828 rowStats[fields[0]].Rps = fields[3+i]
829 rowStats[fields[0]].P50Latency = fields[4+i]
830 rowStats[fields[0]].P95Latency = fields[5+i]
831 rowStats[fields[0]].P99Latency = fields[6+i]
832
833 if 7+i < len(fields) {
834 rowStats[fields[0]].TCPOpenConnections = fields[7+i]
835 }
836 }
837
838 return rowStats, nil
839 }
840
841
842 func ParseEvents(out string) ([]*corev1.Event, error) {
843 var list corev1.List
844 if err := json.Unmarshal([]byte(out), &list); err != nil {
845 return nil, fmt.Errorf("error unmarshaling list from `kubectl get events`: %w", err)
846 }
847
848 if len(list.Items) == 0 {
849 return nil, errors.New("no events found")
850 }
851
852 var events []*corev1.Event
853 for _, i := range list.Items {
854 var e corev1.Event
855 if err := json.Unmarshal(i.Raw, &e); err != nil {
856 return nil, fmt.Errorf("error unmarshaling list event from `kubectl get events`: %w", err)
857 }
858 events = append(events, &e)
859 }
860
861 return events, nil
862 }
863
View as plain text