// Package slowcooker contains an integration test that applies the embedded
// slow-cooker manifests to a cluster, waits for the slow-cooker Job's pod to
// run to completion, and then asserts on throughput, success rate and latency
// figures parsed from the pod's logs.
package slowcooker

import (
	"context"
	"embed"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"os"
	"regexp"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"gotest.tools/v3/assert/cmp"
	"gotest.tools/v3/poll"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8stypes "k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/k8s/testing/kmp"
	"edge-infra.dev/pkg/k8s/unstructured"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/integration"
	"edge-infra.dev/test/f2/x/ktest"
	"edge-infra.dev/test/f2/x/ktest/envtest"
	"edge-infra.dev/test/f2/x/ktest/kustomization"
)

// manifests holds the embedded testdata directory.
//
//go:embed testdata
var manifests embed.FS

// f is the test framework instance shared by TestMain and the tests.
var f f2.Framework

// slowcookerManifests is the raw manifest YAML, loaded once in TestMain.
var slowcookerManifests []byte

// slowcooker is used as the namespace, the Job name, the container name and
// the pod-name substring throughout this file.
const slowcooker = "slow-cooker"

// Compiled once at package scope instead of once per log line.
var (
	// requestCountsRE matches a "<n>/<n>/<n>" triple, read by
	// calculateSuccessfulRequests as good/bad/failed request counts.
	requestCountsRE = regexp.MustCompile("[0-9]+/[0-9]+/[0-9]+")
	// throughputRE matches a "<n>%" token, read by calculateAverageThroughput
	// as the throughput percentage of one iteration.
	throughputRE = regexp.MustCompile("[0-9]+%")
)

// Result holds the latency percentiles decoded from the JSON object found in
// the slow-cooker logs.
// NOTE(review): the tests print these values as milliseconds — confirm the
// unit against the slow-cooker output format.
type Result struct {
	P50  int `json:"p50"`
	P75  int `json:"p75"`
	P90  int `json:"p90"`
	P95  int `json:"p95"`
	P99  int `json:"p99"`
	P999 int `json:"p999"`
}

// TestMain builds the framework, skips the suite unless running at the L2
// integration level, loads the embedded manifests, and runs the tests.
func TestMain(m *testing.M) {
	f = f2.New(
		context.Background(),
		f2.WithExtensions(
			ktest.New(
				ktest.SkipNamespaceCreation(),
				ktest.WithEnvtestOptions(
					envtest.WithoutCRDs(),
				),
			),
		),
	).
		Setup(func(ctx f2.Context) (f2.Context, error) {
			if !integration.IsL2() {
				return ctx, fmt.Errorf("%w: requires L2 integration test level", f2.ErrSkip)
			}
			return ctx, nil
		}).
		Setup(func(ctx f2.Context) (f2.Context, error) {
			// Load slowcooker kustomization manifests
			var err error
			slowcookerManifests, err = fs.ReadFile(manifests, "testdata/slow-cooker_manifests.yaml")
			if err != nil {
				return ctx, err
			}
			return ctx, nil
		}).
		WithLabel("dsds", "true").
		WithLabel("performance", "true").
		Slow()
	os.Exit(f.Run(m))
}

