package couchctl

import (
	"fmt"
	"testing"

	persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1"
	"edge-infra.dev/pkg/edge/constants"
	"edge-infra.dev/pkg/edge/constants/api/cluster"
	"edge-infra.dev/pkg/edge/constants/api/fleet"
	"edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/testing/kmp"
	v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
	nodemeta "edge-infra.dev/pkg/sds/ien/node"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/integration"
	"edge-infra.dev/test/f2/x/ktest"

	"github.com/stretchr/testify/require"
	"gotest.tools/v3/poll"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// TestPersistenceController creates the shared couchDBPersistence resource,
// waits for it to become ready and for exactly one server-class leader node to
// be labelled, then checks that the per-node servers, databases, users,
// replication sets and stateful sets derived from its spec exist with the
// expected substituted values.
func TestPersistenceController(t *testing.T) {
	// nodes is filled by the setup step and leaderNode by the leader election
	// step; the later steps read both through the closures below.
	nodes := &corev1.NodeList{}
	leaderNode := &corev1.Node{}

	fin := f2.NewFeature("PersistenceController").
		WithLabel(_fleetType, fleet.Store).
		WithLabel(_clusterType, cluster.Generic, cluster.DSDS).
		Setup("Node Exists For Persistence", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			k.WaitOn(t, func(_ poll.LogT) poll.Result {
				if err := k.Client.List(ctx, nodes); err != nil {
					return poll.Error(err)
				}
				if len(nodes.Items) > 0 {
					return poll.Success()
				}
				return poll.Continue("No K8s nodes found for Persistence Controller")
			})
			// The resource may already exist; only other create errors fail the test.
			require.NoError(t, client.IgnoreAlreadyExists(k.Client.Create(ctx, couchDBPersistence)))
			return ctx
		}).
		Test("CouchDBPersistence Ready", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			k.WaitOn(t, k.Check(couchDBPersistence, kmp.IsReady()))
			return ctx
		}).
		Test("Leader Node Elected", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			k.WaitOn(t, func(_ poll.LogT) poll.Result {
				leaderNodes := &corev1.NodeList{}
				if err := k.Client.List(ctx, leaderNodes, client.MatchingLabels{
					couchdb.NodeLeaderLabel: couchdb.LabelValueTrue,
				}); err != nil {
					return poll.Error(err)
				}
				// Succeed only when exactly one node carries the leader label
				// and that node is of the server class.
				if len(leaderNodes.Items) == 1 &&
					leaderNodes.Items[0].Labels[nodemeta.ClassLabel] == string(v1ien.Server) {
					leaderNode = &leaderNodes.Items[0]
					return poll.Success()
				}
				return poll.Continue("expected a leader node, node(s) found: %d", len(leaderNodes.Items))
			})
			return ctx
		}).
		Test("CouchDBPersistence Servers Created", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			for _, ni := range nodeInfo(nodes) {
				su := LaneSubstitution(ni, nil, couchCtlConfig.ReplicationDB(), string(leaderNode.UID))
				name := fmt.Sprintf("%s-%s", couchDBPersistence.Name, su.ServerName)
				for _, server := range couchDBPersistence.Spec.Servers {
					// make a copy of the resource to update its name
					srv := server
					srv.Name = name
					k.WaitOn(t, k.ObjExists(&srv))

					require.Equal(t, su.CouchDBStatefulSet, srv.Annotations[couchdb.StatefulSetLabel])
					require.Equal(t, su.LaneNumber, srv.Labels[nodemeta.LaneLabel])
					require.Equal(t, su.ServerType, srv.Spec.Type)
				}
			}
			return ctx
		}).
		Test("CouchDBPersistence Databases Created", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			for _, ni := range nodeInfo(nodes) {
				su := LaneSubstitution(ni, nil, couchCtlConfig.ReplicationDB(), string(leaderNode.UID))
				name := fmt.Sprintf("%s-%s", couchDBPersistence.Name, su.ServerName)
				for _, database := range couchDBPersistence.Spec.Databases {
					// make a copy of the resource to update its name
					db := database
					db.Name = name
					k.WaitOn(t, k.ObjExists(&db))

					if couchCtlConfig.IsDSDS() {
						require.NotEmpty(t, db.Labels[couchdb.NodeUIDLabel])
					}
					require.Equal(t, name, db.Spec.Name)
					// Fail with an assertion instead of an index panic when no admin is set.
					require.NotEmpty(t, db.Spec.Security.Admins.Names)
					require.Equal(t, name, db.Spec.Security.Admins.Names[0])
					require.Equal(t, name, db.Spec.ServerRef.Name)
				}
			}
			return ctx
		}).
		Test("CouchDBPersistence Users Created", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			for _, ni := range nodeInfo(nodes) {
				su := LaneSubstitution(ni, nil, couchCtlConfig.ReplicationDB(), string(leaderNode.UID))
				name := fmt.Sprintf("%s-%s", couchDBPersistence.Name, su.ServerName)
				for _, user := range couchDBPersistence.Spec.Users {
					// make a copy of the resource to update its name
					usr := user
					usr.Name = name
					k.WaitOn(t, k.ObjExists(&usr))

					if couchCtlConfig.IsDSDS() {
						require.NotEmpty(t, usr.Labels[couchdb.NodeUIDLabel])
					}
					require.Equal(t, name, usr.Spec.User.Name)
					require.Equal(t, fmt.Sprintf("%s-provider", couchDBPersistence.Name), usr.Spec.Provider.Name)
				}
			}
			return ctx
		}).
		Test("CouchDBPersistence Replications Created", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			for _, ni := range nodeInfo(nodes) {
				su := LaneSubstitution(ni, nil, couchCtlConfig.ReplicationDB(), string(leaderNode.UID))
				name := fmt.Sprintf("%s-%s", couchDBPersistence.Name, su.ServerName)
				for _, replication := range couchDBPersistence.Spec.Replications {
					// make a copy of the resource to update its name
					rs := replication
					rs.Name = name
					k.WaitOn(t, k.ObjExists(&rs))

					if couchCtlConfig.IsDSDS() {
						require.NotEmpty(t, rs.Labels[couchdb.NodeUIDLabel])
					}
					// Fail with an assertion instead of an index panic when no dataset is set.
					require.NotEmpty(t, rs.Spec.Datasets)
					// The dataset name check is currently disabled:
					// require.Equal(t, fmt.Sprintf("%s-%s", couchDBPersistence.Name, couchCtlConfig.ReplicationDB()), rs.Spec.Datasets[0].Name)
					require.Equal(t, "replication-user", rs.Spec.Datasets[0].Provider.Name)
					require.Equal(t, su.ReplicationSecret, rs.Spec.Source.Name)
					require.Equal(t, name, rs.Spec.Target.Name)
				}
			}
			return ctx
		}).
		Test("CouchDBPersistence StatefulSets Created", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)
			for _, ni := range nodeInfo(nodes) {
				su := LaneSubstitution(ni, nil, couchCtlConfig.ReplicationDB(), string(leaderNode.UID))
				for _, tmpl := range couchDBPersistence.Spec.StatefulSets {
					// make a copy of the resource to update its name
					sts := tmpl
					sts.Name = fmt.Sprintf("%s-%s", couchDBPersistence.Name, su.CouchDBStatefulSet)
					k.WaitOn(t, k.ObjExists(&sts))

					podSpec := &sts.Spec.Template.Spec

					// Guard every index/pointer access so a malformed object
					// fails the test instead of panicking.
					require.NotEmpty(t, podSpec.Volumes)
					require.Equal(t, "config", podSpec.Volumes[0].Name)
					require.NotNil(t, podSpec.Volumes[0].ConfigMap)
					require.Equal(t, su.CouchDBStatefulSet, podSpec.Volumes[0].ConfigMap.Name)

					require.NotNil(t, sts.Spec.Selector)
					require.Equal(t, sts.Name, sts.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel])
					require.Equal(t, sts.Name, sts.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel])

					require.NotNil(t, podSpec.Affinity)
					require.NotNil(t, podSpec.Affinity.NodeAffinity)
					require.Nil(t, podSpec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
					require.NotNil(t, podSpec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)

					if couchCtlConfig.IsDSDS() {
						require.NotEmpty(t, sts.Labels[couchdb.NodeUIDLabel])
					}
					if su.ServerType == dsapi.Store {
						require.Equal(t, couchdb.LabelValueTrue, sts.Spec.Template.ObjectMeta.Labels[couchdb.NodeLeaderLabel])
					}
				}
			}
			return ctx
		}).
		Feature()

	f.Test(t, fin)
}

