...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/subsitutions.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"regexp"
     7  	"slices"
     8  	"strings"
     9  
    10  	"maps"
    11  
    12  	appsv1 "k8s.io/api/apps/v1"
    13  	corev1 "k8s.io/api/core/v1"
    14  	"sigs.k8s.io/controller-runtime/pkg/client"
    15  	"sigs.k8s.io/yaml"
    16  
    17  	"edge-infra.dev/pkg/edge/apis/meta"
    18  	persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1"
    19  	"edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
    20  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    21  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    22  	"edge-infra.dev/pkg/k8s/unstructured"
    23  	v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    24  	nodemeta "edge-infra.dev/pkg/sds/ien/node"
    25  )
    26  
    27  type SubstitutionVar string
    28  
    29  const (
    30  	ServerName                      = SubstitutionVar("{server_name}")
    31  	ServerType                      = SubstitutionVar("{sever_type}")
    32  	LaneNumber                      = SubstitutionVar("{lane_number}")
    33  	Suffix                          = SubstitutionVar("{suffix}")
    34  	CouchDBStatefulSet              = SubstitutionVar("{couchdb_sts}")
    35  	ChirpStatefulSet                = SubstitutionVar("{chirp_sts}")
    36  	ReplicationDB                   = SubstitutionVar("{replication_db}")
    37  	ReplicationSecret               = SubstitutionVar("{replication_secret}")
    38  	ReplicationSecretNS             = SubstitutionVar("{replication_secret_ns}")
    39  	NodeUID                         = SubstitutionVar("{node_uid}")
    40  	LaneNumberSubstitutionMaxLength = 12
    41  	ChirpName                       = "chirp"
    42  	ChirpOldName                    = "data-sync-messaging"
    43  	ConfigMapUID                    = "node-uuid"
    44  )
    45  
