package couchctl import ( "context" "errors" "fmt" "os" "path/filepath" "strconv" "strings" "testing" "time" "github.com/google/uuid" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" k8errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "edge-infra.dev/pkg/edge/bsl" "edge-infra.dev/pkg/edge/constants/api/cluster" "edge-infra.dev/pkg/edge/constants/api/fleet" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" "edge-infra.dev/pkg/edge/info" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2" "edge-infra.dev/pkg/k8s/testing/kmp" "edge-infra.dev/pkg/lib/fog" v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" nodemeta "edge-infra.dev/pkg/sds/ien/node" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/integration" "edge-infra.dev/test/f2/x/ktest" ) const ( _fleetType = "fleet-type" _clusterType = "cluster-type" defaultHost = "localhost" ) var ( f f2.Framework // flags laneNumber string //couchdbPort string clean bool couchCtlConfig *Config couchDBServer *dsapi.CouchDBServer touchpointSever *dsapi.CouchDBServer couchDBDatabase *dsapi.CouchDBDatabase couchDBDUser *dsapi.CouchDBUser couchDBIndex *dsapi.CouchDBIndex couchDBDesignDoc *dsapi.CouchDBDesignDoc couchDBReplicationSet *dsapi.CouchDBReplicationSet couchDBPersistence *dsapi.CouchDBPersistence _secretManager secretManager ) func init() { couchCtlConfig = &Config{} couchCtlConfig.BindFlags(f2.Flags) f2.Flags.StringVar(&laneNumber, "lane-number", "1", "the lane number for touchpoint") f2.Flags.BoolVar(&clean, "clean", false, "delete couchdb data before starting testing") } func TestMain(m *testing.M) { ctrl.SetLogger(fog.New()) ctxMgr := contextAwareManager{cfg: couchCtlConfig} f = f2.New(context.Background(), f2.WithExtensions(ktest.New(ktest.WithCtrlManager(ctxMgr.createManager)))) // f2 parses command line args, but not env variables if err := ensureValidConfig(); err != nil { ctrl.Log.Error(err, "invalid config values:") os.Exit(1) } if f2.Labels == nil { f2.Labels = map[string]string{} } f2.Labels[_fleetType] = couchCtlConfig.FleetType f2.Labels[_clusterType] = couchCtlConfig.ClusterType cctx, cancel := context.WithCancel(context.Background()) defer cancel() var replicationEvent *ReplicationEvent f.Setup(func(ctx f2.Context) (f2.Context, error) { k, err := ktest.FromContext(ctx) if err != nil { return ctx, err } resourceName := fmt.Sprintf("couchdb-%s", couchCtlConfig.NodeUID) laneUID := uuid.NewString() ctrl.Log.Info("Setting up Test", "resource", resourceName, "touchpoint", laneUID) couchDBServer = newCouchdbServer(resourceName) touchpointSever = dsapi.NewTouchpointCouchDBServer(laneUID, laneNumber) couchDBDatabase = newCouchDBDatabase(resourceName, couchDBServer) couchDBDUser = newCouchDBUser(resourceName, fmt.Sprintf("username-%s", ctx.RunID), couchDBServer) couchDBReplicationSet = newCouchDBReplicationSet(resourceName, couchDBServer) couchDBPersistence = newCouchDBPersistence(ctx.RunID) couchDBIndex = newCouchDBIndex(resourceName, couchDBDatabase) couchDBDesignDoc = newCouchDBDesignDoc(resourceName, couchDBDatabase) server := mockInterlockClient(cctx) // Mock resources if running locally if integration.IsL1() { //nolint:nestif if err = localTestResources(ctx, k.Client, couchDBServer); err != nil { return ctx, err } } else { _secretManager = &gcpSecretManager{} // Suspend CouchCTL Pallet err := suspendPallet(ctx, k.Client, "couchctl-", true) if client.IgnoreNotFound(err) != nil { return ctx, err } err = scaleDeployment(ctx, k.Client, types.NamespacedName{ Name: couchCtlConfig.CouchCTLNamespace, Namespace: couchCtlConfig.CouchCTLNamespace, }, 0) if client.IgnoreNotFound(err) != nil { return ctx, err } // delete resources created by couchdb if clean { err = cleanUpBeforeTest(ctx, k.Client, k.Timeout, k.Tick) } else { err = resetBeforeTest(ctx, k.Client) } if err != nil { return ctx, err } err = client.IgnoreAlreadyExists(k.Client.Create(ctx, couchDBServer)) if err != nil { return ctx, err } } couchCtlConfig.InterlockAPIURL = server.URL rle := PersistenceLeaderElectorFunc(func() bool { return true }) nrp := NodeResourcePredicateFunc(func(*Config, client.Object) bool { return true }) replicationEvent = NewReplicationEvent(couchCtlConfig) replicationEvent.changesFunc = func(_ context.Context, _, _, _ string) (Changes, error) { return NewChangesIter(mockChangesResults()...), nil } replicationEvent.isRetryableError = func(_ error) bool { return false } err = setupControllers(k.Manager, couchCtlConfig, _secretManager, rle, nrp, replicationEvent) if err != nil { return ctx, err } return ctx, nil }).Teardown(func(ctx f2.Context) (f2.Context, error) { if replicationEvent != nil { replicationEvent.Stop() } // if its an L1 test dont teardown since its being mocked if integration.IsL1() { return ctx, nil } k, err := ktest.FromContext(ctx) if err != nil { return ctx, err } err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBReplicationSet)) if err != nil { return ctx, err } err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBDUser)) if err != nil { return ctx, err } err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBDatabase)) if err != nil { return ctx, err } err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBServer)) if err != nil { return ctx, err } err = client.IgnoreNotFound(k.Client.Delete(ctx, couchDBPersistence)) if err != nil { return ctx, err } return ctx, client.IgnoreNotFound(suspendPallet(ctx, k.Client, "couchctl-", false)) }) os.Exit(f.Run(m)) } func newCouchDBDesignDoc(name string, database *dsapi.CouchDBDatabase) *dsapi.CouchDBDesignDoc { return &dsapi.CouchDBDesignDoc{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: database.Namespace, }, Spec: dsapi.CouchDBDesignDocSpec{ DB: database.Name, ID: "test", DesignDoc: dsapi.DesignDoc{ Views: map[string]map[string]string{ "test": { "map": "function(doc) { if (doc.type === 'test') { emit(doc._id, doc); } }", }, }, }, }, } } func newCouchDBIndex(name string, database *dsapi.CouchDBDatabase) *dsapi.CouchDBIndex { return &dsapi.CouchDBIndex{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: database.Namespace, }, Spec: dsapi.CouchDBIndexSpec{ DB: database.Name, DDoc: "test", Name: "test", Type: "json", Index: dsapi.Index{ Fields: []string{"referenceId"}, PartialFilterSelector: `{"referenceId": {"$gt": "10000"},"limit": 2,"skip": 0}`, }, }, } } func testReady(o client.Object) f2.StepFn { return func(ctx f2.Context, t *testing.T) f2.Context { k := ktest.FromContextT(ctx, t) k.WaitOn(t, k.Check(o, kmp.IsReady())) return ctx } } func newCouchdbServer(name string) *dsapi.CouchDBServer { var server *dsapi.CouchDBServer if couchCtlConfig.FleetType == fleet.CouchDB { server = dsapi.NewAdminCouchDBServer() } else { server = dsapi.NewStoreCouchDBServer() server.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue server.Labels[couchdb.NodeUIDLabel] = couchCtlConfig.NodeUID } server.Name = name // touchpoint CouchDBServer is created by PersistenceController return server } func localTestResources(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) error { server.Spec.URI = defaultHost touchpointSever.Spec.URI = defaultHost nsCouchDB := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: server.Namespace}} err := client.IgnoreAlreadyExists(cl.Create(ctx, nsCouchDB)) if err != nil { return err } nsCouchCTL := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ControllerNamespace}} err = client.IgnoreAlreadyExists(cl.Create(ctx, nsCouchCTL)) if err != nil { return err } if couchCtlConfig.FleetType == fleet.CouchDB { err = cloudIngress(ctx, cl, server) if err != nil { return err } } else { secret, err := cloudReplicationCredentials(ctx, cl) if err != nil { return err } err = client.IgnoreAlreadyExists(cl.Create(ctx, secret)) if err != nil { return err } } // for stores, CouchDBServer is created by CouchDBPersistence err = client.IgnoreAlreadyExists(cl.Create(ctx, couchDBServer)) if err != nil { return err } totalNodes := 1 if couchCtlConfig.ClusterType == cluster.DSDS { // 1 store sever + 1 lane server totalNodes = 2 err = fakePodsForCouchdbServer(ctx, cl, touchpointSever) if err != nil { return err } } err = nodes(ctx, cl, totalNodes) if err != nil { return err } err = fakePodsForCouchdbServer(ctx, cl, couchDBServer) if err != nil { return err } couchDBCluster, err := NewMockCouchDBCluster(couchCtlConfig, cl, couchDBServer, couchDBDatabase, couchDBDUser, couchDBIndex, couchDBDesignDoc, couchDBReplicationSet) if err != nil { return err } couchCtlConfig.CouchDBPort = couchDBCluster.Port() _secretManager = &mockSecretManager{ clients: make(map[string]*mockSecretManagerClient), } return nil } // cloudReplicationCredentials cloud to store replication information func cloudReplicationCredentials(ctx context.Context, cl client.Client) (*corev1.Secret, error) { uri := fmt.Sprintf("https://%s.%s", couchCtlConfig.BannerEdgeID, couchCtlConfig.DatasyncDNSName) replCreds := &couchdb.ReplicationCredentials{ UserCredentials: couchdb.UserCredentials{ UsernamePassword: couchdb.UsernamePassword{ Username: []byte("replication-user"), Password: []byte("replication-password"), }, URI: []byte(uri), }, DBName: []byte(couchCtlConfig.ReplicationDB()), } ref := dsapi.CloudReplicationCredentials() return replCreds.ToSecret(ctx, cl, types.NamespacedName{ Name: ref.Name, Namespace: ref.Namespace, }) } // couchdbServerClient get the http client for a CouchDBServer // TODO replace with helper function from pr #8029 func couchdbServerClient(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) (*couchdb.CouchDB, error) { creds, err := serverAdminCreds(ctx, cl, couchDBServer) if err != nil { return nil, err } cc := &couchdb.CouchDB{} return cc, cc.New(couchdb.Driver, string(creds.Username), string(creds.Password), server.Spec.URI, couchCtlConfig.CouchDBPort) } // resetBeforeTest delete status to enable clean run func resetBeforeTest(ctx context.Context, cl client.Client) error { apiVersion := dsapi.GroupVersion.String() err := resetStatus(ctx, cl, apiVersion, "CouchDBPersistence", client.ObjectKeyFromObject(couchDBPersistence)) if err != nil { return err } err = resetStatus(ctx, cl, apiVersion, "CouchDBServer", client.ObjectKeyFromObject(couchDBServer)) if err != nil { return err } err = resetStatus(ctx, cl, apiVersion, "CouchDBDatabase", client.ObjectKeyFromObject(couchDBDatabase)) if err != nil { return err } err = resetStatus(ctx, cl, apiVersion, "CouchDBUser", client.ObjectKeyFromObject(couchDBDUser)) if err != nil { return err } err = resetStatus(ctx, cl, apiVersion, "CouchDBReplicationSet", client.ObjectKeyFromObject(couchDBReplicationSet)) if err != nil { return err } return nil } //nolint:gocyclo func cleanUpBeforeTest(ctx context.Context, cl client.Client, tick, timeout time.Duration) error { if err := client.IgnoreNotFound(suspendPallet(ctx, cl, "couchdb-", true)); err != nil { return err } replications := &dsapi.CouchDBReplicationSetList{} err := cl.List(ctx, replications, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range replications.Items { item := replications.Items[i] err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } users := &dsapi.CouchDBUserList{} err = cl.List(ctx, users, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range users.Items { item := users.Items[i] controllerutil.RemoveFinalizer(&item, DatasyncFinalizer) err = client.IgnoreNotFound(cl.Update(ctx, &item)) if err != nil { return err } err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } databases := &dsapi.CouchDBDatabaseList{} err = cl.List(ctx, databases, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range databases.Items { item := databases.Items[i] err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } servers := &dsapi.CouchDBServerList{} err = cl.List(ctx, servers, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range servers.Items { item := servers.Items[i] err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } // NOTE: since we added finalizers deletion order matters pl := &dsapi.CouchDBPersistenceList{} err = cl.List(ctx, pl, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range pl.Items { item := pl.Items[i] err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } sts := &appsv1.StatefulSetList{} err = cl.List(ctx, sts, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range sts.Items { item := sts.Items[i] err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } pvc := &corev1.PersistentVolumeClaimList{} err = cl.List(ctx, pvc, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } for i := range pvc.Items { item := pvc.Items[i] item.Finalizers = nil err = client.IgnoreNotFound(cl.Update(ctx, &item)) if err != nil { return err } err = client.IgnoreNotFound(cl.Delete(ctx, &item)) if err != nil { return err } } apiVersion := dsapi.GroupVersion.String() err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBPersistence") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBReplicationSet") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBUser") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBDatabase") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, apiVersion, "CouchDBServer") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, corev1.SchemeGroupVersion.String(), "Pod") if err != nil { return err } err = waitForDeletedResources(cl, tick, timeout, corev1.SchemeGroupVersion.String(), "PersistentVolumeClaim") if err != nil { return err } if client.IgnoreNotFound(suspendPallet(ctx, cl, "couchdb-", false)) != nil { return err } return nil } func scaleDeployment(ctx context.Context, cl client.Client, nn types.NamespacedName, replicas int32) error { deployment := &appsv1.Deployment{} err := cl.Get(ctx, nn, deployment) if err != nil { return err } if *deployment.Spec.Replicas == replicas { return nil } patch := client.MergeFrom(deployment.DeepCopy()) deployment.Spec.Replicas = &replicas return cl.Patch(ctx, deployment, patch) } func suspendPallet(ctx context.Context, cl client.Client, prefix string, suspend bool) error { pallets := &whv1.UnpackedPalletList{} err := cl.List(ctx, pallets) if err != nil { return err } palletName := "" for _, item := range pallets.Items { if strings.HasPrefix(item.Name, prefix) { palletName = item.Name break } } if palletName == "" { return nil } pallet := &whv1.UnpackedPallet{} err = cl.Get(ctx, client.ObjectKey{Name: palletName}, pallet) if err != nil { return err } if pallet.Spec.Suspend == suspend { return nil } patch := client.MergeFrom(pallet.DeepCopy()) pallet.Spec.Suspend = suspend return cl.Patch(ctx, pallet, patch) } func waitForDeletedResources(cl client.Client, tick, timeout time.Duration, apiVersion, kind string) error { list := &unstructured.UnstructuredList{} list.SetAPIVersion(apiVersion) list.SetKind(kind) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() for { select { default: err := cl.List(ctx, list, client.InNamespace(couchCtlConfig.CouchNamespace)) if err != nil { return err } if len(list.Items) == 0 { // objects are deleted return nil } time.Sleep(tick) case <-ctx.Done(): return fmt.Errorf("%s:%s in namespace %s not deleted on time", apiVersion, kind, couchCtlConfig.CouchNamespace) } } } func resetStatus(ctx context.Context, cl client.Client, apiVersion, kind string, key client.ObjectKey) error { un := &unstructured.Unstructured{} un.SetAPIVersion(apiVersion) un.SetKind(kind) err := cl.Get(ctx, key, un) switch { case k8errors.IsNotFound(err): return nil case err != nil: return err } unstructured.RemoveNestedField(un.UnstructuredContent(), "status") err = cl.Status().Update(ctx, un) return client.IgnoreNotFound(err) } func cloudIngress(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) error { ingress := &netv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: couchdb.CouchIngressName, Namespace: server.Namespace, }, Spec: netv1.IngressSpec{ DefaultBackend: &netv1.IngressBackend{ Service: &netv1.IngressServiceBackend{ Name: couchdb.Name, Port: netv1.ServiceBackendPort{ Number: 5984, }, }, }, }, } err := client.IgnoreAlreadyExists(cl.Create(ctx, ingress)) if err != nil { return err } patch := client.MergeFrom(ingress.DeepCopy()) ingress.Status = netv1.IngressStatus{ LoadBalancer: netv1.IngressLoadBalancerStatus{ Ingress: []netv1.IngressLoadBalancerIngress{ { IP: "127.0.0.1", }, }, }, } return cl.Status().Patch(ctx, ingress, patch) } func nodes(ctx context.Context, cl client.Client, totalNodes int) error { for i := 0; i < totalNodes; i++ { nodeName := couchdb.StoreServerName class := v1ien.Server role := v1ien.ControlPlane laneNumber := fmt.Sprintf("%d", i) nodeUID := couchCtlConfig.NodeUID if i > 0 { nodeName = fmt.Sprintf("%s-%s", couchdb.TouchpointName, laneNumber) class = v1ien.Touchpoint role = v1ien.Worker nodeUID = touchpointSever.Labels[couchdb.NodeUIDLabel] } node := &corev1.Node{ TypeMeta: metav1.TypeMeta{ Kind: "Node", APIVersion: "v1", }, ObjectMeta: metav1.ObjectMeta{ UID: types.UID(nodeUID), Name: nodeName, Labels: map[string]string{ nodemeta.ClassLabel: string(class), nodemeta.RoleLabel: string(role), nodemeta.LaneLabel: laneNumber, nodemeta.HostnameLabel: "ien-" + laneNumber, }, }, Spec: corev1.NodeSpec{}, } err := client.IgnoreAlreadyExists(cl.Create(ctx, node)) if err != nil { return err } mem := "1Gi" if i == 0 { mem = "4Gi" // LEADER IS SELECTED BY MEMORY AVAILABLE } node.Status.Allocatable = corev1.ResourceList{ corev1.ResourceMemory: resource.MustParse(mem), } err = cl.Status().Update(ctx, node) if err != nil { return err } } return nil } func ensureValidConfig() error { // for testing disable reconciliation couchCtlConfig.PollingInterval = 24 * time.Hour // couchdb should already be enabled for integration tests couchCtlConfig.EnablementWatchInterval = 24 * time.Hour couchCtlConfig.ReplicationPollingInterval = 24 * time.Hour if integration.IsL1() { uid := uuid.NewString() couchCtlConfig.BannerEdgeID = uuid.NewString() couchCtlConfig.ProjectID = "ret-edge-dev-test" couchCtlConfig.RequeueTime = 1 * time.Second couchCtlConfig.ServerNotReady = 1 * time.Second couchCtlConfig.DatabaseNotFound = 1 * time.Second couchCtlConfig.IngressNotReady = 1 * time.Second couchCtlConfig.CouchCTLNamespace = "couchctl" couchCtlConfig.CouchNamespace = "data-sync-couchdb" couchCtlConfig.CouchDBPort = strconv.Itoa(5984) couchCtlConfig.NodeUID = uid couchCtlConfig.NodeRole = string(v1ien.ControlPlane) couchCtlConfig.NodeClass = string(v1ien.Server) return couchCtlConfig.Validate() } // f2 framework does not parse ENV variables // TODO update f2 framework to parse ENV variables config, err := rest.InClusterConfig() if errors.Is(err, rest.ErrNotInCluster) { // for local testing config, err = clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config")) if err != nil { return err } } cl, err := client.New(config, client.Options{}) if err != nil { return err } ctx := context.Background() ei, err := info.FromClient(ctx, cl) if err != nil { return err } couchCtlConfig.ProjectID = ei.ProjectID couchCtlConfig.BannerEdgeID = ei.BannerEdgeID couchCtlConfig.FleetType = ei.Fleet couchCtlConfig.ClusterType = ei.ClusterType bi, err := bsl.FromClient(ctx, cl) if err == nil { // optional couchCtlConfig.SiteID = bi.ID } return couchCtlConfig.Validate() }