package install import ( "context" "fmt" "os" "time" "github.com/spf13/afero" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "edge-infra.dev/pkg/k8s/runtime/conditions" edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/lib/fog" v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1" "edge-infra.dev/pkg/sds/etcd/operator/internal/config" "edge-infra.dev/pkg/sds/etcd/operator/internal/metrics" "edge-infra.dev/pkg/sds/etcd/operator/internal/resources" ) var ( OperatorFilewall = "-p tcp --dport 6443 -j REJECT" operatorNamespace = "etcd-operator" lanOutageFlagFile = "/zynstra/config/.lan_outage_mode" ) // Conditions defines the relation between conditions and the // Reconciler var Conditions = edgereconcile.Conditions{ Target: v1etcd.Ready, Owned: []string{ v1etcd.Installed, v1etcd.InProgress, v1etcd.Reconciling, }, Summarize: []string{ v1etcd.Installed, v1etcd.InProgress, v1etcd.Provisioned, }, NegativePolarity: []string{ v1etcd.InProgress, v1etcd.Reconciling, }, } type containerImageVersions struct { Containers map[string]string `yaml:"containers"` } type Reconciler struct { config.Config edgereconcile.Conditions *metrics.Metrics } type summarizeOptions struct { patcher *patch.SerialPatcher handlers *Handlers recErr error } type resultOptions struct { startTime time.Time handlers *Handlers recErr error } // Handlers holds the handlers to manage the kubernetes resources // that the Reconciler depends on type Handlers struct { member *resources.EtcdMemberHandler secret *resources.SecretHandler } // SetupWithManager builds the controller for the manager func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error { r.Config = cfg r.Conditions = Conditions r.Metrics = metrics.New(r.Mgr, "install") localMember := &v1etcd.EtcdMemberList{} for _, member := range initialMembers.Items { if member.Name == cfg.NodeName { localMember.Items = append(localMember.Items, member) } } r.Metrics.Custom.Run(localMember) return ctrl.NewControllerManagedBy(r.Mgr). For(&corev1.Secret{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})). WithEventFilter(r.createEventFilter()). Complete(r) } // createEventFilter filters kubernetes events so that we only reconcile on Secret // create events func (r *Reconciler) createEventFilter() predicate.Predicate { return predicate.Funcs{ CreateFunc: func(e event.CreateEvent) bool { secret := e.Object.(*corev1.Secret) handler := resources.NewSecretHandlerBuilder().Build() handler.DeepCopyFrom(secret) // only reconcile on create events for secrets that are owned by the EtcdMember // that represents the current node return handler.Name == os.Getenv("NODE_NAME") && handler.OwnedByEtcdMember() }, UpdateFunc: func(_ event.UpdateEvent) bool { return false }, DeleteFunc: func(_ event.DeleteEvent) bool { return false }, } } // Reconcile is the top-level reconcilliation function for create events for secrets that // are owned by the EtcdMember that represents the current node. // // The required certificates for etcd, as well as the files required for LAN outage mode, // are extracted from the secret and distributed onto the filesystem. The node is then // added to the etcd cluster, first as a learner member, before writing the node's etcd // manifest and promoting to to a full member. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { log := fog.FromContext(ctx).WithName(r.Name) ctx = fog.IntoContext(ctx, log) startTime := time.Now() log.V(0).Info("started etcd configuration reconciliation loop") defer log.V(0).Info("ended etcd configuration reconciliation loop") handlers, err := r.generateHandlers(ctx, req) if err != nil { return ctrl.Result{}, err } pass, err := r.checkPreconditions(ctx, handlers) if err != nil { return ctrl.Result{}, err } if !pass { return ctrl.Result{}, nil } // if the Provisioned condition has not been updated yet, requeue. // This is to ensure the ProvisionReconciler has finished updating // the EtcdMember conditions, preventing race conditions if !handlers.member.IsProvisioned() { return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } if err := r.setReconciling(ctx, handlers); err != nil { return ctrl.Result{}, err } patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client()) defer func() { summarizeOpts := summarizeOptions{ patcher, handlers, recErr, } _, recErr = r.summarize(ctx, summarizeOpts) resultOpts := resultOptions{ startTime, handlers, recErr, } r.recordResults(ctx, resultOpts) }() if err := r.reconcile(ctx, handlers); err != nil { return ctrl.Result{}, err } log.V(0).Info("etcd installed successfully", "eoaudit", "") // set the Installed condition to true once the node has been successfully installed // and added to the etcd cluster as a full member conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledSuccessReason, "%s", v1etcd.InstalledSuccessMessage) return ctrl.Result{}, nil } // setup sets up the Secret and EtcdMember handlers for the reconciler func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) { handlers := &Handlers{} if err := r.setSecretHandler(ctx, req, handlers); err != nil { return nil, err } if err := r.setMemberHandler(ctx, req, handlers); err != nil { return nil, err } return handlers, nil } // setSecretHandler sets the Secret handler for the reconciler and retrieves // the latest Secret object from the kubernetes API server func (r *Reconciler) setSecretHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { handlers.secret = resources.NewSecretHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(req.NamespacedName). HandlesSecret(). Named(req.Name). InNamespace(operatorNamespace). Build() // reconcile the local copy of the Secret with updated data from the remote copy err := handlers.secret.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve node: %w", err) } // if the Secret exists, set Found to true. This is used as a behavioural flag if err == nil { handlers.secret.Found = true } return nil } // setMemberHandler sets the member handler for the reconciler and retrieves // the latest EtcdMember object from the kubernetes API server func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { handlers.member = resources.NewEtcdMemberHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(req.NamespacedName). HandlesEtcdMember(). Named(req.Name). Build() // reconcile the local copy of the EtcdMember with updated data from the remote copy err := handlers.member.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve EtcdMember: %w", err) } // if the EtcdMember exists, set Found to true. This is used as a behavioural flag if err == nil { handlers.member.Found = true } return nil } // checkPreconditions checks the preconditions for the reconciler to ensure the EtcdMember // and Secret still exists and that the EtcdMember is not suspended. func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) (bool, error) { log := fog.FromContext(ctx) exists, err := afero.Exists(r.Fs, lanOutageFlagFile) // TODO: use a method from LAN outage package if err != nil { return false, err } if exists { log.V(0).Info("node is in LAN outage mode") return false, nil } // if either the Secret or EtcdMember do not exist, we do not want // to reconcile if !handlers.secret.Found { log.V(0).Info("Secret not found") return false, nil } if !handlers.member.Found { log.V(0).Info("EtcdMember not found") return false, nil } if handlers.member.IsSuspended() { log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true") return false, nil } if handlers.member.IsInstalled() { log.V(0).Info("etcd already installed") return false, nil } return true, nil } // setReconciling sets the 'Reconciling' condition on the EtcdMember func (r *Reconciler) setReconciling(ctx context.Context, handlers *Handlers) error { log := fog.FromContext(ctx) // set the Reconciling condition to "True". This will be unset after // successful reconciliation return handlers.member.WithReconcileRemote(ctx, func(e *v1etcd.EtcdMember) { if _, ok := e.GetCondition(v1etcd.Reconciling); ok { return } log.V(1).Info("setting 'Reconciling' condition") conditions.MarkTrue(e, v1etcd.Reconciling, v1etcd.InstalledReconcilingReason, "%s", v1etcd.InstalledReconcilingMessage) }) } // reconcile creates the secret containing the required certificates for etcd and then patches // the EtcdMember conditions with the result of the reconciliation. If a secret already exists, // it will first be deleted. func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error { if err := r.reconcileFiles(handlers); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledSecretFailedMessage) return fmt.Errorf("%s: %w", v1etcd.InstalledSecretFailedMessage, err) } if err := r.WithDefaultEtcdRetryClient(ctx); err != nil { return fmt.Errorf("failed to setup etcd retry client: %w", err) } defer r.EtcdRetryClient.Close() if err := handlers.member.ReconcileMembershipStatus(ctx, r.EtcdRetryClient); err != nil { return fmt.Errorf("failed to update membership status: %w", err) } memberID, err := r.addMemberAsLearner(ctx, handlers) if err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledLearnerFailedMessage) return fmt.Errorf("%s: %w", v1etcd.InstalledLearnerFailedMessage, err) } if err := r.withFirewall(ctx, handlers, r.configureEtcd); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledManifestFailedMessage) return fmt.Errorf("%s: %w", v1etcd.InstalledManifestFailedMessage, err) } if err := r.promoteLearner(ctx, memberID); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledPromoteFailedMessage) return fmt.Errorf("%s: %w", v1etcd.InstalledPromoteFailedMessage, err) } // delete the secret once it has been used to ensure another reconciliation // does not take place if err := client.IgnoreNotFound(handlers.secret.DeleteRemote(ctx)); err != nil { conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledDeleteFailedMessage) return fmt.Errorf("%s: %w", v1etcd.InstalledDeleteFailedMessage, err) } return nil } // summarize summarizes the result of the reconcile and patches the EtcdMember object func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) { s := edgereconcile.NewSummarizer(opts.patcher) return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember, edgereconcile.WithConditions(r.Conditions), edgereconcile.WithResult(edgereconcile.ResultEmpty), edgereconcile.WithError(opts.recErr), edgereconcile.WithIgnoreNotFound(), edgereconcile.WithProcessors( edgereconcile.RecordReconcileReq, edgereconcile.RecordResult, UnsetReconciling, UnsetInProgress, ), edgereconcile.WithFieldOwner(r.Name), ) } // UnsetInProgress is a ResultProcessor that unsets the InProgress condition // on an EtcdMember func UnsetInProgress(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) { log := fog.FromContext(ctx) etcdMember, ok := obj.(*v1etcd.EtcdMember) if !ok { return } if err == nil { log.V(1).Info("removing 'InProgress' condition") conditions.Delete(etcdMember, v1etcd.InProgress) } } // UnsetReconciling is a ResultProcessor that unsets the Reconciling condition // on an EtcdMember func UnsetReconciling(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) { log := fog.FromContext(ctx) etcdMember, ok := obj.(*v1etcd.EtcdMember) if !ok { return } if err == nil { log.V(1).Info("removing 'Reconciling' condition") conditions.Delete(etcdMember, v1etcd.Reconciling) } } // recordResults takes the results of the reconcile and records the metrics for them func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) { r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime) r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember) r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember) }