package couchctl

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/go-logr/logr"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/inventory"
	"edge-infra.dev/pkg/k8s/runtime/patch"
	"edge-infra.dev/pkg/k8s/runtime/sap"
	unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"

	corev1 "k8s.io/api/core/v1"
	netv1 "k8s.io/api/networking/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/dynamic"
	kuberecorder "k8s.io/client-go/tools/record"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

type CouchServerReconciler struct {
	client.Client
	NodeResourcePredicate
	kuberecorder.EventRecorder

	ResourceManager *sap.ResourceManager
	Name            string
	Config          *Config
	Metrics         metrics.Metrics
	patchOptions    []patch.Option
}

type Payload struct {
	Action string `json:"action"`
}

type ServerSetupResponse struct {
	State  string `json:"state,omitempty"`
	Error  string `json:"error,omitempty"`
	Reason string `json:"reason,omitempty"`
}

var (
	ErrPodsNotReady = errors.New("pods aren't ready")

	serverConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			dsapi.ServerSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			dsapi.ServerSetupSucceededReason,
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)

// SetupWithManager sets up CouchServerReconciler with the manager.
func (r *CouchServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.patchOptions = getPatchOptions(serverConditions.Owned, r.Name)

	d, err := dynamic.NewForConfig(mgr.GetConfig())
	if err != nil {
		return fmt.Errorf("failed to create dynamic client: %w", err)
	}

	r.ResourceManager = sap.NewResourceManager(
		r.Client,
		watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
		sap.Owner{Field: r.Name},
	)

	b := ctrl.NewControllerManagedBy(mgr).
		For(&dsapi.CouchDBServer{}, r.serverPredicates())
	return b.Owns(&corev1.Secret{}).Complete(r)
}
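
// serverPredicates filters watch events before they reach Reconcile.
// GenerationChangedPredicate drops updates that do not bump
// metadata.generation (e.g. status-only updates). In DSDS mode, events are
// additionally filtered through ShouldReconcile, presumably supplied by the
// embedded NodeResourcePredicate.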
func (r *CouchServerReconciler) serverPredicates() builder.Predicates {
	return builder.WithPredicates(
		predicate.GenerationChangedPredicate{},
		predicate.NewPredicateFuncs(func(obj client.Object) bool {
			if r.Config.IsDSDS() {
				return r.ShouldReconcile(r.Config, obj)
			}
			return true
		}))
}

func (r *CouchServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	reconcileStart := time.Now()
	log := ctrl.LoggerFrom(ctx)

	server := &dsapi.CouchDBServer{}
	if err := r.Get(ctx, req.NamespacedName, server); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	server.WithRetry(r.Config.RequeueTime)
	server.WithInterval(r.Config.PollingInterval)

	log = log.WithValues("type", server.Type())
	ctx = logr.NewContext(ctx, log)

	patcher := patch.NewSerialPatcher(server, r.Client)
	if err := reconcile.Progressing(ctx, server, patcher, r.patchOptions...); err != nil {
		log.Error(err, "unable to update status")
		return ctrl.Result{}, err
	}

	recResult := reconcile.ResultEmpty
	var recErr recerr.Error

	defer func() {
		summarizer := reconcile.NewSummarizer(patcher)
		res, err = summarizer.SummarizeAndPatch(ctx, server, []reconcile.SummarizeOption{
			reconcile.WithConditions(serverConditions),
			reconcile.WithResult(recResult),
			reconcile.WithError(recErr),
			reconcile.WithIgnoreNotFound(),
			reconcile.WithProcessors(
				reconcile.RecordResult,
			),
			reconcile.WithFieldOwner(r.Name),
			reconcile.WithEventRecorder(r.EventRecorder),
		}...)
		r.Metrics.RecordDuration(ctx, server, reconcileStart)
		r.Metrics.RecordReadiness(ctx, server)
	}()

	if recErr = r.reconcile(ctx, server); recErr != nil {
		recErr.ToCondition(server, dsapi.ServerSetupSucceededReason)
		err = recErr
		return
	}

	recResult = reconcile.ResultSuccess
	conditions.MarkTrue(server, dsapi.ServerSetupSucceededReason, status.SucceededReason,
		"Successfully set up CouchDB server")
	log.Info("Successfully set up CouchDB Server")
	return
}

// reconcile handles CouchDB server setup.
func (r *CouchServerReconciler) reconcile(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
	log := logr.FromContextOrDiscard(ctx)
	changeSet := sap.NewChangeSet()
	if server.IsCloud() { // nolint
		err := r.waitForNetworkingSetup(ctx, server)
		if err != nil {
			log.Error(err, "waiting for network setup")
			return err
		}
		cs, err := r.applyServerManifests(ctx, server)
		if err != nil {
			log.Error(err, "failed to apply generated manifests from CouchDBServer")
			return err
		}
		changeSet.Add(*cs)

		// check that the admin secret exists or create a new admin secret
		cs, creds, err := r.reconcileAdminCreds(ctx, server)
		if err != nil {
			log.Error(err, "failed to reconcile admin creds")
			return err
		}
		changeSet.Add(*cs)

		err = r.couchDBPodsReady(ctx, server)
		if err != nil {
			// the cloud server pods should always be up
			log.Error(err, "couchdb cloud server pods failed to become ready")
			return err
		}

		// send the finish_cluster command to complete the server creation for
		// non-single-node clusters
		if err := r.sendFinishClusterCommand(server, creds); err != nil {
			return recerr.New(fmt.Errorf("failed to set up cluster: %w", err), dsapi.FinishClusterFailedReason)
		}
	} else {
		cs, err := r.applyServerManifests(ctx, server)
		if err != nil {
			log.Error(err, "failed to apply generated manifests from CouchDBServer")
			return err
		}
		changeSet.Add(*cs)

		// TODO create a new secret per server
		cs, _, err = r.reconcileAdminCreds(ctx, server)
		if err != nil {
			log.Error(err, "failed to reconcile store admin creds")
			return err
		}
		changeSet.Add(*cs)

		err = r.couchDBPodsReady(ctx, server)
		if err != nil {
			if !couchDBNotReadyOrNotFound(err) {
				log.Error(err, "couchdb pods for store server failed to become ready")
			}
			return err
		}
	}

	i := inventory.New(inventory.FromSapChangeSet(changeSet))
	if err := r.prune(ctx, server, i); err != nil {
		return recerr.New(err, dsapi.PruneFailed)
	}
	server.Status.Inventory = i
	return nil
}
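
// Pruning example (illustrative): if the previous reconcile's inventory
// recorded a ConfigMap that the current change set no longer contains,
// inventory.Diff returns that entry and DeleteAll removes it, unless the
// object carries couchdb.IgnoreDeletionLabel or couchdb.SubstitutionLabel
// (see opt.Exclusions below).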
// prune deletes resources in the inventory that are no longer created.
func (r *CouchServerReconciler) prune(ctx context.Context, server *dsapi.CouchDBServer, i *inventory.ResourceInventory) error {
	log := logr.FromContextOrDiscard(ctx)
	if server.Status.Inventory != nil {
		diff, err := inventory.Diff(server.Status.Inventory, i)
		if err != nil {
			return err
		}
		if len(diff) > 0 {
			opt := sap.DefaultDeleteOptions()
			opt.Exclusions = map[string]string{
				couchdb.IgnoreDeletionLabel: couchdb.LabelValueTrue,
				// Resources with SubstitutionLabel are managed by CouchDBPersistence
				couchdb.SubstitutionLabel: couchdb.LabelValueTrue,
			}
			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
			if err != nil {
				return err
			}
			log.Info("pruned objects", "changeset", changeSet.ToMap())
		}
	}
	return nil
}

// applyServerManifests dynamically generates the couchdb server ini
// configuration as a configmap and applies it.
func (r *CouchServerReconciler) applyServerManifests(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, recerr.Error) {
	log := logr.FromContextOrDiscard(ctx)
	serverCopy := server.DeepCopy()
	// cloud couch server UUIDs are unique per banner
	if !server.IsCloud() { //nolint complexity
		uid, err := server.GetNodeUID()
		if err != nil {
			log.Error(err, "could not retrieve/create couchdb uuid")
			return nil, recerr.New(err, status.DependencyInvalidReason)
		}
		uid = dsapi.CleanUID(uid)
		serverCopy.Spec.Base.UUID = &uid
	}

	configmap, err := ConfigMap(*serverCopy)
	if err != nil {
		log.Error(err, "unable to build couchdb manifests")
		return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
	}
	couchdbManifests, err := unstructuredutil.ToUnstructured(configmap)
	if err != nil {
		log.Error(err, "unable to build couchdb manifests")
		return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
	}

	opt := sap.DefaultApplyOptions()
	opt.Force = true
	changeSet, err := r.ResourceManager.Apply(ctx, couchdbManifests, opt)
	if err != nil {
		log.Error(err, "resource manager unable to apply couchdb manifests")
		return nil, recerr.New(err, dsapi.ServerSetupFailedReason)
	}
	return changeSet, nil
}

// reconcileAdminCreds creates the server admin credentials for couchdb servers.
func (r *CouchServerReconciler) reconcileAdminCreds(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, *couchdb.AdminCredentials, recerr.Error) {
	// stall if creds are not given in the spec
	if server.Spec.Admin.Credentials == (corev1.SecretReference{}) {
		return nil, nil, recerr.NewStalled(fmt.Errorf("Admin.Credentials missing, cannot start server"), dsapi.CookieCreationFailedReason)
	}

	// get credentials from the specified secret, or generate them into it if missing
	c := server.Spec.Admin.Credentials
	nsn := types.NamespacedName{
		Name:      c.Name,
		Namespace: server.Namespace,
	}
	var cs *sap.ChangeSetEntry
	creds := &couchdb.AdminCredentials{}
	secret, err := creds.FromSecret(ctx, r.Client, nsn)
	if err != nil { //nolint: nestif
		secret, err := creds.ToSecret(ctx, r.Client, nsn)
		if err != nil {
			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
		}
		un, err := unstructuredutil.ToUnstructured(secret)
		if err != nil {
			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
		}
		cs, err = r.ResourceManager.Apply(ctx, un, sap.ApplyOptions{})
		if err != nil {
			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
		}
	} else {
		// the server admin secret can't be deleted, so update it
		if secret.Labels == nil {
			secret.Labels = map[string]string{}
		}
		if secret.Labels[couchdb.IgnoreDeletionLabel] != couchdb.LabelValueTrue {
			p := client.MergeFrom(secret.DeepCopy())
			secret.Labels[couchdb.IgnoreDeletionLabel] = couchdb.LabelValueTrue
			err = r.Client.Patch(ctx, secret, p)
			if err != nil {
				return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
			}
		}
		cs, err = ExistingChangeSetEntry(secret)
		if err != nil {
			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
		}
	}
	return cs, creds, nil
}
// couchDBPodsReady checks that the couchdb StatefulSet pods are ready.
func (r *CouchServerReconciler) couchDBPodsReady(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
	// wait for the pods to come up
	// TODO: poll for these pods via the statefulset
	// https://github.com/kubernetes-sigs/cli-utils/tree/master/pkg/kstatus/polling
	// https://pkg.go.dev/sigs.k8s.io/cli-utils/pkg/kstatus
	// https://pkg.go.dev/sigs.k8s.io/cli-utils@v0.29.2/pkg/kstatus/polling
	sts, err := server.GetStatefulSetName()
	if err != nil {
		return recerr.New(err, status.DependencyInvalidReason)
	}

	podNum := server.Spec.Cluster.Nodes
	if podNum == 0 {
		podNum = 1
	}
	for i := 0; i < podNum; i++ {
		podName := fmt.Sprintf("%s-%d", sts, i)
		ready, err := CheckPod(ctx, r.Client, server.Namespace, podName)
		switch {
		case err != nil && kerrors.IsNotFound(err):
			return recerr.NewWait(ErrPodsNotReady, status.DependencyNotFoundReason, r.Config.ServerNotReady)
		case err != nil:
			return recerr.New(err, status.DependencyInvalidReason)
		}
		if !ready {
			return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
		}
	}

	cc, err := couchDBServerClient(ctx, r.Client, r.Config, server)
	if err != nil {
		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
	}
	defer cc.Close(ctx)
	pong, err := cc.Client.Ping(ctx)
	if err != nil {
		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
	}
	if !pong {
		return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
	}
	return nil
}

// sendFinishClusterCommand sends the `finish_cluster` command for cloud
// couchdb running in cluster mode.
func (r *CouchServerReconciler) sendFinishClusterCommand(server *dsapi.CouchDBServer, c *couchdb.AdminCredentials) error {
	if !server.Spec.Cluster.AutoFinish {
		return nil
	}
	// https://docs.couchdb.org/en/3.2.0/api/server/common.html#cluster-setup
	url := couchdb.FormatFinishClusterURI(string(c.Username), string(c.Password), server.Spec.URI, fmt.Sprint(r.Config.CouchDBPort))

	// if the cluster setup was already finished, there is nothing to do
	finished, err := checkFinishStatus(url)
	if err != nil {
		return err
	}
	if finished {
		return nil
	}

	// attempt to finish the cluster
	resp := &ServerSetupResponse{}
	err = httpRequest("POST", url, Payload{Action: "finish_cluster"}, &resp)
	if err != nil {
		return err
	}

	// re-check the finish status; if it's still not finished, return an
	// error so we requeue and try again
	finished, err = checkFinishStatus(url)
	if err != nil {
		return err
	}
	if !finished {
		return fmt.Errorf("error finishing the server")
	}
	return nil
}
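
// The cluster setup exchange (see the CouchDB docs linked above) is, roughly:
//
//	GET  /_cluster_setup                               -> {"state": "cluster_finished"} when done
//	POST /_cluster_setup {"action": "finish_cluster"}
//
// checkFinishStatus and httpRequest are package helpers defined elsewhere;
// the response states ("cluster_enabled", "cluster_finished", ...) are per
// the CouchDB 3.2 documentation.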
// waitForNetworkingSetup checks that the ingress for cloud couchdb is in a
// ready state.
func (r *CouchServerReconciler) waitForNetworkingSetup(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
	// skip if empty
	if server.Spec.Ingress == (dsapi.Ingress{}) {
		return nil
	}
	log := ctrl.LoggerFrom(ctx)
	ingress := &netv1.Ingress{}
	err := r.Client.Get(ctx, types.NamespacedName{
		Name:      couchdb.CouchIngressName, // TODO add server.Spec.Ingress.Name
		Namespace: server.Namespace,
	}, ingress)
	if err != nil {
		log.Error(err, "failed to get Ingress")
		return recerr.New(fmt.Errorf("failed to get Ingress: %w", err), dsapi.ServerSetupFailedReason)
	}
	for _, ing := range ingress.Status.LoadBalancer.Ingress {
		if ing.IP != "" {
			return nil
		}
	}
	return recerr.NewWait(fmt.Errorf("ingress not ready, IP address not found"), status.DependencyNotReadyReason, r.Config.IngressNotReady)
}
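
// A minimal wiring sketch (illustrative only, not part of this package's
// setup code): the reconciler is registered with a controller-runtime
// manager via SetupWithManager; cfg and m stand in for caller-supplied
// *Config and metrics.Metrics values.
//
//	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
//	r := &CouchServerReconciler{
//		Client:        mgr.GetClient(),
//		EventRecorder: mgr.GetEventRecorderFor("couchctl"),
//		Name:          "couchctl",
//		Config:        cfg, // hypothetical *Config
//		Metrics:       m,   // hypothetical metrics.Metrics
//	}
//	_ = r.SetupWithManager(mgr)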