1 package couchctl
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "strings"
9 "time"
10
11 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
12 "edge-infra.dev/pkg/edge/datasync/couchdb"
13 "edge-infra.dev/pkg/k8s/meta/status"
14 "edge-infra.dev/pkg/k8s/runtime/conditions"
15 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
16 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
18 "edge-infra.dev/pkg/k8s/runtime/patch"
19
20 "github.com/go-logr/logr"
21
22 kuberecorder "k8s.io/client-go/tools/record"
23 ctrl "sigs.k8s.io/controller-runtime"
24 "sigs.k8s.io/controller-runtime/pkg/builder"
25 "sigs.k8s.io/controller-runtime/pkg/client"
26 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
27 "sigs.k8s.io/controller-runtime/pkg/predicate"
28 )
29
// CouchDBDesignDocReconciler is the controller-runtime reconciler for
// dsapi.CouchDBDesignDoc resources. It embeds a Kubernetes client, the
// package's NodeResourcePredicate (whose ShouldReconcile is consulted by the
// event predicates) and an event recorder.
type CouchDBDesignDocReconciler struct {
	client.Client
	NodeResourcePredicate
	kuberecorder.EventRecorder

	// Name identifies this controller; it is used to build the patch options
	// and as the field owner when the resource is summarized and patched.
	Name string

	// Config supplies the requeue/polling intervals applied to each resource
	// and is passed to the CouchDB leader lookups.
	Config *Config

	// Metrics records reconcile duration and readiness at the end of every
	// Reconcile call.
	Metrics metrics.Metrics

	// patchOptions is populated by SetupWithManager and passed to
	// reconcile.Progressing on every Reconcile call.
	patchOptions []patch.Option
}
39
var (
	// dDocConditions describes the status conditions handled for a
	// CouchDBDesignDoc: Ready is the target condition, Owned lists the
	// conditions passed to getPatchOptions, Summarize lists the conditions
	// handed to the summarizer, and NegativePolarity lists the conditions
	// declared as negative-polarity (Reconciling and Stalled).
	//
	// NOTE(review): dsapi.DesignDocSetupSucceededReason is named like a reason
	// but is used here (and in Reconcile) as a condition type — confirm this is
	// intentional.
	dDocConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)
