package provision import ( "context" "fmt" "time" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "edge-infra.dev/pkg/k8s/runtime/conditions" edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/lib/fog" v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1" "edge-infra.dev/pkg/sds/etcd/operator/internal/config" "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics" "edge-infra.dev/pkg/sds/etcd/operator/internal/resources" ) var ( caCertPath = "/etc/kubernetes/pki/etcd/ca.crt" caKeyPath = "/etc/kubernetes/pki/etcd/ca.key" operatorNamespace = "etcd-operator" ) // Conditions defines the relation between conditions and the // Reconciler var Conditions = edgereconcile.Conditions{ Target: v1etcd.Ready, Owned: []string{ v1etcd.Provisioned, v1etcd.InProgress, v1etcd.Reconciling, }, Summarize: []string{ v1etcd.Installed, v1etcd.InProgress, v1etcd.Provisioned, }, NegativePolarity: []string{ v1etcd.InProgress, v1etcd.Reconciling, }, } type Reconciler struct { config.Config edgereconcile.Conditions *metrics.Metrics } type summarizeOptions struct { patcher *patch.SerialPatcher handlers *Handlers recErr error } type resultOptions struct { startTime time.Time handlers *Handlers recErr error } // Handlers holds the handlers to manage the kubernetes resources // that the Reconciler depends on type Handlers struct { member *resources.EtcdMemberHandler secret *resources.SecretHandler } // SetupWithManager builds the controller for the manager func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error { r.Config = cfg r.Conditions = Conditions r.Metrics = metrics.New(r.Mgr, "provision") r.Metrics.Custom.Run(initialMembers) return ctrl.NewControllerManagedBy(r.Mgr). For(&v1etcd.EtcdMember{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). WithEventFilter(r.createEventFilter()). Complete(r) } // createEventFilter filters kubernetes events so that we only reconcile on EtcdMember // create events func (r *Reconciler) createEventFilter() predicate.Predicate { return predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { etcdMember := e.Object.(*v1etcd.EtcdMember) handler := resources.NewEtcdMemberHandlerBuilder().Build() handler.DeepCopyFrom(etcdMember) // only reconcile on EtcdMember creations if the Secret has not yet // been successfully created return !handler.IsProvisioned() }, UpdateFunc: func(_ event.UpdateEvent) bool { return false }, DeleteFunc: func(_ event.DeleteEvent) bool { return false }, } } // Reconcile is the top-level reconcilliation function for EtcdMember object create events. // // The required certificates for etcd, alongside the files required for LAN outage mode to // operate are compiled into a secret, ready for the worker to consume. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { log := fog.FromContext(ctx).WithName(r.Name) ctx = fog.IntoContext(ctx, log) startTime := time.Now() log.V(0).Info("started Secret provision reconciliation loop") defer log.V(0).Info("ended Secret provision reconciliation loop") handlers, err := r.generateHandlers(ctx, req) if err != nil { return ctrl.Result{}, err } if pass := r.checkPreconditions(ctx, handlers); !pass { return ctrl.Result{}, nil } if err := r.setConditions(ctx, handlers); err != nil { return ctrl.Result{}, err } patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client()) defer func() { summarizeOpts := summarizeOptions{ patcher, handlers, recErr, } _, recErr = r.summarize(ctx, summarizeOpts) resultOpts := resultOptions{ startTime, handlers, recErr, } r.recordResults(ctx, resultOpts) }() if err := r.reconcile(ctx, handlers); err != nil { return ctrl.Result{}, err } log.V(0).Info("secret provisioned successfully", "eoaudit", "") // set the Provisioned condition to "True" once the Secret is successfully created conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedSuccessReason, "%s", v1etcd.ProvisionedSuccessMessage) return ctrl.Result{}, nil } // setup sets up the Secret and EtcdMember handlers for the reconciler func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) { handlers := &Handlers{} if err := r.setSecretHandler(ctx, req, handlers); err != nil { return nil, err } if err := r.setMemberHandler(ctx, req, handlers); err != nil { return nil, err } return handlers, nil } // setSecretHandler sets the Secret handler for the reconciler and retrieves // the latest Secret object from the kubernetes API server func (r *Reconciler) setSecretHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { key := req.NamespacedName key.Namespace = operatorNamespace handlers.secret = resources.NewSecretHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(key). HandlesSecret(). Named(req.Name). InNamespace(operatorNamespace). Build() // reconcile the local copy of the Secret with updated data from the remote copy err := handlers.secret.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve node: %w", err) } // if the Secret exists, set Found to true. This is used as a behavioural flag if err == nil { handlers.secret.Found = true } return nil } // setMemberHandler sets the member handler for the reconciler and retrieves // the latest EtcdMember object from the kubernetes API server func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { handlers.member = resources.NewEtcdMemberHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(req.NamespacedName). HandlesEtcdMember(). Named(req.Name). Build() // reconcile the local copy of the EtcdMember with updated data from the remote copy err := handlers.member.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve EtcdMember: %w", err) } // if the EtcdMember exists, set Found to true. This is used as a behavioural flag if err == nil { handlers.member.Found = true } return nil } // checkPreconditions checks the preconditions for the reconciler to ensure the EtcdMember // still exists and is not suspended func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool { log := fog.FromContext(ctx) // if the EtcdMember does not exist, we do not need to reconcile if !handlers.member.Found { log.V(0).Info("EtcdMember not found") return false } if handlers.member.IsSuspended() { log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true") return false } if handlers.member.IsProvisioned() { log.V(0).Info("Secret is already prepared") return false } return true } // setConditions sets the 'Reconciling' and 'InProgress' conditions on the EtcdMember func (r *Reconciler) setConditions(ctx context.Context, handlers *Handlers) error { log := fog.FromContext(ctx) // set the Reconciling and InProgress conditions to "True". The Reconciling // condition will be unset once the reconcile is successfully completed. The // InProgress condition will remain until the SecretReconciler removes it. // This is to ensure that the Ready condition is not prematurely set to "True" // by the summarizer return handlers.member.WithReconcileRemote(ctx, func(e *v1etcd.EtcdMember) { if _, ok := e.GetCondition(v1etcd.Reconciling); ok { return } log.V(1).Info("setting 'Reconciling' condition") conditions.MarkTrue(e, v1etcd.Reconciling, v1etcd.ProvisionedReconcilingReason, "%s", v1etcd.ProvisionedReconcilingMessage) log.V(1).Info("setting 'InProgress' condition") conditions.MarkTrue(e, v1etcd.InProgress, v1etcd.InProgressReason, "%s", v1etcd.InProgressMessage) }) } // reconcile creates the secret containing the required certificates for etcd and then patches // the EtcdMember conditions with the result of the reconciliation. If a secret already exists, // it will first be deleted. func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error { content, err := r.secretContent(handlers) if err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedContentFailedMessage) return fmt.Errorf("%s: %w", v1etcd.ProvisionedContentFailedMessage, err) } // delete the old secret if one exists if err := client.IgnoreNotFound(handlers.secret.DeleteRemote(ctx)); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedDeleteFailedMessage) return fmt.Errorf("%s: %w", v1etcd.ProvisionedDeleteFailedMessage, err) } r.generateSecret(content, handlers) if err := handlers.secret.CreateRemote(ctx); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Provisioned, v1etcd.ProvisionedFailedReason, "%s", v1etcd.ProvisionedCreateFailedMessage) return fmt.Errorf("%s: %w", v1etcd.ProvisionedCreateFailedMessage, err) } return nil } // summarize summarizes the result of the reconcile and patches the EtcdMember object func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) { s := edgereconcile.NewSummarizer(opts.patcher) return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember, edgereconcile.WithConditions(r.Conditions), edgereconcile.WithResult(edgereconcile.ResultEmpty), edgereconcile.WithError(opts.recErr), edgereconcile.WithIgnoreNotFound(), edgereconcile.WithProcessors( edgereconcile.RecordReconcileReq, edgereconcile.RecordResult, UnsetReconciling, ), edgereconcile.WithFieldOwner(r.Name), ) } // unsetReconciling is a ResultProcessor that unsets the Reconciling condition // on an EtcdMember func UnsetReconciling(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) { log := fog.FromContext(ctx) member, ok := obj.(*v1etcd.EtcdMember) if !ok { return } if err == nil { log.V(1).Info("removing 'Reconciling' condition") conditions.Delete(member, v1etcd.Reconciling) } } // recordResults takes the results of the reconcile and records the metrics for them func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) { r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime) r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember) r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember) }