package integrationv2 import ( "context" "embed" "fmt" "os" "path" "testing" "cloud.google.com/go/pubsub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "edge-infra.dev/pkg/sds/emergencyaccess/emulatorsvc" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/fctx" "edge-infra.dev/test/f2/integration" "edge-infra.dev/test/f2/x/ktest" "edge-infra.dev/test/f2/x/ktest/envtest" "edge-infra.dev/test/f2/x/postgres" ) var ( f f2.Framework smanifests map[string]string ) //go:embed testdata var manifests embed.FS const ( // names to avoid typos in maps userServiceName = "userService" eaGatewayName = "eaGateway" rulesEngineName = "rulesEngine" authServiceName = "authService" authProxyName = "authProxy" mockBFFName = "mockBFF" pubsubName = "pubsub" ) func TestMain(m *testing.M) { f = f2.New( context.Background(), f2.WithExtensions( // Include ktest extension for access to a k8s cluster ktest.New( ktest.WithEnvtestOptions( // Do not install any CRD into cluster envtest.WithoutCRDs(), ), ), postgres.New( postgres.ApplySeedModel(), ), ), ). Setup(func(ctx f2.Context) (f2.Context, error) { // Test execution should end here unless -integration-level=2 is passed to test if !integration.IsL2() { return ctx, fmt.Errorf("%w: requires L2 integration test level", f2.ErrSkip) } return ctx, nil }). Setup(func(ctx f2.Context) (f2.Context, error) { var err error manifestPaths := map[string]string{ userServiceName: path.Join("testdata", "userservice_manifests.yaml"), authServiceName: path.Join("testdata", "authservice_manifests.yaml"), rulesEngineName: path.Join("testdata", "rulesengine_manifests.yaml"), eaGatewayName: path.Join("testdata", "eagateway_manifests.yaml"), authProxyName: path.Join("testdata", "auth_proxy_manifests.yaml"), mockBFFName: path.Join("testdata", "mockbffserver_manifests.yaml"), pubsubName: path.Join("testdata", "pubsub_manifests.yaml"), } smanifests = map[string]string{} for name, path := range manifestPaths { smanifests[name], err = getManifests(manifests, path) if err != nil { return ctx, err } } return ctx, err }) //Run the tests os.Exit(f.Run(m)) } func TestEA(t *testing.T) { var ( namespace string pgConfigMapData map[string]string portforwardReng = ktest.PortForward{} portForwardProxy = ktest.PortForward{} portForwardPSE = ktest.PortForward{} emsvc *emulatorsvc.EmulatorService sub *pubsub.Subscription ) feat := f2.NewFeature("Test Remote CLI services"). Setup("Get info for CM config", func(ctx f2.Context, t *testing.T) f2.Context { k := ktest.FromContextT(ctx, t) namespace = k.Namespace pg := postgres.FromContextT(ctx, t) pgConfigMapData = map[string]string{ "DATABASE_CONNECTION_NAME": pg.ConnectionName, "DATABASE_HOST": pg.K8SHost(), "DATABASE_PORT": fmt.Sprintf("%d", pg.Port), "DATABASE_NAME": pg.Database, "DATABASE_USERNAME": pg.User, "DATABASE_PASSWORD": pg.Password, "DATABASE_SCHEMA": pg.Schema(), } return ctx }). // auth proxy Setup("Create auth proxy", createService(smanifests[authProxyName], processAuthProxyManifests(&namespace), addPostGresDetailsToCM("auth-proxy", &pgConfigMapData))). // mock bff server Setup("Create mock BFF Server", createService(smanifests[mockBFFName])). // user service Setup("Create user service", createService(smanifests[userServiceName], addPostGresDetailsToCM("userservice", &pgConfigMapData))). // auth service Setup("Create auth service", createService(smanifests[authServiceName], addPostGresDetailsToCM("authservice", &pgConfigMapData))). Setup("Add data from seed", postgres.WithData(testOrgSeed)). // rules engine Setup("Create rules engine", createService(smanifests[rulesEngineName], addPostGresDetailsToCM("rulesengine", &pgConfigMapData))). // EA Gateway Setup("Create EAGateway", createService(smanifests[eaGatewayName], processEAGatewayCM(&namespace), addPostGresDetailsToCM("eagateway", &pgConfigMapData))). // create + configure pubsub emulator Setup("Deploy pubsub emulator", createService(smanifests[pubsubName])). Setup("Wait for pubsub emulator", waitOn(smanifests[pubsubName])). Setup("Port forward pubsub emulator", portForwardPSE.Forward("pubsub-service", 8085)). Setup("Create topic and subscription", func(ctx f2.Context, t *testing.T) f2.Context { // projectID needs to be ID not name // Create a new client that connects to the pubsub emulator in the // test namespace via the port forward. Manually set the same // options that [pubsub.NewClient] would set when it detects the // PUBSUB_EMULATOR_HOST env var as this means we don't need to set // an environment variable for the test (env var's are process // specific so would not allow running parallel tests against // different addresses) pseClient, err := pubsub.NewClient(ctx, "d56bf564-90f7-4036-9eab-9efd435e68fe", option.WithEndpoint(portForwardPSE.Retrieve(t)), option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), option.WithoutAuthentication(), option.WithTelemetryDisabled(), internaloption.SkipDialSettingsValidation()) assert.NoError(t, err) reqTopic, err := pseClient.CreateTopic(ctx, "topic.dsds-ea-request") assert.NoError(t, err) // this subscription isnt needed for this test, but is used to verify the message is sent correctly in a // later step function. sub, err = pseClient.CreateSubscription(ctx, "sub.dsds-ea-request", pubsub.SubscriptionConfig{ Topic: reqTopic, //AckDeadline: 10 * time.Second, }) assert.NoError(t, err) topic, err := pseClient.CreateTopic(ctx, "topic.dsds-ea-response") assert.NoError(t, err) // subscription is sub.clusterEdgeID (as defined in seed).dsds-ea-response _, err = pseClient.CreateSubscription(ctx, "sub.f983e403-b2fc-4359-83e5-53e2c993141b.dsds-ea-response", pubsub.SubscriptionConfig{ Topic: topic, }) assert.NoError(t, err) return ctx }). // Wait for remaining services to be ready Setup("Wait for auth-proxy deployment", waitOn(smanifests[authProxyName])). Setup("Wait for mock bff server deployment", waitOn(smanifests[mockBFFName])). Setup("Wait for userservice deployment", waitOn(smanifests[userServiceName])). Setup("Wait for authservice deployment", waitOn(smanifests[authServiceName])). Setup("Wait for rulesengine deployment", waitOn(smanifests[rulesEngineName])). Setup("Wait for EAGateway", waitOn(smanifests[eaGatewayName])). // Port forwarding for rules engine Setup("Rulesengine port forwarding", portforwardReng.Forward("rulesengine", 8080)). Setup("Store rules engine port forward in ctx", func(ctx f2.Context, _ *testing.T) f2.Context { return fctx.WithValue(ctx, rulesEnginePortForwardKey{}, portforwardReng) }). // configure rules engine Setup("Add commands", addCommands([]string{"ls"})). Setup("Add privileges", addPrivileges([]string{"ea-admin"})). Setup("Add default rules", addDefaultRules([]defaultRule{ {"ls", []string{"ea-admin"}}, })). Setup("Delete default rule", func(ctx f2.Context, _ *testing.T) f2.Context { // Nothing for now _ = deleteDefaultRule return ctx }). Setup("Add role mapping", addRoleMapping(map[string][]string{"EDGE_ADMIN": {"ea-admin"}})). Setup("Port forward auth proxy", portForwardProxy.Forward("auth-proxy", 9003)). Test("Auth OK", func(ctx f2.Context, t *testing.T) f2.Context { cfg := emulatorsvc.NewConfig("testdata", emulatorsvc.Profile{ Username: "testValidUser1", Password: "abcd", Organization: "testOrganization", API: "http://" + portForwardProxy.Retrieve(t) + "/api/v2", }) var err error emsvc, err = emulatorsvc.New(ctx, cfg) require.NoError(t, err) err = emsvc.RetrieveIdentity(ctx) require.NoError(t, err) return ctx }). Test("Test Valid Command", func(ctx f2.Context, t *testing.T) f2.Context { // Connect with a valid session // banner, store and terminalIDs are defined in the Seed, loaded during the WithData step for authservice err := emsvc.Connect(ctx, "testOrganization", "testBanner1", "testStore1", "testTerminal1") require.NoError(t, err) // Send a valid command commandID, err := emsvc.Send("ls") require.NoError(t, err) // Receive the message from the subscription cctx, cancel := context.WithCancel(ctx) defer cancel() err = sub.Receive(cctx, func(_ context.Context, m *pubsub.Message) { t.Log("Received message", "message", string(m.Data)) // check the message is as expected assert.JSONEq(t, "{\"command\":\"ls\"}", string(m.Data)) assertPubSubMessageAttributesEqual(t, m, map[string]string{ "bannerId": "3cf00a00-d06b-40c7-859c-fbddfdeb1177", "storeId": "f983e403-b2fc-4359-83e5-53e2c993141b", "terminalId": "55c61200-d079-4b71-9e45-f5a81b7af86d", "identity": "testValidUser1", "version": "1.0", "signature": "", "commandId": commandID, }) assertPubSubMessageAttributesNotNil(t, m, []string{"sessionId"}) m.Ack() cancel() // stop receiving after the first message }) assert.NoError(t, err) return ctx }). Feature() f.Test(t, feat) }