59
60
61 func (r *CouchDBDesignDocReconciler) SetupWithManager(mgr ctrl.Manager) error {
62 r.patchOptions = getPatchOptions(dDocConditions.Owned, r.Name)
63 return ctrl.NewControllerManagedBy(mgr).
64 For(&dsapi.CouchDBDesignDoc{}, r.designDocPredicates()).
65 Complete(r)
66 }
67
68 func (r *CouchDBDesignDocReconciler) designDocPredicates() builder.Predicates {
69 return builder.WithPredicates(
70 predicate.GenerationChangedPredicate{},
71 predicate.NewPredicateFuncs(func(_ client.Object) bool {
72 if r.Config.IsDSDS() {
73 server, err := couchDBLeader(context.Background(), r.Client, r.Config)
74 if err != nil {
75 return false
76 }
77 return r.ShouldReconcile(r.Config, server)
78 }
79 return true
80 }))
81 }
82
83 func (r *CouchDBDesignDocReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
84 reconcileStart := time.Now()
85
86 log := ctrl.LoggerFrom(ctx)
87
88 ddoc := &dsapi.CouchDBDesignDoc{}
89 if err := r.Client.Get(ctx, req.NamespacedName, ddoc); err != nil {
90 return ctrl.Result{}, client.IgnoreNotFound(err)
91 }
92 ddoc.Spec.SetRetryInterval(r.Config.RequeueTime)
93 ddoc.Spec.SetInterval(r.Config.PollingInterval)
94 log = log.WithValues("design doc", ddoc.Spec.ID)
95 ctx = logr.NewContext(ctx, log)
96
97 patcher := patch.NewSerialPatcher(ddoc, r.Client)
98 if err := reconcile.Progressing(ctx, ddoc, patcher, r.patchOptions...); err != nil {
99 log.Error(err, "unable to update CouchDBDesignDoc status")
100 return ctrl.Result{}, err
101 }
102
103 recResult := reconcile.ResultEmpty
104 var recErr recerr.Error
105
106 defer func() {
107 summarizer := reconcile.NewSummarizer(patcher)
108 res, err = summarizer.SummarizeAndPatch(ctx, ddoc, []reconcile.SummarizeOption{
109 reconcile.WithConditions(dDocConditions),
110 reconcile.WithResult(recResult),
111 reconcile.WithError(recErr),
112 reconcile.WithIgnoreNotFound(),
113 reconcile.WithProcessors(reconcile.RecordResult),
114 reconcile.WithFieldOwner(r.Name),
115 reconcile.WithEventRecorder(r.EventRecorder),
116 }...)
117 r.Metrics.RecordDuration(ctx, ddoc, reconcileStart)
118 r.Metrics.RecordReadiness(ctx, ddoc)
119 }()
120
121 if recErr = r.reconcile(ctx, ddoc); recErr != nil {
122 recErr.ToCondition(ddoc, dsapi.DesignDocCreationFailedReason)
123 return
124 }
125
126
127 if !controllerutil.ContainsFinalizer(ddoc, DatasyncFinalizer) {
128 controllerutil.AddFinalizer(ddoc, DatasyncFinalizer)
129
130
131 recResult = reconcile.ResultRequeue
132 return
133 }
134
135 if !ddoc.ObjectMeta.DeletionTimestamp.IsZero() {
136 recErr = r.finalize(ctx, ddoc)
137 return
138 }
139
140 recResult = reconcile.ResultSuccess
141 conditions.MarkTrue(ddoc, dsapi.DesignDocSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBDesignDoc")
142 log.Info("Successfully set up CouchDBDesignDoc")
143 return
144 }
145
146 func (r *CouchDBDesignDocReconciler) reconcile(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
147 log := logr.FromContextOrDiscard(ctx)
148
149 cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
150 if err != nil {
151 log.Error(err, "failed to get leader couchdb client")
152 return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
153 }
154
155 defer func(cc *couchdb.CouchDB, ctx context.Context) {
156 err := cc.Close(ctx)
157 if err != nil {
158 log.Error(err, "failed to close couchdb client")
159 }
160 }(cc, ctx)
161
162 db := cc.Client.DB(ddoc.Spec.DB)
163 docID := ddoc.Spec.ID
164 if !strings.HasPrefix(docID, "_design/") {
165 docID = fmt.Sprintf("_design/%s", ddoc.Spec.ID)
166 }
167
168 var rev string
169 switch row := db.Get(ctx, docID); {
170 case row.Err() == nil:
171 doc := map[string]interface{}{}
172 if err = row.ScanDoc(&doc); err != nil {
173 log.Error(err, "failed to scan design doc")
174 return recerr.New(fmt.Errorf("failed to scan design doc: %w", err), status.DependencyInvalidReason)
175 }
176 rev = doc["_rev"].(string)
177 if designDocEqual(doc, ddoc.Spec.DesignDoc) {
178 log.Info("design doc already exists", "revision", rev)
179 return nil
180 }
181 case couchdb.IsNotFound(row.Err()):
182 rev = ""
183 default:
184 return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", row.Err()), status.DependencyInvalidReason)
185 }
186
187 var doc interface{} = ddoc.Spec.DesignDoc
188 if rev != "" {
189 doc = addRevisionDoc(doc, rev)
190 }
191 _, err = db.Put(ctx, docID, doc)
192 if err != nil {
193 log.Error(err, "failed to create couchdb design doc")
194 return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
195 }
196 return nil
197 }
198
199 func (r *CouchDBDesignDocReconciler) finalize(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
200 log := logr.FromContextOrDiscard(ctx)
201
202 cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
203 if err != nil {
204 log.Error(err, "failed to get leader couchdb client")
205 return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
206 }
207
208 defer func(cc *couchdb.CouchDB, ctx context.Context) {
209 err := cc.Close(ctx)
210 if err != nil {
211 log.Error(err, "failed to close couchdb client")
212 }
213 }(cc, ctx)
214
215 db := cc.Client.DB(ddoc.Spec.DB)
216
217 ddocID := ddoc.Spec.ID
218 if !strings.HasPrefix(ddocID, "_design/") {
219 ddocID = fmt.Sprintf("_design/%s", ddoc.Spec.ID)
220 }
221 rev, err := db.GetRev(ctx, ddocID)
222 if err != nil {
223 if couchdb.IsNotFound(err) {
224 return nil
225 }
226 log.Error(err, "failed to get couchdb design doc")
227 return recerr.New(fmt.Errorf("failed to get couchdb design doc: %w", err), status.DependencyInvalidReason)
228 }
229 _, err = db.Delete(ctx, ddocID, rev)
230 if err != nil {
231 log.Error(err, "failed to create couchdb design doc")
232 return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
233 }
234 controllerutil.RemoveFinalizer(ddoc, DatasyncFinalizer)
235 return nil
236 }
237
238 func designDocEqual(doc map[string]interface{}, specDoc dsapi.DesignDoc) bool {
239 lang, ok := doc["language"]
240 if specDoc.Language != "" && ok && specDoc.Language != lang.(string) {
241 return false
242 }
243 vdu, ok := doc["validateDocUpdate"]
244 if specDoc.ValidateDocUpdate != "" && ok && specDoc.ValidateDocUpdate != vdu.(string) {
245 return false
246 }
247 if !docValueEqual(specDoc.Views, doc["views"]) {
248 return false
249 }
250 if !docValueEqual(specDoc.Updates, doc["updates"]) {
251 return false
252 }
253 if !docValueEqual(specDoc.Filters, doc["filters"]) {
254 return false
255 }
256 return true
257 }
258
// docValueEqual reports whether doc1 (a section of the spec, e.g. views)
// and doc2 (the same section as decoded from the stored document) hold the
// same content.
//
// Both sides are compared through their JSON encoding; encoding/json writes
// map keys in sorted order, so the comparison does not depend on map
// iteration order. An empty or nil doc1 is equal to any "empty" doc2: a nil
// value, a typed nil map, or an empty object. A doc2 that cannot be encoded
// is never equal.
func docValueEqual(doc1 map[string]map[string]string, doc2 interface{}) bool {
	data2, err := json.Marshal(doc2)
	if err != nil {
		return false
	}
	if len(doc1) == 0 {
		// nil (and typed nil maps) encode as "null", an empty object as "{}";
		// both mean "nothing stored" and match an empty spec section.
		return bytes.Equal(data2, []byte("null")) || bytes.Equal(data2, []byte("{}"))
	}
	data1, err := json.Marshal(doc1)
	if err != nil {
		return false
	}
	return bytes.Equal(data1, data2)
}
279
// addRevisionDoc converts doc to a generic JSON object (via a
// marshal/unmarshal round trip) and sets its "_rev" field to rev.
//
// The revision is written after the round trip so that rev always wins over
// any "_rev" carried by doc, and so that a doc encoding to JSON null (which
// resets the target map to nil) still yields a map containing "_rev".
//
// The conversion is best effort because the signature has no error return:
// if doc cannot be encoded, or does not encode to a JSON object, the result
// contains only "_rev" plus whatever fields were decoded before the failure.
func addRevisionDoc(doc interface{}, rev string) map[string]interface{} {
	m := map[string]interface{}{}
	if data, err := json.Marshal(doc); err == nil {
		// Error deliberately ignored: see the best-effort note above.
		_ = json.Unmarshal(data, &m)
	}
	if m == nil {
		// Unmarshalling JSON null into a map sets it to nil.
		m = map[string]interface{}{}
	}
	m["_rev"] = rev
	return m
}
287
View as plain text