// newCouchDBPersistence builds a CouchDBPersistence named "pst-<name>" whose
// spec holds one server, database, user, replication set and stateful set.
// Names and references inside those resources are written with the
// substitution placeholders (ServerName, CouchDBStatefulSet, ReplicationDB,
// ...) rather than concrete values.
func newCouchDBPersistence(name string) *dsapi.CouchDBPersistence {
	persistName := fmt.Sprintf("pst-%s", name)
	// Templated name shared by the server, database, user and replication set.
	substName := fmt.Sprintf("%s-%s", persistName, string(ServerName))

	server := dsapi.NewStoreCouchDBServer()
	// add substitutions
	server.Name = substName
	// Writing to a nil map panics, so make sure the annotations map exists.
	if server.Annotations == nil {
		server.Annotations = map[string]string{}
	}
	server.Annotations[couchdb.StatefulSetLabel] = string(CouchDBStatefulSet)
	server.Spec.Type = dsapi.ServerType(ServerType)
	// TODO: create a real couchdb STS (the original note ends mid-sentence;
	// anything beyond this is unclear).
	server.Spec.URI = fmt.Sprintf("%s-0.data-sync-couchdb.data-sync-couchdb.svc.cluster.local", CouchDBStatefulSet)
	if integration.IsL1() {
		server.Spec.URI = defaultHost
	}

	db := newCouchDBDatabase(persistName, server)
	// add CouchDBDatabase substitutions
	db.Name = substName
	db.Spec.Name = substName
	db.Spec.Security.Admins.Names[0] = substName
	db.Spec.ServerRef.Name = server.Name

	user := newCouchDBUser(persistName, persistName, server)
	// add CouchDBUser substitutions
	user.Name = substName
	user.Spec.User.Name = substName

	repl := newCouchDBReplicationSet(persistName, server)
	// add CouchDBReplicationSet substitutions
	repl.Name = substName
	repl.Spec.Datasets[0].Name = string(ReplicationDB)
	//repl.Spec.Datasets[0].Name = fmt.Sprintf("%s-%s", persistName, string(ReplicationDB))
	repl.Spec.Source.Name = string(ReplicationSecret)
	repl.Spec.Source.Namespace = string(ReplicationSecretNS)
	repl.Spec.Target.Name = server.Name

	// StatefulSet substitutions
	stsName := fmt.Sprintf("%s-%s", persistName, string(CouchDBStatefulSet))
	configMapName := string(CouchDBStatefulSet)
	sts := newStatefulSet(stsName, server, configMapName)

	return &dsapi.CouchDBPersistence{
		TypeMeta: metav1.TypeMeta{
			APIVersion: dsapi.GroupVersion.String(),
			Kind:       "CouchDBPersistence",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      persistName,
			Namespace: server.Namespace,
		},
		Spec: dsapi.CouchDBPersistenceSpec{
			Servers:      []dsapi.CouchDBServer{*server},
			Databases:    []dsapi.CouchDBDatabase{*db},
			Users:        []dsapi.CouchDBUser{*user},
			Replications: []dsapi.CouchDBReplicationSet{*repl},
			StatefulSets: []appsv1.StatefulSet{*sts},
		},
	}
}