// TestSlowcooker applies the slow-cooker manifests, waits for the pod to
// become ready and then to stop being ready, collects its logs, and asserts
// on the figures parsed from them.
func TestSlowcooker(t *testing.T) {
	var iterations int
	var podName string
	var logs string
	var k *ktest.K8s
	// Named resources rather than manifests so the package-level embed.FS is
	// not shadowed.
	var resources []*unstructured.Unstructured
	var err error

	feature := f2.NewFeature("slowcooker").
		Setup("apply slowcooker manifests", func(ctx f2.Context, t *testing.T) f2.Context {
			k = ktest.FromContextT(ctx, t)
			resources, err = kustomization.ProcessManifests(ctx.RunID, slowcookerManifests, slowcooker)
			require.NoError(t, err)
			for _, resource := range resources {
				require.NoError(t, k.Client.Create(ctx, resource))
			}
			return ctx
		}).
		Setup("wait for slowcooker manifests", func(ctx f2.Context, t *testing.T) f2.Context {
			job := &batchv1.Job{}
			for _, resource := range resources {
				if resource.GetKind() == "Job" && resource.GetName() == slowcooker {
					require.NoError(t, unstructured.FromUnstructured(resource, job))
					// Guard the Containers[0] access below so a malformed Job
					// fails the test instead of panicking.
					require.NotEmpty(t, job.Spec.Template.Spec.Containers, "slow-cooker job has no containers")
					for _, arg := range job.Spec.Template.Spec.Containers[0].Args {
						if strings.Contains(arg, "iterations") {
							iterations, err = strconv.Atoi(strings.TrimPrefix(arg, "--iterations="))
							require.NoError(t, err)
						}
					}
					k.WaitOn(t, k.Check(resource, kmp.IsCurrent()), poll.WithTimeout(time.Minute))
				}
			}
			return ctx
		}).
		Setup("wait for slowcooker pod to be ready", func(ctx f2.Context, t *testing.T) f2.Context {
			podName, err = getPodName(ctx, k)
			require.NoError(t, err)
			pod := &corev1.Pod{}
			require.NoError(t, k.Client.Get(ctx, k8stypes.NamespacedName{Name: podName, Namespace: slowcooker}, pod))
			k.WaitOn(t, k.Check(pod, func(_ client.Object) cmp.Result {
				// Re-fetch the pod on every poll so the Ready condition is current.
				require.NoError(t, k.Client.Get(ctx, k8stypes.NamespacedName{Name: podName, Namespace: slowcooker}, pod))
				if isPodReady(pod) {
					return cmp.ResultSuccess
				}
				return cmp.ResultFailure("slowcooker pod is not ready")
			}), poll.WithTimeout(time.Minute*2))
			return ctx
		}).
		Setup("get slowcooker logs", func(ctx f2.Context, t *testing.T) f2.Context {
			pod := &corev1.Pod{}
			require.NoError(t, k.Client.Get(ctx, k8stypes.NamespacedName{Name: podName, Namespace: slowcooker}, pod))
			// The pod no longer being Ready is used as the signal that the
			// slow-cooker container has finished. The timeout scales with the
			// iteration count parsed from the Job args, plus two minutes.
			k.WaitOn(t, k.Check(pod, func(_ client.Object) cmp.Result {
				require.NoError(t, k.Client.Get(ctx, k8stypes.NamespacedName{Name: podName, Namespace: slowcooker}, pod))
				if !isPodReady(pod) {
					return cmp.ResultSuccess
				}
				return cmp.ResultFailure("slow cooker container still running")
			}), poll.WithTimeout(time.Minute*time.Duration(iterations+2)))
			logs, err = getLogs(ctx, k, podName)
			require.NoError(t, err)
			fmt.Println(logs)
			return ctx
		}).
		Test("over 99 percent average throughput", func(ctx f2.Context, t *testing.T) f2.Context {
			averageThroughput, err := calculateAverageThroughput(logs, iterations)
			require.NoError(t, err)
			fmt.Printf("Average throughput: %v%%\n", averageThroughput)
			assert.GreaterOrEqual(t, averageThroughput, float32(99.0))
			return ctx
		}).
		Test("over 95 percent successful requests", func(ctx f2.Context, t *testing.T) f2.Context {
			successfulRequests, err := calculateSuccessfulRequests(logs)
			require.NoError(t, err)
			fmt.Printf("Successful requests: %v%%\n", successfulRequests)
			assert.GreaterOrEqual(t, successfulRequests, float32(95.0))
			return ctx
		}).
		Test("99th percentile latency less than 2ms", func(ctx f2.Context, t *testing.T) f2.Context {
			latency, err := getHighestLatency(logs)
			require.NoError(t, err)
			fmt.Printf("Latency: %vms\n", latency)
			assert.LessOrEqual(t, latency, 2)
			return ctx
		}).
		Teardown("delete namespace", func(ctx f2.Context, t *testing.T) f2.Context {
			require.NoError(t, k.Client.Delete(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: slowcooker}}))
			return ctx
		}).Feature()

	f.Test(t, feature)
}