// Substitution carries the concrete values used to resolve placeholder tokens
// in a resource template, together with node metadata that ApplySubstitutions
// turns into labels, annotations and StatefulSet scheduling rules.
type Substitution struct {
	// Leader is true when the node is the leader; ApplySubstitutions sets the
	// leader label when true and removes it otherwise.
	Leader              bool
	// DSDS selects the StatefulSet scheduling mode: required affinity on the
	// node UID when true, preferred affinity on the control-plane role when false.
	DSDS                bool
	// ServerName is the value substituted for the ServerName token.
	ServerName          string
	// ServerType is the value substituted for the ServerType token.
	ServerType          dsapi.ServerType
	// LaneNumber is the value for the LaneNumber token; when non-empty it is
	// also written as the lane label.
	LaneNumber          string
	// Suffix is the value for the Suffix token; it is the only token allowed
	// to resolve to an empty string.
	Suffix              string
	// CouchDBStatefulSet is the value substituted for the CouchDBStatefulSet token.
	CouchDBStatefulSet  string
	// ReplicationDB is the value substituted for the ReplicationDB token.
	ReplicationDB       string
	// ReplicationSecret is the value substituted for the ReplicationSecret token.
	ReplicationSecret   string
	// ReplicationSecretNS is the value substituted for the ReplicationSecretNS token.
	ReplicationSecretNS string
	NodeUID             string // optional for generic cluster, required for dsds cluster
	NodeName            string // optional for generic cluster, required for dsds cluster
	// ChirpStatefulSet is the value substituted for the ChirpStatefulSet token.
	ChirpStatefulSet    string
	// NodeInfo backs NodeRole and NodeClass; it may be nil.
	NodeInfo            *nameutils.NodeInfo
}
    62  
    63  func (s Substitution) IsTouchpoint() bool {
    64  	return s.ServerType == dsapi.Touchpoint
    65  }
    66  
    67  func (s Substitution) NodeRole() v1ien.Role {
    68  	if s.NodeInfo != nil {
    69  		return s.NodeInfo.Role
    70  	}
    71  	return ""
    72  }
    73  
    74  func (s Substitution) NodeClass() v1ien.Class {
    75  	if s.NodeInfo != nil {
    76  		return s.NodeInfo.Class
    77  	}
    78  	return ""
    79  }
    80  
    81  var (
    82  	// Substitutions a set of valid mockSubstitutions values
    83  	Substitutions = map[SubstitutionVar]struct{}{
    84  		ServerName:          {},
    85  		ServerType:          {},
    86  		LaneNumber:          {},
    87  		Suffix:              {},
    88  		CouchDBStatefulSet:  {},
    89  		ChirpStatefulSet:    {},
    90  		ReplicationDB:       {},
    91  		ReplicationSecret:   {},
    92  		ReplicationSecretNS: {},
    93  		NodeUID:             {},
    94  	}
    95  	substitutionMatcher     = regexp.MustCompile(`\{[a-z_]+\}`)
    96  	ErrInvalidSubstitutions = errors.New("invalid substitutions")
    97  	ErrNoValueSubstitution  = errors.New("no value found for mockSubstitutions")
    98  	appsV1APIVersion        = appsv1.SchemeGroupVersion.String()
    99  )
   100  
   101  // ParseSubstitutions return a set of valid substitutions or error
   102  func ParseSubstitutions(yamlString string) ([]SubstitutionVar, error) {
   103  	matches := substitutionMatcher.FindAllString(yamlString, -1)
   104  	// set of substitutions
   105  	substitutions := make(map[SubstitutionVar]struct{})
   106  	var invalidSubstitutions []string
   107  	for _, match := range matches {
   108  		su := SubstitutionVar(match)
   109  		if _, found := Substitutions[su]; found {
   110  			substitutions[su] = struct{}{}
   111  		} else {
   112  			invalidSubstitutions = append(invalidSubstitutions, match)
   113  		}
   114  	}
   115  	if len(invalidSubstitutions) > 0 {
   116  		return nil, fmt.Errorf("%w: %s ", ErrInvalidSubstitutions, strings.Join(invalidSubstitutions, ","))
   117  	}
   118  	return slices.Collect(maps.Keys(substitutions)), nil
   119  }
   120  
   121  // ApplySubstitutions replace all mockSubstitutions values
   122  func ApplySubstitutions(res client.Object, su Substitution) (*unstructured.Unstructured, error) {
   123  	un, err := unstructured.ToUnstructured(res)
   124  	if err != nil {
   125  		return nil, err
   126  	}
   127  	obj, err := toYAMLString(un)
   128  	if err != nil {
   129  		return nil, err
   130  	}
   131  	substitutions, err := ParseSubstitutions(obj)
   132  	if err != nil {
   133  		return nil, err
   134  	}
   135  	suVars := SubstitutionVars(su)
   136  	for _, s := range substitutions {
   137  		value := suVars[s]
   138  		if value == "" && s != Suffix {
   139  			return nil, fmt.Errorf("%w: %s", ErrNoValueSubstitution, s)
   140  		}
   141  		obj = strings.ReplaceAll(obj, string(s), value)
   142  	}
   143  	err = yaml.Unmarshal([]byte(obj), un)
   144  	if err != nil {
   145  		return nil, err
   146  	}
   147  	labels := un.GetLabels()
   148  	if labels == nil {
   149  		labels = map[string]string{}
   150  	}
   151  	labels[couchdb.SubstitutionLabel] = couchdb.LabelValueTrue
   152  	if su.LaneNumber != "" {
   153  		labels[nodemeta.LaneLabel] = su.LaneNumber
   154  	}
   155  	// for dsds or generic cluster we have a resource per node
   156  	if su.NodeUID != "" {
   157  		labels[couchdb.NodeUIDLabel] = su.NodeUID
   158  	}
   159  	if su.Leader {
   160  		labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
   161  	} else {
   162  		delete(labels, couchdb.NodeLeaderLabel)
   163  	}
   164  	un.SetLabels(labels)
   165  
   166  	anno := un.GetAnnotations()
   167  	if anno == nil {
   168  		anno = map[string]string{}
   169  	}
   170  	if su.NodeName != "" {
   171  		anno[couchdb.NodeNameAnnotation] = su.NodeName
   172  	}
   173  	un.SetAnnotations(anno)
   174  
   175  	if un.GetKind() == "StatefulSet" && un.GetAPIVersion() == appsV1APIVersion {
   176  		un, err = statefulSetNodeTargetingSubstitution(un, su)
   177  		if err != nil {
   178  			return nil, err
   179  		}
   180  	}
   181  
   182  	return un, nil
   183  }
   184  
   185  func toYAMLString(un *unstructured.Unstructured) (string, error) {
   186  	data, err := yaml.Marshal(un)
   187  	return string(data), err
   188  }
   189  
   190  // StoreSubstitution generic cluster don't have node labels, so we can use hard coded values
   191  func StoreSubstitution(replDB string) Substitution {
   192  	return Substitution{
   193  		Leader:              false, // no leader in generic store server
   194  		DSDS:                false,
   195  		ServerName:          couchdb.StoreServerName,
   196  		ServerType:          dsapi.Store,
   197  		LaneNumber:          "", // generic store server dont have lane number
   198  		Suffix:              "", // generic store server dont have lane number suffix
   199  		CouchDBStatefulSet:  couchdb.Namespace,
   200  		ChirpStatefulSet:    ChirpOldName,
   201  		ReplicationDB:       replDB,
   202  		ReplicationSecret:   couchdb.StoreReplicationSecretName,
   203  		ReplicationSecretNS: ControllerNamespace,
   204  		NodeUID:             "",
   205  	}
   206  }
   207  
   208  // LaneSubstitution based on node labels
   209  // Node UID is used for suffix
   210  func LaneSubstitution(ni *nameutils.NodeInfo, nodeMapping map[string]map[string]string, replDB, leaderNodeUID string) Substitution {
   211  	su := StoreSubstitution(replDB)
   212  	su.Leader = ni.UID == leaderNodeUID
   213  	su.DSDS = true
   214  	su.NodeInfo = ni
   215  	su.LaneNumber = ni.Lane
   216  	su.NodeUID = ni.UID
   217  	su.NodeName = ni.Name
   218  
   219  	nodeUIDHash := meta.Hash(ni.UID)
   220  	su.ServerName = fmt.Sprintf("%s-%s", couchdb.CouchDBName, su.NodeUID)
   221  
   222  	cp := ni.Role == v1ien.ControlPlane
   223  	couchDBPvcExists := oldPvcExists(nodeMapping, ni, couchdb.Namespace) // backwards compatibility
   224  	chirpPvcExists := oldPvcExists(nodeMapping, ni, ChirpOldName)        // backwards compatibility
   225  
   226  	su.CouchDBStatefulSet = safeName(cp, couchDBPvcExists, couchdb.Namespace,
   227  		fmt.Sprintf("%s-%s", couchdb.Namespace, ni.Lane),
   228  		fmt.Sprintf("%s-%s", couchdb.CouchDBName, nodeUIDHash))
   229  
   230  	su.ChirpStatefulSet = safeName(cp, chirpPvcExists, ChirpOldName,
   231  		fmt.Sprintf("%s-%s", ChirpOldName, ni.Lane),
   232  		fmt.Sprintf("%s-%s", ChirpName, nodeUIDHash))
   233  
   234  	// can be removed after all clusters are updated
   235  	su.Suffix = safeName(cp, couchDBPvcExists, "", "-"+ni.Lane, "-"+su.NodeUID)
   236  
   237  	if !cp {
   238  		su.ServerType = dsapi.Touchpoint
   239  	}
   240  
   241  	if su.Leader { // replicate from cloud couchdb
   242  		su.ReplicationSecret = couchdb.StoreReplicationSecretName
   243  		su.ReplicationSecretNS = ControllerNamespace
   244  	} else { // replicate from leader couchdb
   245  		su.ReplicationSecret = fmt.Sprintf("%s-%s", couchdb.CouchDBName, leaderNodeUID)
   246  		su.ReplicationSecretNS = couchdb.Namespace
   247  	}
   248  	return su
   249  }
   250  
   251  func oldPvcExists(m map[string]map[string]string, ni *nameutils.NodeInfo, ns string) bool {
   252  	if len(m) == 0 {
   253  		return false
   254  	}
   255  	nodeSuffix := m[ns]
   256  	if len(nodeSuffix) == 0 {
   257  		return false
   258  	}
   259  	return len(nodeSuffix[ni.Name]) > 0
   260  }
   261  
   262  func safeName(cp, oldPvc bool, cpOldValue, oldValue, newValue string) string {
   263  	if oldPvc {
   264  		if cp {
   265  			return cpOldValue
   266  		}
   267  		return oldValue
   268  	}
   269  	return newValue
   270  }
   271  
   272  func SubstitutionVars(su Substitution) map[SubstitutionVar]string {
   273  	return map[SubstitutionVar]string{
   274  		ServerName:          su.ServerName,
   275  		ServerType:          string(su.ServerType),
   276  		LaneNumber:          su.LaneNumber,
   277  		Suffix:              su.Suffix,
   278  		CouchDBStatefulSet:  su.CouchDBStatefulSet,
   279  		ChirpStatefulSet:    su.ChirpStatefulSet,
   280  		ReplicationDB:       su.ReplicationDB,
   281  		ReplicationSecret:   su.ReplicationSecret,
   282  		ReplicationSecretNS: su.ReplicationSecretNS,
   283  		NodeUID:             su.NodeUID,
   284  	}
   285  }
   286  
   287  func ToSubstitution(s map[SubstitutionVar]string) Substitution {
   288  	return Substitution{
   289  		ServerName:          s[ServerName],
   290  		ServerType:          dsapi.ServerType(s[ServerType]),
   291  		LaneNumber:          s[LaneNumber],
   292  		Suffix:              s[Suffix],
   293  		CouchDBStatefulSet:  s[CouchDBStatefulSet],
   294  		ChirpStatefulSet:    s[ChirpStatefulSet],
   295  		ReplicationDB:       s[ReplicationDB],
   296  		ReplicationSecret:   s[ReplicationSecret],
   297  		ReplicationSecretNS: s[ReplicationSecretNS],
   298  		NodeUID:             s[NodeUID],
   299  	}
   300  }
   301  
   302  // statefulSetNodeTargetingSubstitution node targeting logic for StatefulSets
   303  func statefulSetNodeTargetingSubstitution(un *unstructured.Unstructured, su Substitution) (*unstructured.Unstructured, error) {
   304  	sts := &appsv1.StatefulSet{}
   305  	err := unstructured.FromUnstructured(un, sts)
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  
   310  	if sts.Spec.Template.Spec.Affinity == nil {
   311  		sts.Spec.Template.Spec.Affinity = &corev1.Affinity{}
   312  	}
   313  	if sts.Spec.Template.Spec.Affinity.NodeAffinity == nil {
   314  		sts.Spec.Template.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
   315  	}
   316  	na := sts.Spec.Template.Spec.Affinity.NodeAffinity
   317  
   318  	sts.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel] = sts.Name
   319  	sts.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel] = sts.Name
   320  	if su.Leader {
   321  		sts.Spec.Template.ObjectMeta.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
   322  	}
   323  
   324  	if su.DSDS {
   325  		na.PreferredDuringSchedulingIgnoredDuringExecution = nil
   326  		na.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{
   327  			NodeSelectorTerms: []corev1.NodeSelectorTerm{
   328  				{
   329  					MatchExpressions: []corev1.NodeSelectorRequirement{
   330  						{
   331  							Key:      couchdb.NodeUIDLabel,
   332  							Operator: corev1.NodeSelectorOpIn,
   333  							Values:   []string{su.NodeUID},
   334  						},
   335  					},
   336  				},
   337  			},
   338  		}
   339  	} else {
   340  		na.RequiredDuringSchedulingIgnoredDuringExecution = nil
   341  		na.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.PreferredSchedulingTerm{
   342  			{
   343  				Weight: int32(100),
   344  				Preference: corev1.NodeSelectorTerm{
   345  					MatchExpressions: []corev1.NodeSelectorRequirement{{
   346  						Key:      nodemeta.RoleLabel,
   347  						Operator: corev1.NodeSelectorOpIn,
   348  						Values:   []string{string(v1ien.ControlPlane)},
   349  					}},
   350  				},
   351  			},
   352  		}
   353  	}
   354  	return unstructured.ToUnstructured(sts)
   355  }
   356  

View as plain text