1 package sequel
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 backendv1 "edge-infra.dev/pkg/edge/apis/sequel/k8s/v1alpha2"
9 "edge-infra.dev/pkg/edge/controllers/sequel/dbctl"
10 "edge-infra.dev/pkg/edge/controllers/sequel/internal"
11 "edge-infra.dev/pkg/k8s/meta/status"
12 "edge-infra.dev/pkg/k8s/runtime/conditions"
13 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
14 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
15 "edge-infra.dev/pkg/k8s/runtime/inventory"
16 "edge-infra.dev/pkg/k8s/runtime/patch"
17 "edge-infra.dev/pkg/k8s/runtime/sap"
18 "edge-infra.dev/pkg/k8s/unstructured"
19 "edge-infra.dev/pkg/lib/gcp/secretmanager"
20
21 iamAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/iam/v1beta1"
22 sqlAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/sql/v1beta1"
23 "github.com/sethvargo/go-password/password"
24 corev1 "k8s.io/api/core/v1"
25
26 "k8s.io/apimachinery/pkg/runtime/schema"
27 "k8s.io/apimachinery/pkg/types"
28 ctrl "sigs.k8s.io/controller-runtime"
29 "sigs.k8s.io/controller-runtime/pkg/client"
30 "sigs.k8s.io/controller-runtime/pkg/controller"
31 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
32 )
33
34 const (
35
36 instanceUserPerms = "roles/cloudsql.instanceUser"
37
38 sqlClientPerms = "roles/cloudsql.client"
39
40 sqlClientPrefix = "sql-client"
41
42 sqlUserPrefix = "sql-user"
43 )
44
45
46
47
48
49
50
51
52
53
54
55 type UserReconciler struct {
56 *Reconciler
57
58 cfg *config
59
60 sm secretmanager.SecretManager
61
62 depRequeueInterval time.Duration
63
64 recorder *internal.MetricsRecorder
65 }
66
67
68
69 var userConditions = reconcile.Conditions{
70
71
72 Target: status.ReadyCondition,
73
74 Owned: []string{
75 status.DependenciesReadyCondition,
76 backendv1.HealthyCondition,
77 status.ReadyCondition,
78 status.ReconcilingCondition,
79 status.StalledCondition,
80 },
81
82 Summarize: []string{
83 backendv1.HealthyCondition,
84 status.DependenciesReadyCondition,
85 status.StalledCondition,
86 },
87
88 NegativePolarity: []string{
89 status.ReconcilingCondition,
90 status.StalledCondition,
91 },
92 }
93
94 func (u *UserReconciler) SetupWithManager(mgr ctrl.Manager) error {
95 if err := u.Reconciler.SetupWithManager(mgr); err != nil {
96 return err
97 }
98 u.recorder.InitMetrics()
99 return ctrl.NewControllerManagedBy(mgr).
100 For(&backendv1.DatabaseUser{}).
101 Owns(&sqlAPI.SQLUser{}).
102 Owns(&iamAPI.IAMServiceAccount{}).
103 Owns(&iamAPI.IAMPolicyMember{}).
104 WithOptions(controller.Options{
105 MaxConcurrentReconciles: u.cfg.concurrency,
106 }).Complete(u)
107 }
108
109 func (u *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
110 var (
111 reconcileStart = time.Now()
112 log = ctrl.LoggerFrom(ctx)
113 result = reconcile.ResultEmpty
114 databaseUser = &backendv1.DatabaseUser{}
115 )
116
117 if err := u.Get(ctx, req.NamespacedName, databaseUser); err != nil {
118 return ctrl.Result{}, client.IgnoreNotFound(err)
119 }
120
121 patcher := patch.NewSerialPatcher(databaseUser, u.Client)
122
123 defer func() {
124 if reconcileErr, ok := recErr.(recerr.Error); ok {
125 reconcileErr.ToCondition(databaseUser, status.ReadyCondition)
126 }
127
128 summarizer := reconcile.NewSummarizer(patcher)
129 res, recErr = summarizer.SummarizeAndPatch(ctx, databaseUser, []reconcile.SummarizeOption{
130 reconcile.WithConditions(u.Conditions),
131 reconcile.WithResult(result),
132 reconcile.WithError(recErr),
133 reconcile.WithIgnoreNotFound(),
134 reconcile.WithProcessors(
135 reconcile.RecordReconcileReq,
136 reconcile.RecordResult,
137 ),
138 reconcile.WithFieldOwner(u.Name),
139 }...)
140
141 u.Metrics.RecordReadiness(ctx, databaseUser)
142 u.Metrics.RecordReconciling(ctx, databaseUser)
143 u.Metrics.RecordStalled(ctx, databaseUser)
144 u.Metrics.RecordDuration(ctx, databaseUser, reconcileStart)
145 u.Metrics.RecordSuspend(ctx, databaseUser, databaseUser.Spec.Suspend)
146 }()
147
148 if err := u.ping(ctx); err != nil {
149 u.recorder.RecordDatabasePingFail(u.cfg.connectionName, err.Error())
150 result = reconcile.ResultRequeue
151 return
152 }
153
154 if !controllerutil.ContainsFinalizer(databaseUser, backendv1.SequelFinalizer) {
155 controllerutil.AddFinalizer(databaseUser, backendv1.SequelFinalizer)
156 result = reconcile.ResultRequeue
157 return
158 }
159
160 if !databaseUser.ObjectMeta.DeletionTimestamp.IsZero() {
161 log.Info("detected deletion, executing finalizer and revoking SQL permissions", "database user", databaseUser.Name)
162 result, recErr = u.finalizeDatabaseUser(ctx, databaseUser)
163 return
164 }
165
166 if databaseUser.Spec.Suspend {
167 log.Info("database user reconciliation is suspended", "suspended", "true")
168 return
169 }
170
171 log.Info("reconciling")
172 recErr = u.reconcile(ctx, patcher, databaseUser)
173 if recErr == nil {
174 result = reconcile.ResultSuccess
175 }
176
177 return
178 }
179
180 func (u *UserReconciler) finalizeDatabaseUser(ctx context.Context, databaseUser *backendv1.DatabaseUser) (reconcile.Result, error) {
181 log := ctrl.LoggerFrom(ctx)
182 if databaseUser.Spec.Type == backendv1.BuiltInUserType {
183 identifier := fmt.Sprintf("%s-sql-password", databaseUser.Name)
184 if err := u.sm.DeleteSecret(ctx, identifier); err != nil {
185 log.Error(err, "an error occurred deleting database user sql password from secret manager", "database user", databaseUser.Name)
186 u.recorder.RecordDatabaseBuiltInUserSecretDeletionFail(u.cfg.connectionName, databaseUser.Name, identifier, err.Error())
187 return reconcile.ResultRequeue, err
188 }
189 }
190
191 if exists, err := u.cfg.db.UserExistsInDatabase(ctx, databaseUser.PostgresUsername()); err != nil {
192 log.Error(err, "an error occurred when checking user exists in database", "database user", databaseUser.Name)
193 u.recorder.RecordDatabaseUserExistsInDatabaseSQLFail(u.cfg.connectionName, databaseUser.PostgresUsername(), err.Error())
194 return reconcile.ResultRequeue, err
195 } else if exists {
196 if err = u.cfg.db.SetPrivileges(ctx, databaseUser.PostgresUsername(), make(dbctl.PrivilegeMap, 0)); err != nil {
197 log.Error(err, "an error occurred revoking database permissions", "database user", databaseUser.Name)
198 u.recorder.RecordDatabaseRevokeSQLPermissionsFail(u.cfg.connectionName, databaseUser.Name, databaseUser.Spec.Type, err.Error())
199 return reconcile.ResultRequeue, err
200 }
201 if err := u.cfg.db.RevokeSequencePerms(ctx, databaseUser.PostgresUsername()); err != nil {
202 log.Error(err, "an error occurred revoking sequence permissions to user")
203 u.recorder.RecordDatabaseRevokeSequencePermissionsFail(u.cfg.connectionName, databaseUser.Name, databaseUser.Spec.Type, err.Error())
204 return reconcile.ResultRequeue, err
205 }
206 log.Info("database user privileges revoked", "postgres username", databaseUser.PostgresUsername(), "database user", databaseUser.Name)
207 } else {
208 log.Info("database user does not exist in database", "postgres username", databaseUser.PostgresUsername(), "database user", databaseUser.Name)
209 }
210
211 controllerutil.RemoveFinalizer(databaseUser, backendv1.SequelFinalizer)
212 log.Info("finalizer executed")
213 return reconcile.ResultEmpty, nil
214 }
215
216 func (u *UserReconciler) reconcile(
217 ctx context.Context,
218 patcher *patch.SerialPatcher,
219 user *backendv1.DatabaseUser,
220 ) recerr.Error {
221 objs := make([]*unstructured.Unstructured, 0)
222 dataObjs := make([]client.Object, 0)
223 log := ctrl.LoggerFrom(ctx)
224
225 if err := user.Validate(); err != nil {
226 u.recorder.RecordDatabaseUserSpecValidationFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error())
227 return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), backendv1.InvalidSpecReason)
228 }
229
230 if err := reconcile.Progressing(ctx, user, patcher, u.PatchOpts()...); err != nil {
231 return recerr.New(err, backendv1.ReconcileFailedReason)
232 }
233
234 secretIdentifier := fmt.Sprintf("%s-sql-password", user.Name)
235 if err := u.reconcileDeps(ctx, user); client.IgnoreNotFound(err) != nil {
236 err.ToCondition(user, status.DependenciesReadyCondition)
237 return err
238 }
239
240
241
242
243
244
245
246 switch {
247 case user.Spec.Type == backendv1.BuiltInUserType:
248 if user.Spec.PasswordRef != nil && !user.Spec.PasswordRef.CreatePassword {
249 scrt := corev1.Secret{}
250 if err := u.Client.Get(ctx, types.NamespacedName{
251 Name: user.Spec.PasswordRef.Name,
252 Namespace: user.Spec.PasswordRef.Namespace,
253 }, &scrt); err != nil {
254 log.Error(err, "an error fetching database user password ref secret")
255 errs := recerr.New(err, backendv1.ReconcileFailedReason)
256 errs.ToCondition(user, backendv1.HealthyCondition)
257 return errs
258 }
259 if err := u.sm.AddSecret(ctx, secretIdentifier, scrt.Data["password"], map[string]string{
260 "user": user.Name,
261 "instance": user.Spec.InstanceRef.Name,
262 }, false, nil, ""); err != nil {
263 log.Error(err, "an error occurred storing sql user password in secret manager")
264 errs := recerr.New(err, backendv1.PasswordSecretCreationFailedReason)
265 errs.ToCondition(user, backendv1.HealthyCondition)
266 return errs
267 }
268 } else {
269 generatedPassword, err := password.Generate(20, 4, 4, false, true)
270 if err != nil {
271 log.Error(err, "an error occurred generating sql user password")
272 errs := recerr.New(err, backendv1.PasswordGenerationFailedReason)
273 errs.ToCondition(user, backendv1.HealthyCondition)
274 return errs
275 }
276 dataObjs = append(dataObjs, user.PasswordSecret(generatedPassword))
277 if err := u.sm.AddSecret(ctx, secretIdentifier, []byte(generatedPassword), map[string]string{
278 "user": user.Name,
279 "instance": user.Spec.InstanceRef.Name,
280 }, false, nil, ""); err != nil {
281 log.Error(err, "an error occurred storing sql user password in secret manager")
282 errs := recerr.New(err, backendv1.PasswordSecretCreationFailedReason)
283 errs.ToCondition(user, backendv1.HealthyCondition)
284 return errs
285 }
286 user.Spec.PasswordRef.Name = secretIdentifier
287 user.Spec.PasswordRef.Namespace = user.Namespace
288 }
289
290 case user.Spec.Type == backendv1.CloudSAUserType:
291 if user.Spec.ServiceAccount != nil && !user.Spec.ServiceAccount.CreateServiceAccount {
292 dataObjs = append(dataObjs, user.IAMPolicyMember(sqlUserPrefix, instanceUserPerms), user.IAMPolicyMember(sqlClientPrefix, sqlClientPerms))
293 } else {
294 user.Spec.ServiceAccount = &backendv1.ServiceAccount{
295 EmailRef: user.ServiceAccountEmail(),
296 IAMUsername: user.IAMServiceAccountName(),
297 CreateServiceAccount: user.Spec.ServiceAccount.CreateServiceAccount,
298 }
299 dataObjs = append(dataObjs, user.IAMServiceAccount(), user.IAMPolicyMember(sqlUserPrefix, instanceUserPerms), user.IAMPolicyMember(sqlClientPrefix, sqlClientPerms))
300 }
301 }
302
303 cldSQLUsr, err := user.CloudSQLUser()
304 if err != nil {
305 log.Error(err, "an error occurred creating a cloudsql user unstructured object")
306 errs := recerr.New(err, backendv1.ReconcileFailedReason)
307 errs.ToCondition(user, backendv1.HealthyCondition)
308 return errs
309 }
310
311 objs = append(objs, cldSQLUsr)
312
313 for _, data := range dataObjs {
314 user.Dependant(data.GetObjectKind().GroupVersionKind(), data.GetName())
315 unstructuredObj, err := unstructured.ToUnstructured(data)
316 if err != nil {
317 log.Error(err, "an error occurred converting to unstructured object")
318 unStructuredErr := recerr.New(err, backendv1.ReconcileFailedReason)
319 unStructuredErr.ToCondition(user, backendv1.ReconcileFailedReason)
320 return unStructuredErr
321 }
322 objs = append(objs, unstructuredObj)
323 }
324
325 if err := u.apply(ctx, user, objs); err != nil {
326 log.Error(err, "an error occurred applying database user manifests")
327 err.ToCondition(user, backendv1.HealthyCondition)
328 return err
329 }
330
331 if err := u.cfg.db.SetPrivileges(ctx, user.PostgresUsername(), user.Privileges()); err != nil {
332 log.Error(err, "an error occurred executing sql grant statement")
333 u.recorder.RecordDatabaseSetSQLPermissionsFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error())
334 permissionErr := recerr.New(err, backendv1.PermissionGrantFailedReason)
335 permissionErr.ToCondition(user, backendv1.HealthyCondition)
336
337 return permissionErr
338 }
339
340 if err := u.cfg.db.GrantSequencePerms(ctx, user.PostgresUsername()); err != nil {
341 log.Error(err, "an error occurred granting sequence permissions to user")
342 u.recorder.RecordDatabaseSetSequencePermissionsFail(u.cfg.connectionName, user.Name, user.Spec.Type, err.Error())
343 setPrivErr := recerr.New(err, backendv1.PermissionGrantFailedReason)
344 setPrivErr.ToCondition(user, backendv1.PermissionGrantFailedReason)
345 return setPrivErr
346 }
347
348 u.recorder.RecordDatabaseUserSuccess(user.Name, user.Spec.Type)
349 conditions.MarkTrue(user, backendv1.HealthyCondition, status.SucceededReason, "Created CloudSQL User: %s", user.Name)
350 return nil
351 }
352
353 func (u *UserReconciler) reconcileDeps(ctx context.Context, user *backendv1.DatabaseUser) recerr.Error {
354 switch {
355 case len(user.Spec.DependsOn) == 0:
356 conditions.Delete(user, status.DependenciesReadyCondition)
357 return nil
358 }
359
360 unready := make([]string, 0)
361 ready := make([]string, 0)
362
363 resourcesWithNoStatus := map[string]bool{"Secret": true}
364
365 for _, dep := range user.Spec.DependsOn {
366 dependent := unstructured.New(schema.GroupVersion{
367 Group: dep.GVK.Group,
368 Version: dep.GVK.Version,
369 }, dep.GVK.Kind, user.Namespace, dep.ObjectReference.Name)
370 if err := u.Client.Get(ctx, client.ObjectKey{Name: dep.ObjectReference.Name, Namespace: user.Namespace}, dependent); err != nil {
371 return recerr.NewWait(err, status.DependencyNotFoundReason, user.RetryInterval())
372 }
373 _, exists := resourcesWithNoStatus[dep.GVK.Kind]
374 if exists {
375 ready = append(ready, fmt.Sprintf("%s/%s", dep.GVK.Kind, dependent.GetName()))
376 } else if !IsUpToDate(dependent) {
377 unready = append(unready, fmt.Sprintf("%s/%s", dep.GVK.Kind, dependent.GetName()))
378 }
379 }
380
381 u.recorder.RecordDatabaseUserDependencyNotReady(u.cfg.connectionName, user.Name, user.Spec.Type, unready, ready)
382
383 if len(unready) == 0 {
384 conditions.MarkTrue(
385 user,
386 status.DependenciesReadyCondition,
387 status.DependencyReadyReason,
388 "Dependencies up to date",
389 )
390 return nil
391 }
392
393 return recerr.NewWait(
394 fmt.Errorf("%d/%d dependencies ready: waiting for %s",
395 len(user.Spec.DependsOn)-len(unready), len(user.Spec.DependsOn), unready),
396 status.DependencyNotReadyReason,
397 u.depRequeueInterval,
398 )
399 }
400
401 func IsUpToDate(u *unstructured.Unstructured) bool {
402 if u.Object["status"] != nil {
403 status := u.Object["status"].(map[string]interface{})
404 for _, condition := range status["conditions"].([]interface{}) {
405 parsedCondition := condition.(map[string]interface{})
406 if parsedCondition["type"] == "Ready" && parsedCondition["status"] == "True" {
407 return true
408 }
409 }
410 }
411 return false
412 }
413
414 func (u *UserReconciler) apply(ctx context.Context, user *backendv1.DatabaseUser, objs []*unstructured.Unstructured) recerr.Error {
415 mgr := u.ResourceManager
416 mgr.SetOwnerLabels(objs, u.Name, "")
417 log := ctrl.LoggerFrom(ctx)
418
419 changeset, err := mgr.ApplyAll(ctx, objs, sap.ApplyOptions{
420 Force: user.Spec.Force,
421 WaitTimeout: user.Spec.Timeout.Duration,
422 })
423 if err != nil {
424 return recerr.New(err, backendv1.ApplyFailedReason)
425 }
426
427 log.Info("applied objects", "changeset", changeset.ToMap())
428
429 newInv := inventory.New(inventory.FromSapChangeSet(changeset))
430 if err := u.prune(ctx, user, newInv); err != nil {
431 return recerr.New(err, backendv1.PruneFailedReason)
432 }
433
434 user.Status.Inventory = newInv
435
436 if err := mgr.WaitForSet(ctx, changeset.ToObjMetadataSet(), sap.WaitOptions{
437 Timeout: user.Spec.Timeout.Duration,
438 }); err != nil {
439 err := recerr.NewWait(err, backendv1.ReconcileFailedReason, user.RetryInterval())
440 return err
441 }
442
443 return nil
444 }
445
446 func (u *UserReconciler) prune(ctx context.Context, user *backendv1.DatabaseUser, inv *inventory.ResourceInventory) error {
447 log := ctrl.LoggerFrom(ctx).WithName("prune")
448 switch {
449 case !user.Spec.Prune:
450 log.Info("pruning is disabled")
451 return nil
452 case user.Status.Inventory == nil:
453 return nil
454 default:
455 diff, err := user.Status.Inventory.Diff(inv)
456 if err != nil || len(diff) == 0 {
457 return err
458 }
459 deleted, err := u.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
460 if err != nil {
461 return err
462 }
463 conditions.Delete(user, status.DependenciesReadyCondition)
464 log.Info("pruned", "changeset", deleted.ToMap())
465 return nil
466 }
467 }
468
469 func (u *UserReconciler) ping(ctx context.Context) error {
470 log := ctrl.LoggerFrom(ctx)
471 if err := u.cfg.db.PingContext(ctx); err != nil {
472 log.Error(err, "database ping failed, reconnecting to database...")
473 conn, err := u.cfg.connectDatabase()
474 if err != nil {
475 log.Error(err, "failed reconnecting to database")
476 return err
477 }
478 u.cfg.db = dbctl.New(conn)
479 }
480 return nil
481 }
482
View as plain text