// getPodName lists the pods in the slow-cooker namespace and returns the name
// of the first one whose name contains "slow-cooker". It returns an error if
// the list call fails or no such pod exists.
func getPodName(ctx context.Context, k *ktest.K8s) (string, error) {
	podList := &corev1.PodList{}
	err := k.Client.List(ctx, podList, &client.ListOptions{Namespace: slowcooker})
	if err != nil {
		return "", err
	}
	for _, pod := range podList.Items {
		if strings.Contains(pod.Name, slowcooker) {
			return pod.Name, nil
		}
	}
	return "", fmt.Errorf("slow-cooker pod not found")
}

// isPodReady reports whether the pod has a Ready condition with status True.
func isPodReady(pod *corev1.Pod) bool {
	for _, condition := range pod.Status.Conditions {
		if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// getLogs reads the complete log output of the slow-cooker container of the
// named pod in the slow-cooker namespace and returns it as a string.
func getLogs(ctx context.Context, k *ktest.K8s, podName string) (string, error) {
	// NOTE(review): if GetContainerLogs returns an io.ReadCloser the stream
	// should be closed here — confirm against the ktest API.
	logStream, err := k.GetContainerLogs(ctx, podName, slowcooker, slowcooker)
	if err != nil {
		return "", err
	}
	logsData, err := io.ReadAll(logStream)
	if err != nil {
		return "", err
	}
	return string(logsData), nil
}

// getHighestLatency decodes everything from the first "{" in logs to the end
// as a JSON Result and returns its P99 value. It returns an error when the
// logs contain no "{" or the remainder is not valid JSON.
func getHighestLatency(logs string) (int, error) {
	start := strings.Index(logs, "{")
	if start < 0 {
		// Previously this case indexed past the end of the split result and
		// panicked.
		return 0, fmt.Errorf("no JSON latency summary found in slow-cooker logs")
	}
	result := &Result{}
	if err := json.Unmarshal([]byte(logs[start:]), result); err != nil {
		return 0, fmt.Errorf("parsing latency summary: %w", err)
	}
	return result.P99, nil
}

// calculateSuccessfulRequests sums the first "<good>/<bad>/<failed>" triple
// found on each log line and returns good as a percentage of
// good+bad+failed. It returns an error if a count cannot be parsed or if no
// requests were found at all.
func calculateSuccessfulRequests(logs string) (float32, error) {
	totalGood := 0
	totalRequests := 0
	for _, line := range strings.Split(logs, "\n") {
		requests := requestCountsRE.FindString(line)
		if requests == "" {
			continue
		}
		// The pattern guarantees exactly three slash-separated fields:
		// good, bad, failed.
		for i, field := range strings.Split(requests, "/") {
			count, err := strconv.Atoi(field)
			if err != nil {
				return 0, fmt.Errorf("parsing request counts %q: %w", requests, err)
			}
			if i == 0 {
				totalGood += count
			}
			totalRequests += count
		}
	}
	if totalRequests == 0 {
		// Avoid 0/0, which would yield NaN and a misleading assertion failure.
		return 0, fmt.Errorf("no requests found in slow-cooker logs")
	}
	return float32(totalGood) / float32(totalRequests) * 100, nil
}

// calculateAverageThroughput sums the first "<n>%" value found on each log
// line and divides the total by iterations. It returns an error if iterations
// is not positive or a percentage cannot be parsed.
func calculateAverageThroughput(logs string, iterations int) (float32, error) {
	if iterations <= 0 {
		// Dividing by zero would produce +Inf or NaN; +Inf would satisfy a
		// ">= 99" assertion and hide the problem.
		return 0, fmt.Errorf("iterations must be positive, got %d", iterations)
	}
	totalThroughput := 0
	for _, line := range strings.Split(logs, "\n") {
		match := strings.TrimSuffix(throughputRE.FindString(line), "%")
		if match == "" {
			continue
		}
		throughput, err := strconv.Atoi(match)
		if err != nil {
			return 0, fmt.Errorf("parsing throughput %q: %w", match, err)
		}
		totalThroughput += throughput
	}
	return float32(totalThroughput) / float32(iterations), nil
}