package sequel import ( "context" "fmt" "time" backendv1 "edge-infra.dev/pkg/edge/apis/sequel/k8s/v1alpha2" "edge-infra.dev/pkg/edge/controllers/sequel/dbctl" "edge-infra.dev/pkg/edge/controllers/sequel/internal" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" "edge-infra.dev/pkg/k8s/runtime/inventory" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/k8s/runtime/sap" "edge-infra.dev/pkg/k8s/unstructured" "edge-infra.dev/pkg/lib/gcp/secretmanager" iamAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/iam/v1beta1" sqlAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/sql/v1beta1" "github.com/sethvargo/go-password/password" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) const ( // instanceUserPerms IAM role for Cloud SQL Users. instanceUserPerms = "roles/cloudsql.instanceUser" // sqlClientPerms IAM role for Cloud SQL Database connection permission. sqlClientPerms = "roles/cloudsql.client" // sqlClientPrefix prefix for the sqlClientPerms IAMPolicy Member. sqlClientPrefix = "sql-client" // sqlUserPrefix prefix for the instanceUserPerms IAMPolicy Member. sqlUserPrefix = "sql-user" ) // +kubebuilder:rbac:groups=backend.edge.ncr.com,resources=databaseusers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=backend.edge.ncr.com,resources=databaseusers/status,verbs=get;update;patch // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iampolicymembers,verbs=get;create;list;watch;patch;delete // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iampolicymembers/status,verbs=get;watch // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts,verbs=get;create;list;watch;patch;delete // +kubebuilder:rbac:groups="iam.cnrm.cloud.google.com",resources=iamserviceaccounts/status,verbs=get;watch // +kubebuilder:rbac:groups="sql.cnrm.cloud.google.com",resources=sqlusers,verbs=get;create;list;watch;patch;delete // +kubebuilder:rbac:groups="sql.cnrm.cloud.google.com",resources=sqlusers/status,verbs=get;watch // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch type UserReconciler struct { *Reconciler // cfg is the controller configuration. cfg *config // sm is the secret manager client to interact with GCP Secret Manager. sm secretmanager.SecretManager // depRequeueInterval interval to requeue when dependencies are not ready. depRequeueInterval time.Duration // recorder custom metrics recorder. recorder *internal.MetricsRecorder } // userConditions is a list on reconciliation conditions/states that a DatabaseUser custom resource // will experience in the reconciliation loop. var userConditions = reconcile.Conditions{ // Target is the final condition that indicates complete health of the DatabaseUser custom resource. // When all conditions are summarized. Target: status.ReadyCondition, // Owned are conditions that can be encountered by a DatabaseUser custom resource. Owned: []string{ status.DependenciesReadyCondition, backendv1.HealthyCondition, status.ReadyCondition, status.ReconcilingCondition, status.StalledCondition, }, // Summarize are conditions that sum up to make up the target condition when summarized. Summarize: []string{ backendv1.HealthyCondition, status.DependenciesReadyCondition, status.StalledCondition, }, // NegativePolarity are conditions that indicate negative or unhealthy states of the DatabaseUser custom resource. NegativePolarity: []string{ status.ReconcilingCondition, status.StalledCondition, }, } func (u *UserReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := u.Reconciler.SetupWithManager(mgr); err != nil { return err } u.recorder.InitMetrics() return ctrl.NewControllerManagedBy(mgr). For(&backendv1.DatabaseUser{}). Owns(&sqlAPI.SQLUser{}). Owns(&iamAPI.IAMServiceAccount{}). Owns(&iamAPI.IAMPolicyMember{}). WithOptions(controller.Options{ MaxConcurrentReconciles: u.cfg.concurrency, }).Complete(u) } func (u *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { var ( reconcileStart = time.Now() log = ctrl.LoggerFrom(ctx) result = reconcile.ResultEmpty databaseUser = &backendv1.DatabaseUser{} ) if err := u.Get(ctx, req.NamespacedName, databaseUser); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } patcher := patch.NewSerialPatcher(databaseUser, u.Client) defer func() { if reconcileErr, ok := recErr.(recerr.Error); ok { reconcileErr.ToCondition(databaseUser, status.ReadyCondition) } summarizer := reconcile.NewSummarizer(patcher) res, recErr = summarizer.SummarizeAndPatch(ctx, databaseUser, []reconcile.SummarizeOption{ reconcile.WithConditions(u.Conditions), reconcile.WithResult(result), reconcile.WithError(recErr), reconcile.WithIgnoreNotFound(), reconcile.WithProcessors( reconcile.RecordReconcileReq, reconcile.RecordResult, ), reconcile.WithFieldOwner(u.Name), }...) u.Metrics.RecordReadiness(ctx, databaseUser) u.Metrics.RecordReconciling(ctx, databaseUser) u.Metrics.RecordStalled(ctx, databaseUser) u.Metrics.RecordDuration(ctx, databaseUser, reconcileStart) u.Metrics.RecordSuspend(ctx, databaseUser, databaseUser.Spec.Suspend) }() if err := u.ping(ctx); err != nil { u.recorder.RecordDatabasePingFail(u.cfg.connectionName, err.Error()) result = reconcile.ResultRequeue return } if !controllerutil.ContainsFinalizer(databaseUser, backendv1.SequelFinalizer) { controllerutil.AddFinalizer(databaseUser, backendv1.SequelFinalizer) result = reconcile.ResultRequeue return } if !databaseUser.ObjectMeta.DeletionTimestamp.IsZero() { log.Info("detected deletion, executing finalizer and revoking SQL permissions", "database user", databaseUser.Name) result, recErr = u.finalizeDatabaseUser(ctx, databaseUser) return } if databaseUser.Spec.Suspend { log.Info("database user reconciliation is suspended", "suspended", "true") return } log.Info("reconciling") recErr = u.reconcile(ctx, patcher, databaseUser) if recErr == nil { result = reconcile.ResultSuccess } return } func (u *UserReconciler) finalizeDatabaseUser(ctx context.Context, databaseUser *backendv1.DatabaseUser) (reconcile.Result, error) { log := ctrl.LoggerFrom(ctx) if databaseUser.Spec.Type == backendv1.BuiltInUserType { identifier := fmt.Sprintf("%s-sql-password", databaseUser.Name) if err := u.sm.DeleteSecret(ctx, identifier); err != nil { log.Error(err, "an error occurred deleting database user sql password from secret manager", "database user", databaseUser.Name) u.recorder.RecordDatabaseBuiltInUserSecretDeletionFail(u.cfg.connectionName, databaseUser.Name, identifier, err.Error()) return reconcile.ResultRequeue, err } } if exists, err := u.cfg.db.UserExistsInDatabase(ctx, databaseUser.PostgresUsername()); err != nil { log.Error(err, "an error occurred when checking user exists in database", "database user", databaseUser.Name) u.recorder.RecordDatabaseUserExistsInDatabaseSQLFail(u.cfg.connectionName, databaseUser.PostgresUsername(), err.Error()) return reconcile.ResultRequeue, err } else if exists { if err = u.cfg.db.SetPrivileges(ctx, databaseUser.PostgresUsername(), make(dbctl.PrivilegeMap, 0)); err != nil { log.Error(err, "an error occurred revoking database permissions", "database user", databaseUser.Name) u.recorder.RecordDatabaseRevokeSQLPermissionsFail(u.cfg.connectionName, databaseUser.Name, databaseUser.Spec.Type, err.Error()) return reconcile.ResultRequeue, err } if err := u.cfg.db.RevokeSequencePerms(ctx, databaseUser.PostgresUsername()); err != nil { log.Error(err, "an error occurred revoking sequence permissions to user") u.recorder.RecordDatabaseRevokeSequencePermissionsFail(u.cfg.connectionName, databaseUser.Name, databaseUser.Spec.Type, err.Error()) return reconcile.ResultRequeue, err } log.Info("database user privileges revoked", "postgres username", databaseUser.PostgresUsername(), "database user", databaseUser.Name) } else { log.Info("database user does not exist in database", "postgres username", databaseUser.PostgresUsername(), "database user", databaseUser.Name) } controllerutil.RemoveFinalizer(databaseUser, backendv1.SequelFinalizer) log.Info("finalizer executed") return reconcile.ResultEmpty, nil } func (u *UserReconciler) reconcile( ctx context.Context, patcher *patch.SerialPatcher, user *backendv1.DatabaseUser, ) recerr.Error { objs := make([]*unstructured.Unstructured, 0) dataObjs := make([]client.Object, 0) log := ctrl.LoggerFrom(ctx) if err := user.Validate(); err != nil { u.recorder.RecordDatabaseUserSpecValidationFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error()) return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), backendv1.InvalidSpecReason) } if err := reconcile.Progressing(ctx, user, patcher, u.PatchOpts()...); err != nil { return recerr.New(err, backendv1.ReconcileFailedReason) } secretIdentifier := fmt.Sprintf("%s-sql-password", user.Name) if err := u.reconcileDeps(ctx, user); client.IgnoreNotFound(err) != nil { err.ToCondition(user, status.DependenciesReadyCondition) return err } //Supported List //1) Builtin User - Provide password in k8s secret //2) Builtin User - Controller generates password and creates k8s secret //3) SA User - Provide exisiting service account //4) SA User - Controller generates service account switch { case user.Spec.Type == backendv1.BuiltInUserType: if user.Spec.PasswordRef != nil && !user.Spec.PasswordRef.CreatePassword { //nolint: nestif scrt := corev1.Secret{} if err := u.Client.Get(ctx, types.NamespacedName{ Name: user.Spec.PasswordRef.Name, Namespace: user.Spec.PasswordRef.Namespace, }, &scrt); err != nil { log.Error(err, "an error fetching database user password ref secret") errs := recerr.New(err, backendv1.ReconcileFailedReason) errs.ToCondition(user, backendv1.HealthyCondition) return errs } if err := u.sm.AddSecret(ctx, secretIdentifier, scrt.Data["password"], map[string]string{ "user": user.Name, "instance": user.Spec.InstanceRef.Name, }, false, nil, ""); err != nil { log.Error(err, "an error occurred storing sql user password in secret manager") errs := recerr.New(err, backendv1.PasswordSecretCreationFailedReason) errs.ToCondition(user, backendv1.HealthyCondition) return errs } } else { generatedPassword, err := password.Generate(20, 4, 4, false, true) if err != nil { log.Error(err, "an error occurred generating sql user password") errs := recerr.New(err, backendv1.PasswordGenerationFailedReason) errs.ToCondition(user, backendv1.HealthyCondition) return errs } dataObjs = append(dataObjs, user.PasswordSecret(generatedPassword)) if err := u.sm.AddSecret(ctx, secretIdentifier, []byte(generatedPassword), map[string]string{ "user": user.Name, "instance": user.Spec.InstanceRef.Name, }, false, nil, ""); err != nil { log.Error(err, "an error occurred storing sql user password in secret manager") errs := recerr.New(err, backendv1.PasswordSecretCreationFailedReason) errs.ToCondition(user, backendv1.HealthyCondition) return errs } user.Spec.PasswordRef.Name = secretIdentifier user.Spec.PasswordRef.Namespace = user.Namespace } case user.Spec.Type == backendv1.CloudSAUserType: if user.Spec.ServiceAccount != nil && !user.Spec.ServiceAccount.CreateServiceAccount { dataObjs = append(dataObjs, user.IAMPolicyMember(sqlUserPrefix, instanceUserPerms), user.IAMPolicyMember(sqlClientPrefix, sqlClientPerms)) } else { user.Spec.ServiceAccount = &backendv1.ServiceAccount{ EmailRef: user.ServiceAccountEmail(), IAMUsername: user.IAMServiceAccountName(), CreateServiceAccount: user.Spec.ServiceAccount.CreateServiceAccount, } dataObjs = append(dataObjs, user.IAMServiceAccount(), user.IAMPolicyMember(sqlUserPrefix, instanceUserPerms), user.IAMPolicyMember(sqlClientPrefix, sqlClientPerms)) } } cldSQLUsr, err := user.CloudSQLUser() if err != nil { log.Error(err, "an error occurred creating a cloudsql user unstructured object") errs := recerr.New(err, backendv1.ReconcileFailedReason) errs.ToCondition(user, backendv1.HealthyCondition) return errs } objs = append(objs, cldSQLUsr) for _, data := range dataObjs { user.Dependant(data.GetObjectKind().GroupVersionKind(), data.GetName()) unstructuredObj, err := unstructured.ToUnstructured(data) if err != nil { log.Error(err, "an error occurred converting to unstructured object") unStructuredErr := recerr.New(err, backendv1.ReconcileFailedReason) unStructuredErr.ToCondition(user, backendv1.ReconcileFailedReason) return unStructuredErr } objs = append(objs, unstructuredObj) } if err := u.apply(ctx, user, objs); err != nil { log.Error(err, "an error occurred applying database user manifests") err.ToCondition(user, backendv1.HealthyCondition) return err } if err := u.cfg.db.SetPrivileges(ctx, user.PostgresUsername(), user.Privileges()); err != nil { log.Error(err, "an error occurred executing sql grant statement") u.recorder.RecordDatabaseSetSQLPermissionsFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error()) permissionErr := recerr.New(err, backendv1.PermissionGrantFailedReason) permissionErr.ToCondition(user, backendv1.HealthyCondition) return permissionErr } if err := u.cfg.db.GrantSequencePerms(ctx, user.PostgresUsername()); err != nil { log.Error(err, "an error occurred granting sequence permissions to user") u.recorder.RecordDatabaseSetSequencePermissionsFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error()) setPrivErr := recerr.New(err, backendv1.PermissionGrantFailedReason) setPrivErr.ToCondition(user, backendv1.PermissionGrantFailedReason) return setPrivErr } u.recorder.RecordDatabaseUserSuccess(user.Name, user.Spec.Type) conditions.MarkTrue(user, backendv1.HealthyCondition, status.SucceededReason, "Created CloudSQL User: %s", user.Name) return nil } func (u *UserReconciler) reconcileDeps(ctx context.Context, user *backendv1.DatabaseUser) recerr.Error { switch { case len(user.Spec.DependsOn) == 0: conditions.Delete(user, status.DependenciesReadyCondition) return nil } unready := make([]string, 0) ready := make([]string, 0) resourcesWithNoStatus := map[string]bool{"Secret": true} for _, dep := range user.Spec.DependsOn { dependent := unstructured.New(schema.GroupVersion{ Group: dep.GVK.Group, Version: dep.GVK.Version, }, dep.GVK.Kind, user.Namespace, dep.ObjectReference.Name) if err := u.Client.Get(ctx, client.ObjectKey{Name: dep.ObjectReference.Name, Namespace: user.Namespace}, dependent); err != nil { return recerr.NewWait(err, status.DependencyNotFoundReason, user.RetryInterval()) } _, exists := resourcesWithNoStatus[dep.GVK.Kind] if exists { ready = append(ready, fmt.Sprintf("%s/%s", dep.GVK.Kind, dependent.GetName())) } else if !IsUpToDate(dependent) { unready = append(unready, fmt.Sprintf("%s/%s", dep.GVK.Kind, dependent.GetName())) } } u.recorder.RecordDatabaseUserDependencyNotReady(u.cfg.connectionName, user.Name, user.Spec.Type, unready, ready) if len(unready) == 0 { conditions.MarkTrue( user, status.DependenciesReadyCondition, status.DependencyReadyReason, "Dependencies up to date", ) return nil } return recerr.NewWait( fmt.Errorf("%d/%d dependencies ready: waiting for %s", len(user.Spec.DependsOn)-len(unready), len(user.Spec.DependsOn), unready), status.DependencyNotReadyReason, u.depRequeueInterval, ) } func IsUpToDate(u *unstructured.Unstructured) bool { if u.Object["status"] != nil { status := u.Object["status"].(map[string]interface{}) for _, condition := range status["conditions"].([]interface{}) { parsedCondition := condition.(map[string]interface{}) if parsedCondition["type"] == "Ready" && parsedCondition["status"] == "True" { return true } } } return false } func (u *UserReconciler) apply(ctx context.Context, user *backendv1.DatabaseUser, objs []*unstructured.Unstructured) recerr.Error { mgr := u.ResourceManager mgr.SetOwnerLabels(objs, u.Name, "") log := ctrl.LoggerFrom(ctx) changeset, err := mgr.ApplyAll(ctx, objs, sap.ApplyOptions{ Force: user.Spec.Force, WaitTimeout: user.Spec.Timeout.Duration, }) if err != nil { return recerr.New(err, backendv1.ApplyFailedReason) } log.Info("applied objects", "changeset", changeset.ToMap()) newInv := inventory.New(inventory.FromSapChangeSet(changeset)) if err := u.prune(ctx, user, newInv); err != nil { return recerr.New(err, backendv1.PruneFailedReason) } user.Status.Inventory = newInv if err := mgr.WaitForSet(ctx, changeset.ToObjMetadataSet(), sap.WaitOptions{ Timeout: user.Spec.Timeout.Duration, }); err != nil { err := recerr.NewWait(err, backendv1.ReconcileFailedReason, user.RetryInterval()) return err } return nil } func (u *UserReconciler) prune(ctx context.Context, user *backendv1.DatabaseUser, inv *inventory.ResourceInventory) error { log := ctrl.LoggerFrom(ctx).WithName("prune") switch { case !user.Spec.Prune: log.Info("pruning is disabled") return nil case user.Status.Inventory == nil: return nil default: diff, err := user.Status.Inventory.Diff(inv) if err != nil || len(diff) == 0 { return err } deleted, err := u.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions()) if err != nil { return err } conditions.Delete(user, status.DependenciesReadyCondition) log.Info("pruned", "changeset", deleted.ToMap()) return nil } } func (u *UserReconciler) ping(ctx context.Context) error { log := ctrl.LoggerFrom(ctx) if err := u.cfg.db.PingContext(ctx); err != nil { log.Error(err, "database ping failed, reconnecting to database...") conn, err := u.cfg.connectDatabase() if err != nil { log.Error(err, "failed reconnecting to database") return err } u.cfg.db = dbctl.New(conn) } return nil }