1 package couchctl
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "time"
8
9 "github.com/go-kivik/kivik/v4"
10
11 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
12 "edge-infra.dev/pkg/edge/datasync/couchdb"
13 "edge-infra.dev/pkg/k8s/meta/status"
14 "edge-infra.dev/pkg/k8s/runtime/conditions"
15 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
16 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
18 "edge-infra.dev/pkg/k8s/runtime/patch"
19
20 "github.com/go-logr/logr"
21
22 kuberecorder "k8s.io/client-go/tools/record"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/builder"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
27 "sigs.k8s.io/controller-runtime/pkg/predicate"
28 )
29
30 type CouchIndexReconciler struct {
31 client.Client
32 NodeResourcePredicate
33 kuberecorder.EventRecorder
34 Name string
35 Config *Config
36 Metrics metrics.Metrics
37 patchOptions []patch.Option
38 }
39
40 var (
41 indexConditions = reconcile.Conditions{
42 Target: status.ReadyCondition,
43 Owned: []string{
44 dsapi.IndexSetupSucceededReason,
45 status.ReadyCondition,
46 status.ReconcilingCondition,
47 status.StalledCondition,
48 },
49 Summarize: []string{
50 dsapi.IndexSetupSucceededReason,
51 status.StalledCondition,
52 },
53 NegativePolarity: []string{
54 status.ReconcilingCondition,
55 status.StalledCondition,
56 },
57 }
58 )
59
60
61 func (r *CouchIndexReconciler) SetupWithManager(mgr ctrl.Manager) error {
62 r.patchOptions = getPatchOptions(indexConditions.Owned, r.Name)
63 return ctrl.NewControllerManagedBy(mgr).
64 For(&dsapi.CouchDBIndex{}, r.indexPredicates()).
65 Complete(r)
66 }
67
68 func (r *CouchIndexReconciler) indexPredicates() builder.Predicates {
69 return builder.WithPredicates(
70 predicate.GenerationChangedPredicate{},
71 predicate.NewPredicateFuncs(func(_ client.Object) bool {
72 if r.Config.IsDSDS() {
73 server, err := couchDBLeader(context.Background(), r.Client, r.Config)
74 if err != nil {
75 return false
76 }
77 return r.ShouldReconcile(r.Config, server)
78 }
79 return true
80 }))
81 }
82
83 func (r *CouchIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
84 reconcileStart := time.Now()
85
86 log := ctrl.LoggerFrom(ctx)
87
88 index := &dsapi.CouchDBIndex{}
89 if err := r.Client.Get(ctx, req.NamespacedName, index); err != nil {
90 return ctrl.Result{}, client.IgnoreNotFound(err)
91 }
92 index.Spec.SetRetryInterval(r.Config.RequeueTime)
93 index.Spec.SetInterval(r.Config.PollingInterval)
94
95 log = log.WithValues("dbname", index.Spec.DB)
96 ctx = logr.NewContext(ctx, log)
97
98 patcher := patch.NewSerialPatcher(index, r.Client)
99 if err := reconcile.Progressing(ctx, index, patcher, r.patchOptions...); err != nil {
100 log.Error(err, "unable to update CouchDBIndex status")
101 return ctrl.Result{}, err
102 }
103
104 recResult := reconcile.ResultEmpty
105 var recErr recerr.Error
106
107 defer func() {
108 summarizer := reconcile.NewSummarizer(patcher)
109 res, err = summarizer.SummarizeAndPatch(ctx, index, []reconcile.SummarizeOption{
110 reconcile.WithConditions(serverConditions),
111 reconcile.WithResult(recResult),
112 reconcile.WithError(recErr),
113 reconcile.WithIgnoreNotFound(),
114 reconcile.WithProcessors(reconcile.RecordResult),
115 reconcile.WithFieldOwner(r.Name),
116 reconcile.WithEventRecorder(r.EventRecorder),
117 }...)
118 r.Metrics.RecordDuration(ctx, index, reconcileStart)
119 r.Metrics.RecordReadiness(ctx, index)
120 log.Info("Reconcile finished", "duration", time.Since(reconcileStart).String())
121 }()
122
123
124 if !controllerutil.ContainsFinalizer(index, DatasyncFinalizer) {
125 controllerutil.AddFinalizer(index, DatasyncFinalizer)
126
127
128 recResult = reconcile.ResultRequeue
129 return
130 }
131
132 if !index.ObjectMeta.DeletionTimestamp.IsZero() {
133 recErr = r.finalize(ctx, index)
134 return
135 }
136
137 if recErr = r.reconcile(ctx, index); recErr != nil {
138 recErr.ToCondition(index, dsapi.IndexCreationFailedReason)
139 err = recErr
140 return
141 }
142
143 recResult = reconcile.ResultSuccess
144 conditions.MarkTrue(index, dsapi.ServerSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBIndex")
145 log.Info("Successfully set up CouchDBIndex")
146 return
147 }
148
149 func (r *CouchIndexReconciler) reconcile(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error {
150 log := logr.FromContextOrDiscard(ctx)
151
152 name := index.IndexName()
153 indexSelector := map[string]interface{}{}
154 indexSelector["fields"] = index.Spec.Index.Fields
155 if len(index.Spec.Index.PartialFilterSelector) > 0 {
156 pfs := map[string]interface{}{}
157 err := json.Unmarshal([]byte(index.Spec.Index.PartialFilterSelector), &pfs)
158 if err != nil {
159 log.Error(err, "fail to Unmarshal PartialFilterSelector, will not requeue")
160 return recerr.NewStalled(fmt.Errorf("fail to Unmarshal PartialFilterSelector, will not requeue: %w", err), dsapi.IndexMappingFailedReason)
161 }
162 indexSelector["partial_filter_selector"] = pfs
163 }
164
165 cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
166 if err != nil {
167 log.Error(err, "failed to get leader couchdb client")
168 return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
169 }
170
171 defer func(cc *couchdb.CouchDB, ctx context.Context) {
172 err := cc.Close(ctx)
173 if err != nil {
174 log.Error(err, "failed to close couchdb client")
175 }
176 }(cc, ctx)
177
178 err = cc.Client.DB(index.Spec.DB).CreateIndex(ctx, index.Spec.DDoc, name, indexSelector, kivik.Param("partitioned", index.Spec.Partitioned))
179 if err != nil {
180 log.Error(err, "failed to create couchdb index")
181 return recerr.New(fmt.Errorf("failed to create couchdb index: %w", err), dsapi.IndexCreationFailedReason)
182 }
183 return nil
184 }
185
186 func (r *CouchIndexReconciler) finalize(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error {
187 log := logr.FromContextOrDiscard(ctx)
188
189 cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
190 if err != nil {
191 log.Error(err, "failed to get leader couchdb client")
192 return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
193 }
194
195 defer func(cc *couchdb.CouchDB, ctx context.Context) {
196 err := cc.Close(ctx)
197 if err != nil {
198 log.Error(err, "failed to close couchdb client")
199 }
200 }(cc, ctx)
201
202 err = cc.Client.DB(index.Spec.DB).DeleteIndex(ctx, index.Spec.DDoc, index.IndexName())
203 if err != nil {
204 if couchdb.IsNotFound(err) {
205 controllerutil.RemoveFinalizer(index, DatasyncFinalizer)
206 return nil
207 }
208 log.Error(err, "failed to delete couchdb index")
209 return recerr.New(fmt.Errorf("failed to delete couchdb index: %w", err), dsapi.IndexCreationFailedReason)
210 }
211 controllerutil.RemoveFinalizer(index, DatasyncFinalizer)
212 return nil
213 }
214
View as plain text