1 package couchctl
2
3 import (
4 "errors"
5 "fmt"
6 "regexp"
7 "slices"
8 "strings"
9
10 "maps"
11
12 appsv1 "k8s.io/api/apps/v1"
13 corev1 "k8s.io/api/core/v1"
14 "sigs.k8s.io/controller-runtime/pkg/client"
15 "sigs.k8s.io/yaml"
16
17 "edge-infra.dev/pkg/edge/apis/meta"
18 persistenceApi "edge-infra.dev/pkg/edge/apis/persistence/v1alpha1"
19 "edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
20 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
21 "edge-infra.dev/pkg/edge/datasync/couchdb"
22 "edge-infra.dev/pkg/k8s/unstructured"
23 v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
24 nodemeta "edge-infra.dev/pkg/sds/ien/node"
25 )
26
// SubstitutionVar is a placeholder token of the form "{name}" that can be
// embedded in a resource manifest and is replaced with a concrete value by
// ApplySubstitutions.
type SubstitutionVar string

// Placeholder tokens recognised in manifests.
const (
	ServerName SubstitutionVar = "{server_name}"
	// NOTE(review): the token is spelled "{sever_type}" (no "r"). This looks
	// like a typo, but manifests may already depend on this exact spelling;
	// confirm before changing it.
	ServerType          SubstitutionVar = "{sever_type}"
	LaneNumber          SubstitutionVar = "{lane_number}"
	Suffix              SubstitutionVar = "{suffix}"
	CouchDBStatefulSet  SubstitutionVar = "{couchdb_sts}"
	ChirpStatefulSet    SubstitutionVar = "{chirp_sts}"
	ReplicationDB       SubstitutionVar = "{replication_db}"
	ReplicationSecret   SubstitutionVar = "{replication_secret}"
	ReplicationSecretNS SubstitutionVar = "{replication_secret_ns}"
	NodeUID             SubstitutionVar = "{node_uid}"
)

