1 package couchctl
2
3 import (
4 "context"
5 "fmt"
6 "reflect"
7 "time"
8
9 "edge-infra.dev/pkg/edge/clientutils"
10
11 "github.com/go-logr/logr"
12 "github.com/google/uuid"
13 appsv1 "k8s.io/api/apps/v1"
14 corev1 "k8s.io/api/core/v1"
15 "k8s.io/apimachinery/pkg/api/errors"
16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/apimachinery/pkg/types"
18 "k8s.io/client-go/dynamic"
19 kuberecorder "k8s.io/client-go/tools/record"
20 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
21 ctrl "sigs.k8s.io/controller-runtime"
22 "sigs.k8s.io/controller-runtime/pkg/builder"
23 "sigs.k8s.io/controller-runtime/pkg/client"
24 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
25 "sigs.k8s.io/controller-runtime/pkg/event"
26 "sigs.k8s.io/controller-runtime/pkg/handler"
27 "sigs.k8s.io/controller-runtime/pkg/predicate"
28 ctrlReconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
29
30 "edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
31 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
32 "edge-infra.dev/pkg/edge/datasync/couchdb"
33 "edge-infra.dev/pkg/k8s/meta/status"
34 "edge-infra.dev/pkg/k8s/runtime/conditions"
35 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
36 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
37 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
38 "edge-infra.dev/pkg/k8s/runtime/inventory"
39 "edge-infra.dev/pkg/k8s/runtime/patch"
40 "edge-infra.dev/pkg/k8s/runtime/sap"
41 "edge-infra.dev/pkg/k8s/unstructured"
42 )
43
44 var (
45 persistenceConditions = reconcile.Conditions{
46 Target: status.ReadyCondition,
47 Owned: []string{
48 dsapi.PersistenceSetupSucceededReason,
49 status.ReadyCondition,
50 status.ReconcilingCondition,
51 status.StalledCondition,
52 },
53 Summarize: []string{
54 dsapi.PersistenceSetupSucceededReason,
55 status.StalledCondition,
56 },
57 NegativePolarity: []string{
58 status.ReconcilingCondition,
59 status.StalledCondition,
60 },
61 }
62 oldPVCs = map[string]string{
63 "data-sync-couchdb": "database-storage",
64 "data-sync-messaging": "outbox",
65 }
66 )
67
68 type CouchDBPersistenceReconciler struct {
69 client.Client
70 LeaderElector
71 kuberecorder.EventRecorder
72 ResourceManager *sap.ResourceManager
73 Name string
74 Config *Config
75 Metrics metrics.Metrics
76 patchOptions []patch.Option
77 replicationDB string
78 PersistenceLeaderElector
79 }
80
81 func (r *CouchDBPersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
82 r.replicationDB = r.Config.ReplicationDB()
83 r.patchOptions = getPatchOptions(persistenceConditions.Owned, r.Name)
84 d, err := dynamic.NewForConfig(mgr.GetConfig())
85 if err != nil {
86 return fmt.Errorf("fail to create dynamic client: %w", err)
87 }
88 r.ResourceManager = sap.NewResourceManager(
89 r.Client,
90 watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
91 sap.Owner{Field: r.Name},
92 )
93 return ctrl.NewControllerManagedBy(mgr).
94 For(&dsapi.CouchDBPersistence{}, r.persistencePredicates()).
95 Watches(
96 &corev1.Node{},
97 handler.EnqueueRequestsFromMapFunc(r.enqueue),
98 builder.WithPredicates(nodePredicate()),
99 ).
100 Owns(&dsapi.CouchDBServer{}).
101 Owns(&dsapi.CouchDBDatabase{}).
102 Owns(&dsapi.CouchDBUser{}).
103 Owns(&dsapi.CouchDBReplicationSet{}).
104 Owns(&appsv1.StatefulSet{}).
105 Complete(r)
106 }
107
108 func (r *CouchDBPersistenceReconciler) persistencePredicates() builder.Predicates {
109 return builder.WithPredicates(
110 predicate.GenerationChangedPredicate{},
111 predicate.NewPredicateFuncs(func(_ client.Object) bool {
112 if r.Config.IsDSDS() {
113 return r.IsLeader()
114 }
115 return true
116 }))
117 }
118
119 func (r *CouchDBPersistenceReconciler) enqueue(ctx context.Context, _ client.Object) []ctrlReconcile.Request {
120 if r.Config.IsDSDS() && !r.IsLeader() {
121 return nil
122 }
123 persList := &dsapi.CouchDBPersistenceList{}
124 if err := r.Client.List(ctx, persList); client.IgnoreNotFound(err) != nil {
125 return nil
126 }
127 var requests []ctrlReconcile.Request
128 for _, p := range persList.Items {
129 requests = append(requests,
130 ctrlReconcile.Request{
131 NamespacedName: types.NamespacedName{
132 Name: p.Name,
133 Namespace: p.Namespace,
134 }})
135 }
136 return requests
137 }
138
139 func nodePredicate() predicate.Predicate {
140 return predicate.Funcs{
141 UpdateFunc: nodeUpdatePredicate,
142 CreateFunc: func(_ event.CreateEvent) bool {
143 return true
144 },
145 DeleteFunc: func(_ event.DeleteEvent) bool {
146 return true
147 },
148 }
149 }
150
151 func nodeUpdatePredicate(e event.UpdateEvent) bool {
152 if !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels()) {
153 return true
154 }
155 updatedNode, ok := e.ObjectNew.(*corev1.Node)
156 if !ok {
157 return false
158 }
159 oldNode, ok := e.ObjectOld.(*corev1.Node)
160 if !ok {
161 return false
162 }
163 return updatedNode.Spec.Unschedulable != oldNode.Spec.Unschedulable
164 }
165
166 func (r *CouchDBPersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
167 reconcileStart := time.Now()
168 log := ctrl.LoggerFrom(ctx)
169
170 p := &dsapi.CouchDBPersistence{}
171 if err := r.Client.Get(ctx, req.NamespacedName, p); err != nil {
172 return ctrl.Result{}, client.IgnoreNotFound(err)
173 }
174
175 ctx = logr.NewContext(ctx, log)
176
177 patcher := patch.NewSerialPatcher(p, r.Client)
178 if patchErr := reconcile.Progressing(ctx, p, patcher, r.patchOptions...); patchErr != nil {
179 log.Error(patchErr, "unable to update status")
180 return ctrl.Result{}, err
181 }
182
183 recResult := reconcile.ResultEmpty
184 var recErr recerr.Error
185
186 defer func() {
187 summarizer := reconcile.NewSummarizer(patcher)
188 res, err = summarizer.SummarizeAndPatch(ctx, p, []reconcile.SummarizeOption{
189 reconcile.WithConditions(persistenceConditions),
190 reconcile.WithResult(recResult),
191 reconcile.WithError(recErr),
192 reconcile.WithIgnoreNotFound(),
193 reconcile.WithProcessors(
194 reconcile.RecordResult,
195 ),
196 reconcile.WithFieldOwner(r.Name),
197 reconcile.WithEventRecorder(r.EventRecorder),
198 }...)
199 r.Metrics.RecordDuration(ctx, p, reconcileStart)
200 r.Metrics.RecordReadiness(ctx, p)
201 }()
202
203
204 if !controllerutil.ContainsFinalizer(p, DatasyncFinalizer) {
205 controllerutil.AddFinalizer(p, DatasyncFinalizer)
206 recResult = reconcile.ResultRequeue
207 return
208 }
209
210
211 if !p.ObjectMeta.DeletionTimestamp.IsZero() {
212 log.Info("executing finalizer")
213 if fErr := pruneInventory(ctx, r.ResourceManager, p); fErr != nil {
214 err = fErr
215 return
216 }
217 controllerutil.RemoveFinalizer(p, DatasyncFinalizer)
218 log.Info("finalizer executed")
219 return
220 }
221
222 if recErr = r.reconcile(ctx, p); recErr != nil {
223 recErr.ToCondition(p, dsapi.PersistenceSetupSucceededReason)
224 err = recErr
225 return
226 }
227
228 recResult = reconcile.ResultSuccess
229 conditions.MarkTrue(p, dsapi.PersistenceSetupSucceededReason, status.SucceededReason, "Successfully created CouchDBPersistence resources")
230 log.Info("Successfully created CouchDBPersistence resources")
231
232 return
233 }
234
235 func (r *CouchDBPersistenceReconciler) reconcile(ctx context.Context, p *dsapi.CouchDBPersistence) recerr.Error {
236 log := logr.FromContextOrDiscard(ctx)
237 objs := p.PersistenceObjects()
238 if len(objs) == 0 {
239 log.Error(fmt.Errorf("invalid spec"), "no resources provided")
240 return recerr.NewStalled(fmt.Errorf("invalid spec"), "no resources provided")
241 }
242
243 substitutions, err := r.buildNodeSubstitutions(ctx)
244 if err != nil {
245 log.Error(err, "could not build substitutions")
246 return recerr.NewWait(err, "could not build substitutions", r.Config.RequeueTime)
247 }
248
249 selectNodeByRole, role := p.NodeRoleFilter()
250 selectNodeByClass, class := p.NodeClassFilter()
251 var uns []*unstructured.Unstructured
252 for _, obj := range objs {
253 for _, s := range substitutions {
254 if selectNodeByRole && s.DSDS && role != s.NodeRole() {
255 continue
256 }
257 if selectNodeByClass && s.DSDS && class != s.NodeClass() {
258 continue
259 }
260 un, err := ApplySubstitutions(obj, s)
261 if err != nil {
262 return recerr.NewStalled(err, "spec invalid substitution")
263 }
264 un.SetOwnerReferences(r.ownerRef(p))
265 un.SetNamespace(p.Namespace)
266 uns = append(uns, un)
267 }
268 }
269
270 changeSet, err := r.ResourceManager.ApplyAll(ctx, uns, sap.ApplyOptions{Force: true})
271 if err != nil {
272 log.Error(err, "fail to apply persistence resources")
273 return recerr.New(err, dsapi.PersistenceObjectsCreationFailedReason)
274 }
275
276 i := inventory.New(inventory.FromSapChangeSet(changeSet))
277 if err := r.prune(ctx, p, i); err != nil {
278 log.Error(err, "fail to prune resources")
279 return recerr.New(err, dsapi.PruneFailed)
280 }
281 p.Status.Inventory = i
282
283 return nil
284 }
285
286
287
288 func (r *CouchDBPersistenceReconciler) buildNodeSubstitutions(ctx context.Context) (map[string]Substitution, error) {
289 log := logr.FromContextOrDiscard(ctx)
290 m := map[string]Substitution{}
291 if !r.Config.IsDSDS() {
292 su := StoreSubstitution(r.replicationDB)
293 genericUID, err := r.getNodeUIDGeneric(ctx)
294 su.NodeUID = genericUID
295 if err != nil {
296 log.Error(err, "failed to fetch Node UID for generic cluster")
297 return nil, err
298 }
299 m[su.ServerName] = su
300 return m, nil
301 }
302 nodes := &corev1.NodeList{}
303 if err := r.Client.List(ctx, nodes); client.IgnoreNotFound(err) != nil {
304 log.Error(err, "failed to get dsds nodes")
305 return nil, err
306 }
307 oldPVCs, err := oldPVCsSuffixes(ctx, r.Client)
308 if err != nil {
309 log.Error(err, "fail to get old pvcs")
310 return nil, err
311 }
312 leaderNode, err := r.LeaderElector.Elect(nodes.Items)
313 if err != nil {
314 log.Error(err, "fail to elect leader node")
315 return nil, err
316 }
317 log.Info("LEADER NODE", "node", leaderNode.Name)
318 var oldLeader *corev1.Node
319 for i := range nodes.Items {
320 node := nodes.Items[i]
321 if node.Spec.Unschedulable {
322 if node.Labels[couchdb.NodeLeaderLabel] == couchdb.LabelValueTrue {
323 oldLeader = &node
324 }
325 continue
326 }
327
328 ni, err := nameutils.GetNodeInfo(node, LaneNumberSubstitutionMaxLength)
329 if err != nil {
330 log.Info("Fail to acquire Node Info", "node", node.Name, "err", err)
331 continue
332 }
333
334 su := LaneSubstitution(ni, oldPVCs, r.replicationDB, string(leaderNode.UID))
335
336
337 node.Labels[couchdb.NodeUIDLabel] = string(node.UID)
338 if su.Leader {
339 node.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
340 } else {
341 delete(node.Labels, couchdb.NodeLeaderLabel)
342 }
343
344 nodeWithLabel := node
345 if err = r.Client.Update(ctx, &nodeWithLabel); err != nil {
346 return nil, fmt.Errorf("fail to update/annotate node: %s, %w", node.Name, err)
347 }
348 m[su.ServerName] = su
349 }
350 if oldLeader != nil {
351 delete(oldLeader.Labels, couchdb.NodeLeaderLabel)
352 if err = r.Client.Update(ctx, oldLeader); err != nil && !errors.IsNotFound(err) {
353 return nil, fmt.Errorf("fail to update/annotate node: %s, %w", oldLeader.Name, err)
354 }
355 }
356 return m, nil
357 }
358
359 func (r *CouchDBPersistenceReconciler) getNodeUIDGeneric(ctx context.Context) (string, error) {
360 log := logr.FromContextOrDiscard(ctx)
361
362 cm := &corev1.ConfigMap{}
363 err := r.Client.Get(context.TODO(), client.ObjectKey{
364 Name: ConfigMapUID,
365 Namespace: r.Config.CouchNamespace,
366 }, cm)
367
368 if err != nil && errors.IsNotFound(err) {
369 u := uuid.New().String()
370 newCM := &corev1.ConfigMap{
371 ObjectMeta: metav1.ObjectMeta{
372 Name: ConfigMapUID,
373 Namespace: r.Config.CouchNamespace,
374 },
375 Data: map[string]string{
376 "uuid": u,
377 },
378 }
379 err = r.Client.Create(ctx, newCM)
380 if err != nil {
381 log.Error(err, "Failed to create ConfigMap with uuid")
382 return "", err
383 }
384 return u, nil
385 } else if err != nil {
386 log.Error(err, "Failed to fetch ConfigMap")
387 return "", err
388 }
389
390 if u, exists := cm.Data["uuid"]; exists {
391 return u, nil
392 }
393 log.Info("ConfigMap exists but no uuid found, recreating")
394 newUUID := uuid.New().String()
395 cm.Data["uuid"] = newUUID
396 err = clientutils.CreateOrUpdateConfigmap(ctx, r.Client, cm)
397 if err != nil {
398 log.Error(err, "Failed to update ConfigMap with new uuid")
399 return "", err
400 }
401 return newUUID, nil
402 }
403 func (r *CouchDBPersistenceReconciler) prune(ctx context.Context, p *dsapi.CouchDBPersistence, i *inventory.ResourceInventory) error {
404 if p.Status.Inventory != nil {
405 diff, err := inventory.Diff(p.Status.Inventory, i)
406 if err != nil {
407 return nil
408 }
409 if len(diff) > 0 {
410 changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
411 if err != nil {
412 return err
413 }
414 log := logr.FromContextOrDiscard(ctx)
415 log.Info("pruned objects", "changeset", changeSet.ToMap())
416 }
417 }
418 return nil
419 }
420
421
422 func (r *CouchDBPersistenceReconciler) ownerRef(p *dsapi.CouchDBPersistence) []metav1.OwnerReference {
423 return []metav1.OwnerReference{
424 *metav1.NewControllerRef(
425 p,
426 dsapi.GroupVersion.WithKind("CouchDBPersistence"),
427 ),
428 }
429 }
430
View as plain text