// nodeInfo returns the node descriptions the per-node assertions iterate over.
// When couchCtlConfig.IsDSDS() is true there is one entry per item in nodes;
// otherwise a single entry with empty hostname and lane, class Server and role
// ControlPlane is returned.
func nodeInfo(nodes *corev1.NodeList) []*nameutils.NodeInfo {
	if !couchCtlConfig.IsDSDS() {
		return []*nameutils.NodeInfo{
			{
				Hostname: "",
				Lane:     "",
				Class:    v1ien.Server,
				Role:     v1ien.ControlPlane,
			},
		}
	}

	info := make([]*nameutils.NodeInfo, 0, len(nodes.Items))
	for i := range nodes.Items {
		// NOTE(review): the error from GetNodeInfo is discarded and the result
		// is appended regardless — confirm whether a failing node should be
		// skipped or should fail the test.
		ni, _ := nameutils.GetNodeInfo(nodes.Items[i], LaneNumberSubstitutionMaxLength)
		info = append(info, ni)
	}
	return info
}

// newStatefulSet returns the couchdb StatefulSet template called name in the
// server's namespace. The pod runs a busybox init container that copies
// chart.ini from the "config" volume (backed by the configMapMount ConfigMap)
// into the shared "config-storage" emptyDir, followed by the couchdb container
// whose data lives on the "database-storage" volume claim.
func newStatefulSet(name string, server *dsapi.CouchDBServer, configMapMount string) *appsv1.StatefulSet {
	// componentLabels returns a fresh map on every call so the object, selector,
	// pod template and volume claim never share one underlying map.
	componentLabels := func() map[string]string {
		return map[string]string{
			"platform.edge.ncr.com/component": "data-sync-couchdb",
		}
	}

	// storeSecretEnv builds an env var whose value is read from the given key
	// of the store secret.
	storeSecretEnv := func(envName, key string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: envName,
			ValueFrom: &corev1.EnvVarSource{
				SecretKeyRef: &corev1.SecretKeySelector{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: couchdb.StoreSecretName,
					},
					Key: key,
				},
			},
		}
	}

	// NOTE(review): both probes exec curl directly; exec probes are presumably
	// not run through a shell, so $COUCHDB_USER/$COUCHDB_PASSWORD may reach
	// curl unexpanded — confirm this is intended.
	upCheck := func() corev1.ProbeHandler {
		return corev1.ProbeHandler{
			Exec: &corev1.ExecAction{
				Command: []string{"curl", "$COUCHDB_USER:$COUCHDB_PASSWORD@localhost:5984/_up"},
			},
		}
	}

	return &appsv1.StatefulSet{
		TypeMeta: metav1.TypeMeta{
			APIVersion: appsv1.SchemeGroupVersion.String(),
			Kind:       "StatefulSet",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: server.Namespace,
			Labels:    componentLabels(),
		},
		Spec: appsv1.StatefulSetSpec{
			PodManagementPolicy: appsv1.ParallelPodManagement,
			ServiceName:         server.Namespace,
			Selector: &metav1.LabelSelector{
				MatchLabels: componentLabels(),
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: componentLabels(),
				},
				Spec: corev1.PodSpec{
					ServiceAccountName: server.Namespace,
					InitContainers: []corev1.Container{
						{
							Name:  "init-copy",
							Image: "us-east1-docker.pkg.dev/ret-edge-pltf-infra/thirdparty/index.docker.io/library/busybox:latest",
							// Test for copying generated configmap per node for couchdb touchpoint servers
							Command: []string{"sh", "-c", "cp /tmp/chart.ini /default.d; ls -lrt /default.d;"},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "config",
									MountPath: "/tmp/",
								},
								{
									Name:      "config-storage",
									MountPath: "/default.d",
								},
							},
							ImagePullPolicy: corev1.PullIfNotPresent,
						},
					},
					Containers: []corev1.Container{
						{
							Name:  "couchdb",
							Image: "us-east1-docker.pkg.dev/ret-edge-pltf-infra/thirdparty/couchdb:3.3.1",
							Ports: []corev1.ContainerPort{
								{
									Name:          "couchdb",
									ContainerPort: 5984,
								},
								{
									Name:          "epmd",
									ContainerPort: 4369,
								},
								{
									ContainerPort: 9100,
								},
							},
							Env: []corev1.EnvVar{
								storeSecretEnv("COUCHDB_USER", couchdb.SecretUsername),
								storeSecretEnv("COUCHDB_PASSWORD", couchdb.SecretPassword),
								storeSecretEnv("COUCHDB_SECRET", couchdb.SecretCookieName),
								{
									Name: "couch_namespace",
									ValueFrom: &corev1.EnvVarSource{
										FieldRef: &corev1.ObjectFieldSelector{
											FieldPath: "metadata.namespace",
										},
									},
								},
								{
									Name:  "ERL_FLAGS",
									Value: "-name couchdb -setcookie $(COUCHDB_SECRET)",
								},
							},
							Resources: corev1.ResourceRequirements{
								Limits: corev1.ResourceList{
									corev1.ResourceCPU:    resource.MustParse("4"),
									corev1.ResourceMemory: resource.MustParse("4Gi"),
								},
								Requests: corev1.ResourceList{
									corev1.ResourceCPU:    resource.MustParse("50m"),
									corev1.ResourceMemory: resource.MustParse("150Mi"),
								},
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "config-storage",
									MountPath: "/opt/couchdb/etc/default.d",
								},
								{
									Name:      "database-storage",
									MountPath: "/opt/couchdb/data",
								},
							},
							LivenessProbe: &corev1.Probe{
								ProbeHandler:     upCheck(),
								FailureThreshold: 3,
								PeriodSeconds:    10,
								SuccessThreshold: 1,
								TimeoutSeconds:   5,
							},
							ReadinessProbe: &corev1.Probe{
								ProbeHandler:        upCheck(),
								FailureThreshold:    3,
								InitialDelaySeconds: 0,
								PeriodSeconds:       10,
								SuccessThreshold:    1,
								TimeoutSeconds:      1,
							},
						},
					},
					Volumes: []corev1.Volume{
						{
							// Must stay first: the test asserts on Volumes[0].
							Name: "config",
							VolumeSource: corev1.VolumeSource{
								ConfigMap: &corev1.ConfigMapVolumeSource{
									LocalObjectReference: corev1.LocalObjectReference{
										Name: configMapMount,
									},
									Items: []corev1.KeyToPath{{
										Key:  IniFileKey,
										Path: "chart.ini",
									}},
								},
							},
						},
						{
							Name: "config-storage",
							VolumeSource: corev1.VolumeSource{
								EmptyDir: &corev1.EmptyDirVolumeSource{},
							},
						},
						{
							Name: couchdb.StoreSecretName,
							VolumeSource: corev1.VolumeSource{
								Secret: &corev1.SecretVolumeSource{
									SecretName: couchdb.StoreSecretName,
								},
							},
						},
					},
					ImagePullSecrets: []corev1.LocalObjectReference{
						{
							Name: constants.EdgeDockerSecret,
						},
					},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name:   "database-storage",
						Labels: componentLabels(),
					},
					Spec: corev1.PersistentVolumeClaimSpec{
						Resources: corev1.VolumeResourceRequirements{
							Requests: corev1.ResourceList{
								corev1.ResourceStorage: resource.MustParse("10Gi"),
							},
						},
						AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
					},
				},
			},
		},
	}
}