// Miscellaneous names and limits used when computing substitution values.
const (
	// LaneNumberSubstitutionMaxLength is not referenced in this file;
	// presumably an upper bound on the lane number length — verify at the
	// call sites.
	LaneNumberSubstitutionMaxLength = 12
	// ChirpName is the name prefix used for newly named chirp StatefulSets.
	ChirpName = "chirp"
	// ChirpOldName is the legacy chirp name, kept for the store substitution
	// and for nodes that still have a PVC recorded under it.
	ChirpOldName = "data-sync-messaging"
	// ConfigMapUID is not referenced in this file; presumably a ConfigMap
	// key holding the node UID — verify at the call sites.
	ConfigMapUID = "node-uuid"
)
45
46 type Substitution struct {
47 Leader bool
48 DSDS bool
49 ServerName string
50 ServerType dsapi.ServerType
51 LaneNumber string
52 Suffix string
53 CouchDBStatefulSet string
54 ReplicationDB string
55 ReplicationSecret string
56 ReplicationSecretNS string
57 NodeUID string
58 NodeName string
59 ChirpStatefulSet string
60 NodeInfo *nameutils.NodeInfo
61 }
62
63 func (s Substitution) IsTouchpoint() bool {
64 return s.ServerType == dsapi.Touchpoint
65 }
66
67 func (s Substitution) NodeRole() v1ien.Role {
68 if s.NodeInfo != nil {
69 return s.NodeInfo.Role
70 }
71 return ""
72 }
73
74 func (s Substitution) NodeClass() v1ien.Class {
75 if s.NodeInfo != nil {
76 return s.NodeInfo.Class
77 }
78 return ""
79 }
80
81 var (
82
83 Substitutions = map[SubstitutionVar]struct{}{
84 ServerName: {},
85 ServerType: {},
86 LaneNumber: {},
87 Suffix: {},
88 CouchDBStatefulSet: {},
89 ChirpStatefulSet: {},
90 ReplicationDB: {},
91 ReplicationSecret: {},
92 ReplicationSecretNS: {},
93 NodeUID: {},
94 }
95 substitutionMatcher = regexp.MustCompile(`\{[a-z_]+\}`)
96 ErrInvalidSubstitutions = errors.New("invalid substitutions")
97 ErrNoValueSubstitution = errors.New("no value found for mockSubstitutions")
98 appsV1APIVersion = appsv1.SchemeGroupVersion.String()
99 )
100
101
102 func ParseSubstitutions(yamlString string) ([]SubstitutionVar, error) {
103 matches := substitutionMatcher.FindAllString(yamlString, -1)
104
105 substitutions := make(map[SubstitutionVar]struct{})
106 var invalidSubstitutions []string
107 for _, match := range matches {
108 su := SubstitutionVar(match)
109 if _, found := Substitutions[su]; found {
110 substitutions[su] = struct{}{}
111 } else {
112 invalidSubstitutions = append(invalidSubstitutions, match)
113 }
114 }
115 if len(invalidSubstitutions) > 0 {
116 return nil, fmt.Errorf("%w: %s ", ErrInvalidSubstitutions, strings.Join(invalidSubstitutions, ","))
117 }
118 return slices.Collect(maps.Keys(substitutions)), nil
119 }
120
121
122 func ApplySubstitutions(res client.Object, su Substitution) (*unstructured.Unstructured, error) {
123 un, err := unstructured.ToUnstructured(res)
124 if err != nil {
125 return nil, err
126 }
127 obj, err := toYAMLString(un)
128 if err != nil {
129 return nil, err
130 }
131 substitutions, err := ParseSubstitutions(obj)
132 if err != nil {
133 return nil, err
134 }
135 suVars := SubstitutionVars(su)
136 for _, s := range substitutions {
137 value := suVars[s]
138 if value == "" && s != Suffix {
139 return nil, fmt.Errorf("%w: %s", ErrNoValueSubstitution, s)
140 }
141 obj = strings.ReplaceAll(obj, string(s), value)
142 }
143 err = yaml.Unmarshal([]byte(obj), un)
144 if err != nil {
145 return nil, err
146 }
147 labels := un.GetLabels()
148 if labels == nil {
149 labels = map[string]string{}
150 }
151 labels[couchdb.SubstitutionLabel] = couchdb.LabelValueTrue
152 if su.LaneNumber != "" {
153 labels[nodemeta.LaneLabel] = su.LaneNumber
154 }
155
156 if su.NodeUID != "" {
157 labels[couchdb.NodeUIDLabel] = su.NodeUID
158 }
159 if su.Leader {
160 labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
161 } else {
162 delete(labels, couchdb.NodeLeaderLabel)
163 }
164 un.SetLabels(labels)
165
166 anno := un.GetAnnotations()
167 if anno == nil {
168 anno = map[string]string{}
169 }
170 if su.NodeName != "" {
171 anno[couchdb.NodeNameAnnotation] = su.NodeName
172 }
173 un.SetAnnotations(anno)
174
175 if un.GetKind() == "StatefulSet" && un.GetAPIVersion() == appsV1APIVersion {
176 un, err = statefulSetNodeTargetingSubstitution(un, su)
177 if err != nil {
178 return nil, err
179 }
180 }
181
182 return un, nil
183 }
184
185 func toYAMLString(un *unstructured.Unstructured) (string, error) {
186 data, err := yaml.Marshal(un)
187 return string(data), err
188 }
189
190
191 func StoreSubstitution(replDB string) Substitution {
192 return Substitution{
193 Leader: false,
194 DSDS: false,
195 ServerName: couchdb.StoreServerName,
196 ServerType: dsapi.Store,
197 LaneNumber: "",
198 Suffix: "",
199 CouchDBStatefulSet: couchdb.Namespace,
200 ChirpStatefulSet: ChirpOldName,
201 ReplicationDB: replDB,
202 ReplicationSecret: couchdb.StoreReplicationSecretName,
203 ReplicationSecretNS: ControllerNamespace,
204 NodeUID: "",
205 }
206 }
207
208
209
210 func LaneSubstitution(ni *nameutils.NodeInfo, nodeMapping map[string]map[string]string, replDB, leaderNodeUID string) Substitution {
211 su := StoreSubstitution(replDB)
212 su.Leader = ni.UID == leaderNodeUID
213 su.DSDS = true
214 su.NodeInfo = ni
215 su.LaneNumber = ni.Lane
216 su.NodeUID = ni.UID
217 su.NodeName = ni.Name
218
219 nodeUIDHash := meta.Hash(ni.UID)
220 su.ServerName = fmt.Sprintf("%s-%s", couchdb.CouchDBName, su.NodeUID)
221
222 cp := ni.Role == v1ien.ControlPlane
223 couchDBPvcExists := oldPvcExists(nodeMapping, ni, couchdb.Namespace)
224 chirpPvcExists := oldPvcExists(nodeMapping, ni, ChirpOldName)
225
226 su.CouchDBStatefulSet = safeName(cp, couchDBPvcExists, couchdb.Namespace,
227 fmt.Sprintf("%s-%s", couchdb.Namespace, ni.Lane),
228 fmt.Sprintf("%s-%s", couchdb.CouchDBName, nodeUIDHash))
229
230 su.ChirpStatefulSet = safeName(cp, chirpPvcExists, ChirpOldName,
231 fmt.Sprintf("%s-%s", ChirpOldName, ni.Lane),
232 fmt.Sprintf("%s-%s", ChirpName, nodeUIDHash))
233
234
235 su.Suffix = safeName(cp, couchDBPvcExists, "", "-"+ni.Lane, "-"+su.NodeUID)
236
237 if !cp {
238 su.ServerType = dsapi.Touchpoint
239 }
240
241 if su.Leader {
242 su.ReplicationSecret = couchdb.StoreReplicationSecretName
243 su.ReplicationSecretNS = ControllerNamespace
244 } else {
245 su.ReplicationSecret = fmt.Sprintf("%s-%s", couchdb.CouchDBName, leaderNodeUID)
246 su.ReplicationSecretNS = couchdb.Namespace
247 }
248 return su
249 }
250
251 func oldPvcExists(m map[string]map[string]string, ni *nameutils.NodeInfo, ns string) bool {
252 if len(m) == 0 {
253 return false
254 }
255 nodeSuffix := m[ns]
256 if len(nodeSuffix) == 0 {
257 return false
258 }
259 return len(nodeSuffix[ni.Name]) > 0
260 }
261
// safeName picks the name to use for a resource: newValue when no old PVC
// exists, otherwise cpOldValue on control-plane nodes and oldValue on all
// other nodes.
func safeName(cp, oldPvc bool, cpOldValue, oldValue, newValue string) string {
	switch {
	case !oldPvc:
		return newValue
	case cp:
		return cpOldValue
	default:
		return oldValue
	}
}
271
272 func SubstitutionVars(su Substitution) map[SubstitutionVar]string {
273 return map[SubstitutionVar]string{
274 ServerName: su.ServerName,
275 ServerType: string(su.ServerType),
276 LaneNumber: su.LaneNumber,
277 Suffix: su.Suffix,
278 CouchDBStatefulSet: su.CouchDBStatefulSet,
279 ChirpStatefulSet: su.ChirpStatefulSet,
280 ReplicationDB: su.ReplicationDB,
281 ReplicationSecret: su.ReplicationSecret,
282 ReplicationSecretNS: su.ReplicationSecretNS,
283 NodeUID: su.NodeUID,
284 }
285 }
286
287 func ToSubstitution(s map[SubstitutionVar]string) Substitution {
288 return Substitution{
289 ServerName: s[ServerName],
290 ServerType: dsapi.ServerType(s[ServerType]),
291 LaneNumber: s[LaneNumber],
292 Suffix: s[Suffix],
293 CouchDBStatefulSet: s[CouchDBStatefulSet],
294 ChirpStatefulSet: s[ChirpStatefulSet],
295 ReplicationDB: s[ReplicationDB],
296 ReplicationSecret: s[ReplicationSecret],
297 ReplicationSecretNS: s[ReplicationSecretNS],
298 NodeUID: s[NodeUID],
299 }
300 }
301
302
303 func statefulSetNodeTargetingSubstitution(un *unstructured.Unstructured, su Substitution) (*unstructured.Unstructured, error) {
304 sts := &appsv1.StatefulSet{}
305 err := unstructured.FromUnstructured(un, sts)
306 if err != nil {
307 return nil, err
308 }
309
310 if sts.Spec.Template.Spec.Affinity == nil {
311 sts.Spec.Template.Spec.Affinity = &corev1.Affinity{}
312 }
313 if sts.Spec.Template.Spec.Affinity.NodeAffinity == nil {
314 sts.Spec.Template.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
315 }
316 na := sts.Spec.Template.Spec.Affinity.NodeAffinity
317
318 sts.Spec.Template.ObjectMeta.Labels[persistenceApi.InstanceLabel] = sts.Name
319 sts.Spec.Selector.MatchLabels[persistenceApi.InstanceLabel] = sts.Name
320 if su.Leader {
321 sts.Spec.Template.ObjectMeta.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
322 }
323
324 if su.DSDS {
325 na.PreferredDuringSchedulingIgnoredDuringExecution = nil
326 na.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{
327 NodeSelectorTerms: []corev1.NodeSelectorTerm{
328 {
329 MatchExpressions: []corev1.NodeSelectorRequirement{
330 {
331 Key: couchdb.NodeUIDLabel,
332 Operator: corev1.NodeSelectorOpIn,
333 Values: []string{su.NodeUID},
334 },
335 },
336 },
337 },
338 }
339 } else {
340 na.RequiredDuringSchedulingIgnoredDuringExecution = nil
341 na.PreferredDuringSchedulingIgnoredDuringExecution = []corev1.PreferredSchedulingTerm{
342 {
343 Weight: int32(100),
344 Preference: corev1.NodeSelectorTerm{
345 MatchExpressions: []corev1.NodeSelectorRequirement{{
346 Key: nodemeta.RoleLabel,
347 Operator: corev1.NodeSelectorOpIn,
348 Values: []string{string(v1ien.ControlPlane)},
349 }},
350 },
351 },
352 }
353 }
354 return unstructured.ToUnstructured(sts)
355 }
356
View as plain text