1 package couchctl
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/go-logr/logr"
10
11 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
12 "edge-infra.dev/pkg/edge/datasync/couchdb"
13 "edge-infra.dev/pkg/k8s/meta/status"
14 "edge-infra.dev/pkg/k8s/runtime/conditions"
15 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
16 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
18 "edge-infra.dev/pkg/k8s/runtime/inventory"
19 "edge-infra.dev/pkg/k8s/runtime/patch"
20 "edge-infra.dev/pkg/k8s/runtime/sap"
21 unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
22
23 corev1 "k8s.io/api/core/v1"
24 netv1 "k8s.io/api/networking/v1"
25 kerrors "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/types"
27 "k8s.io/client-go/dynamic"
28 kuberecorder "k8s.io/client-go/tools/record"
29 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
30 ctrl "sigs.k8s.io/controller-runtime"
31 "sigs.k8s.io/controller-runtime/pkg/builder"
32 "sigs.k8s.io/controller-runtime/pkg/client"
33 "sigs.k8s.io/controller-runtime/pkg/predicate"
34 )
35
// CouchServerReconciler reconciles CouchDBServer resources. For each server it
// applies the generated server ConfigMap, ensures the admin credentials Secret,
// waits for the CouchDB pods to become ready (and, for cloud servers, finishes
// cluster setup), then prunes objects dropped from the server's inventory.
type CouchServerReconciler struct {
	client.Client
	// NodeResourcePredicate supplies ShouldReconcile, used to filter events
	// when running in DSDS mode (see serverPredicates).
	NodeResourcePredicate
	// EventRecorder is handed to the status summarizer in Reconcile.
	kuberecorder.EventRecorder
	// ResourceManager applies and deletes managed objects; it is constructed
	// in SetupWithManager.
	ResourceManager *sap.ResourceManager
	// Name is the field owner used for server-side apply and status patches.
	Name    string
	Config  *Config
	Metrics metrics.Metrics
	// patchOptions is populated in SetupWithManager from the owned conditions.
	patchOptions []patch.Option
}
46
// Payload is the JSON body POSTed to the CouchDB finish-cluster URI by
// sendFinishClusterCommand.
type Payload struct {
	// Action is the setup action to perform, e.g. "finish_cluster".
	Action string `json:"action"`
}
50
// ServerSetupResponse is the JSON response decoded from the finish_cluster
// request. All fields are optional.
// NOTE(review): presumably Error/Reason are only populated on failure —
// confirm against the CouchDB cluster setup API.
type ServerSetupResponse struct {
	State  string `json:"state,omitempty"`
	Error  string `json:"error,omitempty"`
	Reason string `json:"reason,omitempty"`
}
56
var (
	// ErrPodsNotReady is wrapped in a wait error by couchDBPodsReady when a
	// CouchDB pod is missing or unready, or when the server ping fails.
	ErrPodsNotReady = errors.New("pods arent ready")

	// serverConditions configures how CouchDBServer status conditions are
	// owned, summarized into Ready, and patched.
	serverConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		// Owned condition types are passed to getPatchOptions and the
		// summarizer as the conditions this controller manages.
		Owned: []string{
			dsapi.ServerSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		// Summarize lists the conditions folded into the Target condition.
		Summarize: []string{
			dsapi.ServerSetupSucceededReason,
			status.StalledCondition,
		},
		// NegativePolarity marks conditions where, presumably, True signals
		// an abnormal state.
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)
78
79
80 func (r *CouchServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
81 r.patchOptions = getPatchOptions(serverConditions.Owned, r.Name)
82 d, err := dynamic.NewForConfig(mgr.GetConfig())
83 if err != nil {
84 return fmt.Errorf("fail to create dynamic client: %w", err)
85 }
86 r.ResourceManager = sap.NewResourceManager(
87 r.Client,
88 watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
89 sap.Owner{Field: r.Name},
90 )
91
92 b := ctrl.NewControllerManagedBy(mgr).
93 For(&dsapi.CouchDBServer{}, r.serverPredicates())
94 return b.Owns(&corev1.Secret{}).Complete(r)
95 }
96
97 func (r *CouchServerReconciler) serverPredicates() builder.Predicates {
98 return builder.WithPredicates(
99 predicate.GenerationChangedPredicate{},
100 predicate.NewPredicateFuncs(func(obj client.Object) bool {
101 if r.Config.IsDSDS() {
102 return r.ShouldReconcile(r.Config, obj)
103 }
104 return true
105 }))
106 }
107
108 func (r *CouchServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
109 reconcileStart := time.Now()
110
111 log := ctrl.LoggerFrom(ctx)
112
113 server := &dsapi.CouchDBServer{}
114 if err := r.Get(ctx, req.NamespacedName, server); err != nil {
115 return ctrl.Result{}, client.IgnoreNotFound(err)
116 }
117 server.WithRetry(r.Config.RequeueTime)
118 server.WithInterval(r.Config.PollingInterval)
119
120 log = log.WithValues("type", server.Type())
121 ctx = logr.NewContext(ctx, log)
122
123 patcher := patch.NewSerialPatcher(server, r.Client)
124
125 if err := reconcile.Progressing(ctx, server, patcher, r.patchOptions...); err != nil {
126 log.Error(err, "unable to update status")
127 return ctrl.Result{}, err
128 }
129
130 recResult := reconcile.ResultEmpty
131 var recErr recerr.Error
132
133 defer func() {
134 summarizer := reconcile.NewSummarizer(patcher)
135 res, err = summarizer.SummarizeAndPatch(ctx, server, []reconcile.SummarizeOption{
136 reconcile.WithConditions(serverConditions),
137 reconcile.WithResult(recResult),
138 reconcile.WithError(recErr),
139 reconcile.WithIgnoreNotFound(),
140 reconcile.WithProcessors(
141 reconcile.RecordResult,
142 ),
143 reconcile.WithFieldOwner(r.Name),
144 reconcile.WithEventRecorder(r.EventRecorder),
145 }...)
146 r.Metrics.RecordDuration(ctx, server, reconcileStart)
147 r.Metrics.RecordReadiness(ctx, server)
148 }()
149
150 if recErr = r.reconcile(ctx, server); recErr != nil {
151 recErr.ToCondition(server, dsapi.ServerSetupSucceededReason)
152 err = recErr
153 return
154 }
155 recResult = reconcile.ResultSuccess
156 conditions.MarkTrue(server, dsapi.ServerSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDB server")
157 log.Info("Successfully set up CouchDB Server")
158
159 return
160 }
161
162
163 func (r *CouchServerReconciler) reconcile(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
164 log := logr.FromContextOrDiscard(ctx)
165
166 changeSet := sap.NewChangeSet()
167
168 if server.IsCloud() {
169 err := r.waitForNetworkingSetup(ctx, server)
170 if err != nil {
171 log.Error(err, "waiting for network setup")
172 return err
173 }
174
175 cs, err := r.applyServerManifests(ctx, server)
176 if err != nil {
177 log.Error(err, "fail to apply generated manifests from CouchDBServer")
178 return err
179 }
180 changeSet.Add(*cs)
181
182
183 cs, creds, err := r.reconcileAdminCreds(ctx, server)
184 if err != nil {
185 log.Error(err, "fail to reconcile admin creds")
186 return err
187 }
188 changeSet.Add(*cs)
189
190 err = r.couchDBPodsReady(ctx, server)
191 if err != nil {
192
193 log.Error(err, "couchdb cloud server pods fail to be ready")
194 return err
195 }
196
197
198 if err := r.sendFinishClusterCommand(server, creds); err != nil {
199 return recerr.New(fmt.Errorf("failed to setup cluster: %w", err), dsapi.FinishClusterFailedReason)
200 }
201 } else {
202 cs, err := r.applyServerManifests(ctx, server)
203 if err != nil {
204 log.Error(err, "fail to apply generated manifests from CouchDBServer")
205 return err
206 }
207 changeSet.Add(*cs)
208
209
210 cs, _, err = r.reconcileAdminCreds(ctx, server)
211 if err != nil {
212 log.Error(err, "fail to reconcile store admin creds")
213 return err
214 }
215 changeSet.Add(*cs)
216
217 err = r.couchDBPodsReady(ctx, server)
218 if err != nil {
219 if !couchDBNotReadyOrNotFound(err) {
220 log.Error(err, "couchdb pods for store server fail to be ready")
221 }
222 return err
223 }
224 }
225 i := inventory.New(inventory.FromSapChangeSet(changeSet))
226 if err := r.prune(ctx, server, i); err != nil {
227 return recerr.New(err, dsapi.PruneFailed)
228 }
229 server.Status.Inventory = i
230 return nil
231 }
232
233
234 func (r *CouchServerReconciler) prune(ctx context.Context, server *dsapi.CouchDBServer, i *inventory.ResourceInventory) error {
235 log := logr.FromContextOrDiscard(ctx)
236 if server.Status.Inventory != nil {
237 diff, err := inventory.Diff(server.Status.Inventory, i)
238 if err != nil {
239 return nil
240 }
241 if len(diff) > 0 {
242 opt := sap.DefaultDeleteOptions()
243 opt.Exclusions = map[string]string{
244 couchdb.IgnoreDeletionLabel: couchdb.LabelValueTrue,
245
246 couchdb.SubstitutionLabel: couchdb.LabelValueTrue,
247 }
248 changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
249 if err != nil {
250 return err
251 }
252 log.Info("pruned objects", "changeset", changeSet.ToMap())
253 }
254 }
255 return nil
256 }
257
258
259 func (r *CouchServerReconciler) applyServerManifests(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, recerr.Error) {
260 log := logr.FromContextOrDiscard(ctx)
261
262 serverCopy := server.DeepCopy()
263
264 if !server.IsCloud() {
265 uid, err := server.GetNodeUID()
266 if err != nil {
267 log.Error(err, "could not retrieve/create couchdb uuid")
268 return nil, recerr.New(err, status.DependencyInvalidReason)
269 }
270 uid = dsapi.CleanUID(uid)
271 serverCopy.Spec.Base.UUID = &uid
272 }
273
274 configmap, err := ConfigMap(*serverCopy)
275 if err != nil {
276 log.Error(err, "unable to build couchdb manifests")
277 return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
278 }
279 couchdbManifests, err := unstructuredutil.ToUnstructured(configmap)
280 if err != nil {
281 log.Error(err, "unable to build couchdb manifests")
282 return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
283 }
284
285 opt := sap.DefaultApplyOptions()
286 opt.Force = true
287 changeSet, err := r.ResourceManager.Apply(ctx, couchdbManifests, opt)
288 if err != nil {
289 log.Error(err, "resource manager unable to apply couchdb manifests")
290 return nil, recerr.New(err, dsapi.ServerSetupFailedReason)
291 }
292
293 return changeSet, nil
294 }
295
296
297 func (r *CouchServerReconciler) reconcileAdminCreds(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, *couchdb.AdminCredentials, recerr.Error) {
298
299 if server.Spec.Admin.Credentials == (corev1.SecretReference{}) {
300 return nil, nil, recerr.NewStalled(fmt.Errorf("Admin.Credentials missing, cannot start server"), dsapi.CookieCreationFailedReason)
301 }
302
303
304 c := server.Spec.Admin.Credentials
305 nsn := types.NamespacedName{
306 Name: c.Name,
307 Namespace: server.Namespace,
308 }
309 var cs *sap.ChangeSetEntry
310 creds := &couchdb.AdminCredentials{}
311 secret, err := creds.FromSecret(ctx, r.Client, nsn)
312 if err != nil {
313 secret, err := creds.ToSecret(ctx, r.Client, nsn)
314 if err != nil {
315 return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
316 }
317 un, err := unstructuredutil.ToUnstructured(secret)
318 if err != nil {
319 return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
320 }
321 cs, err = r.ResourceManager.Apply(ctx, un, sap.ApplyOptions{})
322 if err != nil {
323 return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
324 }
325 } else {
326
327 if secret.Labels == nil {
328 secret.Labels = map[string]string{}
329 }
330 if secret.Labels[couchdb.IgnoreDeletionLabel] != couchdb.LabelValueTrue {
331 p := client.MergeFrom(secret.DeepCopy())
332 secret.Labels[couchdb.IgnoreDeletionLabel] = couchdb.LabelValueTrue
333 err = r.Client.Patch(ctx, secret, p)
334 if err != nil {
335 return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
336 }
337 }
338 cs, err = ExistingChangeSetEntry(secret)
339 if err != nil {
340 return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
341 }
342 }
343 return cs, creds, nil
344 }
345
346
347 func (r *CouchServerReconciler) couchDBPodsReady(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
348
349
350
351
352
353
354 sts, err := server.GetStatefulSetName()
355 if err != nil {
356 return recerr.New(err, status.DependencyInvalidReason)
357 }
358
359 podNum := server.Spec.Cluster.Nodes
360 if podNum == 0 {
361 podNum = 1
362 }
363
364 for i := 0; i < podNum; i++ {
365 podName := fmt.Sprintf("%s-%d", sts, i)
366 ready, err := CheckPod(ctx, r.Client, server.Namespace, podName)
367 switch {
368 case err != nil && kerrors.IsNotFound(err):
369 return recerr.NewWait(ErrPodsNotReady, status.DependencyNotFoundReason, r.Config.ServerNotReady)
370 case err != nil:
371 return recerr.New(err, status.DependencyInvalidReason)
372 }
373 if !ready {
374 return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
375 }
376 }
377 cc, err := couchDBServerClient(ctx, r.Client, r.Config, server)
378 if err != nil {
379 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
380 }
381 defer cc.Close(ctx)
382 pong, err := cc.Client.Ping(ctx)
383 if err != nil {
384 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
385 }
386 if !pong {
387 return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
388 }
389 return nil
390 }
391
392
393 func (r *CouchServerReconciler) sendFinishClusterCommand(server *dsapi.CouchDBServer, c *couchdb.AdminCredentials) error {
394 if !server.Spec.Cluster.AutoFinish {
395 return nil
396 }
397
398
399 url := couchdb.FormatFinishClusterURI(string(c.Username), string(c.Password), server.Spec.URI, fmt.Sprint(r.Config.CouchDBPort))
400
401
402 finished, err := checkFinishStatus(url)
403 if err != nil {
404 return err
405 }
406
407 if finished {
408 return nil
409 }
410
411
412 resp := &ServerSetupResponse{}
413 err = httpRequest("POST", url, Payload{Action: "finish_cluster"}, &resp)
414 if err != nil {
415 return err
416 }
417
418
419 finished, err = checkFinishStatus(url)
420 if err != nil {
421 return err
422 }
423
424 if !finished {
425 return fmt.Errorf("error finishing the server")
426 }
427
428
429 return nil
430 }
431
432
433 func (r *CouchServerReconciler) waitForNetworkingSetup(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
434
435 if server.Spec.Ingress == (dsapi.Ingress{}) {
436 return nil
437 }
438
439 log := ctrl.LoggerFrom(ctx)
440
441 ingress := &netv1.Ingress{}
442 err := r.Client.Get(ctx, types.NamespacedName{
443 Name: couchdb.CouchIngressName,
444 Namespace: server.Namespace,
445 }, ingress)
446 if err != nil {
447 log.Error(err, "failed to get Ingress")
448 return recerr.New(fmt.Errorf("failed to get Ingress: %w", err), dsapi.ServerSetupFailedReason)
449 }
450
451 for _, ing := range ingress.Status.LoadBalancer.Ingress {
452 if ing.IP != "" {
453 return nil
454 }
455 }
456
457 return recerr.NewWait(fmt.Errorf("ingress not ready, IP address not found"), status.DependencyNotReadyReason, r.Config.IngressNotReady)
458 }
459
View as plain text