package couchctl import ( "errors" "fmt" "regexp" "slices" "strings" "maps" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" "edge-infra.dev/pkg/edge/apis/meta" persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1" "edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" "edge-infra.dev/pkg/k8s/unstructured" v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" nodemeta "edge-infra.dev/pkg/sds/ien/node" ) type SubstitutionVar string const ( ServerName = SubstitutionVar("{server_name}") ServerType = SubstitutionVar("{sever_type}") LaneNumber = SubstitutionVar("{lane_number}") Suffix = SubstitutionVar("{suffix}") CouchDBStatefulSet = SubstitutionVar("{couchdb_sts}") ChirpStatefulSet = SubstitutionVar("{chirp_sts}") ReplicationDB = SubstitutionVar("{replication_db}") ReplicationSecret = SubstitutionVar("{replication_secret}") ReplicationSecretNS = SubstitutionVar("{replication_secret_ns}") NodeUID = SubstitutionVar("{node_uid}") LaneNumberSubstitutionMaxLength = 12 ChirpName = "chirp" ChirpOldName = "data-sync-messaging" ConfigMapUID = "node-uuid" ) type Substitution struct { Leader bool DSDS bool ServerName string ServerType dsapi.ServerType LaneNumber string Suffix string CouchDBStatefulSet string ReplicationDB string ReplicationSecret string ReplicationSecretNS string NodeUID string // optional for generic cluster, required for dsds cluster NodeName string // optional for generic cluster, required for dsds cluster ChirpStatefulSet string NodeInfo *nameutils.NodeInfo } func (s Substitution) IsTouchpoint() bool { return s.ServerType == dsapi.Touchpoint } func (s Substitution) NodeRole() v1ien.Role { if s.NodeInfo != nil { return s.NodeInfo.Role } return "" } func (s Substitution) NodeClass() v1ien.Class { if s.NodeInfo != nil { return s.NodeInfo.Class } return "" } var ( // Substitutions a set of valid mockSubstitutions values Substitutions = map[SubstitutionVar]struct{}{ ServerName: {}, ServerType: {}, LaneNumber: {}, Suffix: {}, CouchDBStatefulSet: {}, ChirpStatefulSet: {}, ReplicationDB: {}, ReplicationSecret: {}, ReplicationSecretNS: {}, NodeUID: {}, } substitutionMatcher = regexp.MustCompile(`\{[a-z_]+\}`) ErrInvalidSubstitutions = errors.New("invalid substitutions") ErrNoValueSubstitution = errors.New("no value found for mockSubstitutions") appsV1APIVersion = appsv1.SchemeGroupVersion.String() ) // ParseSubstitutions return a set of valid substitutions or error func ParseSubstitutions(yamlString string) ([]SubstitutionVar, error) { matches := substitutionMatcher.FindAllString(yamlString, -1) // set of substitutions substitutions := make(map[SubstitutionVar]struct{}) var invalidSubstitutions []string for _, match := range matches { su := SubstitutionVar(match) if _, found := Substitutions[su]; found { substitutions[su] = struct{}{} } else { invalidSubstitutions = append(invalidSubstitutions, match) } } if len(invalidSubstitutions) > 0 { return nil, fmt.Errorf("%w: %s ", ErrInvalidSubstitutions, strings.Join(invalidSubstitutions, ",")) } return slices.Collect(maps.Keys(substitutions)), nil } // ApplySubstitutions replace all mockSubstitutions values func ApplySubstitutions(res client.Object, su Substitution) (*unstructured.Unstructured, error) { un, err := unstructured.ToUnstructured(res) if err != nil { return nil, err } obj, err := toYAMLString(un) if err != nil { return nil, err } substitutions, err := ParseSubstitutions(obj) if err != nil { return nil, err } suVars := SubstitutionVars(su) for _, s := range substitutions { value := suVars[s] if value == "" && s != Suffix { return nil, fmt.Errorf("%w: %s", ErrNoValueSubstitution, s) } obj = strings.ReplaceAll(obj, string(s), value) } err = yaml.Unmarshal([]byte(obj), un) if err != nil { return nil, err } labels := un.GetLabels() if labels == nil { labels = map[string]string{} } labels[couchdb.SubstitutionLabel] = couchdb.LabelValueTrue if su.LaneNumber != "" { labels[nodemeta.LaneLabel] = su.LaneNumber } // for dsds or generic cluster we have a resource per node if su.NodeUID != "" { labels[couchdb.NodeUIDLabel] = su.NodeUID } if su.Leader { labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue } else { delete(labels, couchdb.NodeLeaderLabel) } un.SetLabels(labels) anno := un.GetAnnotations() if anno == nil { anno = map[string]string{} } if su.NodeName != "" { anno[couchdb.NodeNameAnnotation] = su.NodeName } un.SetAnnotations(anno) if un.GetKind() == "StatefulSet" && un.GetAPIVersion() == appsV1APIVersion { un, err = statefulSetNodeTargetingSubstitution(un, su) if err != nil { return nil, err } } return un, nil } func toYAMLString(un *unstructured.Unstructured) (string, error) { data, err := yaml.Marshal(un) return string(data), err } // StoreSubstitution generic cluster don't have node labels, so we can use hard coded values func StoreSubstitution(replDB string) Substitution { return Substitution{ Leader: false, // no leader in generic store server DSDS: false, ServerName: couchdb.StoreServerName, ServerType: dsapi.Store, LaneNumber: "", // generic store server dont have lane number Suffix: "", // generic store server dont have lane number suffix CouchDBStatefulSet: couchdb.Namespace, ChirpStatefulSet: ChirpOldName, ReplicationDB: replDB, ReplicationSecret: couchdb.StoreReplicationSecretName, ReplicationSecretNS: ControllerNamespace, NodeUID: "", } } // LaneSubstitution based on node labels // Node UID is used for suffix func LaneSubstitution(ni *nameutils.NodeInfo, nodeMapping map[string]map[string]string, replDB, leaderNodeUID string) Substitution { su := StoreSubstitution(replDB) su.Leader = ni.UID == leaderNodeUID su.DSDS = true su.NodeInfo = ni su.LaneNumber = ni.Lane su.NodeUID = ni.UID su.NodeName = ni.Name nodeUIDHash := meta.Hash(ni.UID) su.ServerName = fmt.Sprintf("%s-%s", couchdb.CouchDBName, su.NodeUID) cp := ni.Role == v1ien.ControlPlane couchDBPvcExists := oldPvcExists(nodeMapping, ni, couchdb.Namespace) // backwards compatibility chirpPvcExists := oldPvcExists(nodeMapping, ni, ChirpOldName) // backwards compatibility su.CouchDBStatefulSet = safeName(cp, couchDBPvcExists, couchdb.Namespace, fmt.Sprintf("%s-%s", couchdb.Namespace, ni.Lane), fmt.Sprintf("%s-%s", couchdb.CouchDBName, nodeUIDHash)) su.ChirpStatefulSet = safeName(cp, chirpPvcExists, ChirpOldName, fmt.Sprintf("%s-%s", ChirpOldName, ni.Lane), fmt.Sprintf("%s-%s", ChirpName, nodeUIDHash)) // can be removed after all clusters are updated su.Suffix = safeName(cp, couchDBPvcExists, "", "-"+ni.Lane, "-"+su.NodeUID) if !cp { su.ServerType = dsapi.Touchpoint } if su.Leader { // replicate from cloud couchdb su.ReplicationSecret = couchdb.StoreReplicationSecretName su.ReplicationSecretNS = ControllerNamespace } else { // replicate from leader couchdb su.ReplicationSecret = fmt.Sprintf("%s-%s", couchdb.CouchDBName, leaderNodeUID) su.ReplicationSecretNS = couchdb.Namespace } return su } func oldPvcExists(m map[string]map[string]string, ni *nameutils.NodeInfo, ns string) bool { if len(m) == 0 { return false } nodeSuffix := m[ns] if len(nodeSuffix) == 0 { return false } return len(nodeSuffix[ni.Name]) > 0 } func safeName(cp, oldPvc bool, cpOldValue, oldValue, newValue string) string { if oldPvc { if cp { return cpOldValue } return oldValue } return newValue } func SubstitutionVars(su Substitution) map[SubstitutionVar]string { return map[SubstitutionVar]string{ ServerName: su.ServerName, ServerType: string(su.ServerType), LaneNumber: su.LaneNumber, Suffix: su.Suffix, CouchDBStatefulSet: su.CouchDBStatefulSet, ChirpStatefulSet: su.ChirpStatefulSet, ReplicationDB: su.ReplicationDB, ReplicationSecret: su.ReplicationSecret, ReplicationSecretNS: su.ReplicationSecretNS, NodeUID: su.NodeUID, } } func ToSubstitution(s map[SubstitutionVar]string) Substitution { return Substitution{ ServerName: s[ServerName], ServerType: dsapi.ServerType(s[ServerType]), LaneNumber: s[LaneNumber], Suffix: s[Suffix], CouchDBStatefulSet: s[CouchDBStatefulSet], ChirpStatefulSet: s[ChirpStatefulSet], ReplicationDB: s[ReplicationDB], ReplicationSecret: s[ReplicationSecret], ReplicationSecretNS: s[ReplicationSecretNS], NodeUID: s[NodeUID], } } // statefulSetNodeTargetingSubstitution node targeting logic for StatefulSets func statefulSetNodeTargetingSubstitution(un *unstructured.Unstructured, su Substitution) (*unstructured.Unstructured, error) { sts := &appsv1.StatefulSet{} err := unstructured.FromUnstructured(un, sts) if err != nil { return nil, err } if sts.Spec.Template.Spec.Affinity == nil { sts.Spec.Template.Spec.Affinity = &corev1.Affinity{} } if sts.Spec.Template.Spec.Affinity.NodeAffinity == nil { sts.Spec.Template.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} } na := sts.Spec.Template.Spec.Affinity.NodeAffinity sts.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel] = sts.Name sts.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel] = sts.Name if su.Leader { sts.Spec.Template.ObjectMeta.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue } if su.DSDS { na.PreferredDuringSchedulingIgnoredDuringExecution = nil na.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{ NodeSelectorTerms: []corev1.NodeSelectorTerm{ { MatchExpressions: []corev1.NodeSelectorRequirement{ { Key: couchdb.NodeUIDLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{su.NodeUID}, }, }, }, }, } } else { na.RequiredDuringSchedulingIgnoredDuringExecution = nil na.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.PreferredSchedulingTerm{ { Weight: int32(100), Preference: corev1.NodeSelectorTerm{ MatchExpressions: []corev1.NodeSelectorRequirement{{ Key: nodemeta.RoleLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{string(v1ien.ControlPlane)}, }}, }, }, } } return unstructured.ToUnstructured(sts) }