// This file bootstraps the cloudtoedge test suite: it builds the f2 framework,
// resolves cluster metadata, checks the Pub/Sub topic and opens a port forward
// to every couchdb pod before the tests run.

package cloudtoedge

import (
	"context"
	_ "embed"
	"fmt"
	"os"
	"testing"
	"time"

	"cloud.google.com/go/pubsub"
	"edge-infra.dev/pkg/edge/bsl"
	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/info"
	"edge-infra.dev/pkg/k8s/runtime/controller"
	"edge-infra.dev/pkg/lib/fog"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/x/ktest"
	"edge-infra.dev/test/f2/x/ktest/envtest"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
	// f is the test framework instance; it is built and run by TestMain.
	f f2.Framework

	// bslInfo is loaded via bsl.FromClient during suite setup.
	bslInfo *bsl.BSLInfo

	// edgeInfo is loaded via info.FromClient during suite setup; its
	// ForemanProjectID is the project the Pub/Sub client is created for.
	edgeInfo *info.EdgeInfo

	// nodes holds the node list fetched during suite setup.
	nodes *corev1.NodeList

	// pubSubClient is created during suite setup and closed during teardown.
	pubSubClient *pubsub.Client

	// couchNamespace is the namespace the couchdb pods are listed from. The
	// same value is used for the "platform.edge.ncr.com/component" label
	// selector.
	couchNamespace = "data-sync-couchdb"

	// pubSubTopic is the topic that must already exist for setup to succeed.
	pubSubTopic = "data-sync-c2e"

	// dbName is not referenced in this file; presumably it is consumed by the
	// tests in this package (TODO confirm).
	dbName = "data-sync-e2e-tests"

	// defaultPort is the pod port forwarded for every couchdb pod.
	defaultPort = 5984

	// timeoutMultiplier is not referenced in this file; presumably the tests
	// scale their wait durations with it (TODO confirm).
	timeoutMultiplier time.Duration = 10

	// portMapping maps a couchdb pod name to the port forward opened for it
	// during suite setup.
	portMapping = map[string]*ktest.PortForward{}

	// tlog holds the embedded contents of testdata/tlog.json.
	//go:embed testdata/tlog.json
	tlog []byte
)

// TestMain configures logging, builds the f2 framework with the ktest
// extension, registers the suite-wide setup and teardown steps and exits the
// process with the status code returned by the framework run.
func TestMain(m *testing.M) {
	ctrl.SetLogger(fog.New().WithName("cloudtoedge_e2e_test"))

	f = f2.New(
		context.Background(),
		f2.WithExtensions(ktest.New(
			ktest.WithCtrlManager(createManager),
			ktest.WithEnvtestOptions(envtest.WithoutCRDs()),
			ktest.SkipNamespaceCreation(),
		)),
	).
		WithLabel("dsds", "true")

	f.Setup(setupCloudToEdge).Teardown(teardownCloudToEdge)

	os.Exit(f.Run(m))
}

// setupCloudToEdge populates the package-level fixtures used by the suite.
//
// In order, it loads edgeInfo and bslInfo through the cluster client, creates
// the Pub/Sub client and checks that pubSubTopic exists, lists the nodes and
// the couchdb pods, requires one couchdb pod per node, and finally opens a
// port forward to each couchdb pod. The first failing step aborts setup and
// its error is returned together with the unmodified context.
func setupCloudToEdge(ctx f2.Context) (f2.Context, error) {
	k, err := ktest.FromContext(ctx)
	if err != nil {
		return ctx, err
	}

	if edgeInfo, err = info.FromClient(ctx, k.Client); err != nil {
		return ctx, fmt.Errorf("fail to get edge-info configmap: %w", err)
	}
	if bslInfo, err = bsl.FromClient(ctx, k.Client); err != nil {
		return ctx, fmt.Errorf("fail to get bsl-info configmap: %w", err)
	}

	if pubSubClient, err = pubsub.NewClient(ctx, edgeInfo.ForemanProjectID); err != nil {
		return ctx, fmt.Errorf("fail to create pub/sub client: %s: %w", edgeInfo.ForemanProjectID, err)
	}
	if err = ensureTopicExists(ctx); err != nil {
		return ctx, err
	}

	nodes = &corev1.NodeList{}
	if err = k.Client.List(ctx, nodes); err != nil {
		return ctx, fmt.Errorf("fail to list nodes: %w", err)
	}

	couchdbPods := &corev1.PodList{}
	err = k.Client.List(ctx, couchdbPods,
		client.InNamespace(couchNamespace),
		client.MatchingLabels{"platform.edge.ncr.com/component": couchNamespace},
	)
	if err != nil {
		return ctx, fmt.Errorf("fail to list couchdb pods: %w", err)
	}

	// The suite requires exactly as many couchdb pods as there are nodes;
	// report both counts so a mismatch can be diagnosed from the error alone.
	if pods, want := len(couchdbPods.Items), len(nodes.Items); pods != want {
		return ctx, fmt.Errorf("number of couchdb pods (%d) not equal to number of nodes (%d)", pods, want)
	}

	if err = forwardCouchDBPods(ctx, couchdbPods.Items); err != nil {
		return ctx, err
	}
	return ctx, nil
}

// ensureTopicExists returns an error when the existence check for pubSubTopic
// fails or when the topic is reported as missing. It uses the package-level
// pubSubClient, which must already be initialised.
func ensureTopicExists(ctx f2.Context) error {
	exists, err := pubSubClient.Topic(pubSubTopic).Exists(ctx)
	if err != nil {
		return fmt.Errorf("fail to check if topic exists: %w", err)
	}
	if !exists {
		return fmt.Errorf("topic %s does not exist", pubSubTopic)
	}
	return nil
}

// forwardCouchDBPods opens a port forward on defaultPort to every pod in pods
// and records it in portMapping under the pod's name. It stops at the first
// pod that cannot be forwarded and returns an error naming that pod.
//
// NOTE(review): nothing in this file stops the port forwards, neither on a
// partial failure here nor in teardown; confirm that is handled elsewhere.
func forwardCouchDBPods(ctx f2.Context, pods []corev1.Pod) error {
	// Index the slice rather than ranging by value to avoid copying each Pod.
	for i := range pods {
		name := pods[i].Name
		pf := &ktest.PortForward{Namespace: couchNamespace}
		if err := pf.ForwardPodCtx(ctx, name, defaultPort); err != nil {
			return fmt.Errorf("fail to forward couchdb pod %s: %w", name, err)
		}
		portMapping[name] = pf
	}
	return nil
}

// teardownCloudToEdge closes the Pub/Sub client if setup created one and
// returns the error from Close, if any.
func teardownCloudToEdge(ctx f2.Context) (f2.Context, error) {
	if pubSubClient == nil {
		return ctx, nil
	}
	return ctx, pubSubClient.Close()
}

// createManager builds a controller-runtime manager from the given options,
// replacing the scheme in the processed options with the one returned by
// createScheme.
func createManager(opts ...controller.Option) (ctrl.Manager, error) {
	mgrCfg, mgrOpts := controller.ProcessOptions(opts...)
	mgrOpts.Scheme = createScheme()
	return ctrl.NewManager(mgrCfg, mgrOpts)
}

// createScheme returns a new runtime scheme with the client-go types and the
// dsapi (datasync v1alpha1) types registered. Registration errors are passed
// to utilruntime.Must, which panics on a non-nil error.
func createScheme() *runtime.Scheme {
	scheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(dsapi.AddToScheme(scheme))
	return scheme
}