...

Source file src/edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install/install.go

Documentation: edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install

     1  package install
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"time"
     8  
     9  	"github.com/spf13/afero"
    10  	corev1 "k8s.io/api/core/v1"
    11  	"k8s.io/client-go/tools/record"
    12  	ctrl "sigs.k8s.io/controller-runtime"
    13  	"sigs.k8s.io/controller-runtime/pkg/builder"
    14  	"sigs.k8s.io/controller-runtime/pkg/client"
    15  	"sigs.k8s.io/controller-runtime/pkg/event"
    16  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    17  
    18  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    19  	edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    20  	"edge-infra.dev/pkg/k8s/runtime/patch"
    21  	"edge-infra.dev/pkg/lib/fog"
    22  	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
    23  	"edge-infra.dev/pkg/sds/etcd/operator/internal/config"
    24  	"edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
    25  	"edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
    26  )
    27  
// Package-level settings used by the install reconciler.
//
//   - OperatorFilewall is a rule fragment that matches TCP traffic destined
//     for port 6443 and rejects it. It is not referenced in this part of the
//     file; presumably it is handed to the firewall helper used while etcd is
//     being configured — TODO confirm.
//     NOTE(review): "Filewall" looks like a typo of "Firewall"; the name is
//     exported, so it is left unchanged here.
//   - operatorNamespace is the namespace the Secret handler is built with.
//   - lanOutageFlagFile is the path whose existence makes checkPreconditions
//     skip reconciliation because the node is in LAN outage mode.
var (
	OperatorFilewall  = "-p tcp --dport 6443 -j REJECT"
	operatorNamespace = "etcd-operator"
	lanOutageFlagFile = "/zynstra/config/.lan_outage_mode"
)
    33  
// Conditions defines the relation between conditions and the
// Reconciler. It is copied onto the Reconciler in SetupWithManager and
// passed to the summarizer through edgereconcile.WithConditions.
//
// NOTE(review): the meaning of each field below is inferred from its name;
// confirm against the edgereconcile.Conditions documentation.
var Conditions = edgereconcile.Conditions{
	// presumably the condition that is derived from the Summarize set
	Target: v1etcd.Ready,
	// presumably the conditions this reconciler is responsible for writing
	Owned: []string{
		v1etcd.Installed,
		v1etcd.InProgress,
		v1etcd.Reconciling,
	},
	// Provisioned is listed here but not in Owned; Reconcile waits for it to
	// be set by another reconciler before proceeding
	Summarize: []string{
		v1etcd.Installed,
		v1etcd.InProgress,
		v1etcd.Provisioned,
	},
	// presumably conditions whose "True" status denotes a non-ready state
	NegativePolarity: []string{
		v1etcd.InProgress,
		v1etcd.Reconciling,
	},
}
    53  
// containerImageVersions is the YAML document shape holding a top-level
// "containers" map of string keys to string values. It is not referenced in
// this part of the file; presumably the map goes from container name to image
// version — TODO confirm against the code that unmarshals it.
type containerImageVersions struct {
	Containers map[string]string `yaml:"containers"`
}
    57  
// Reconciler is the controller that reacts to Secret create events and
// installs etcd on the current node. All three embedded values are populated
// by SetupWithManager: the operator configuration, the condition
// relationships used when summarizing, and the "install" metrics.
type Reconciler struct {
	config.Config
	edgereconcile.Conditions
	*metrics.Metrics
}
    63  
// summarizeOptions bundles the inputs of Reconciler.summarize: the patcher
// used to patch the EtcdMember, the handlers holding the EtcdMember to
// summarize, and the error returned by the reconcile (nil on success).
//
// NOTE: Reconcile builds this struct with an unkeyed literal, so the field
// order is significant.
type summarizeOptions struct {
	patcher  *patch.SerialPatcher
	handlers *Handlers
	recErr   error
}
    69  
// resultOptions bundles the inputs of Reconciler.recordResults: the time the
// reconcile started, the handlers holding the EtcdMember the metrics are
// recorded against, and the final reconcile error (nil on success).
//
// NOTE: Reconcile builds this struct with an unkeyed literal, so the field
// order is significant.
type resultOptions struct {
	startTime time.Time
	handlers  *Handlers
	recErr    error
}
    75  
// Handlers holds the handlers to manage the kubernetes resources
// that the Reconciler depends on: member wraps the EtcdMember and secret
// wraps the Secret named in the reconcile request. Both are built for every
// request by generateHandlers.
type Handlers struct {
	member *resources.EtcdMemberHandler
	secret *resources.SecretHandler
}
    82  
    83  // SetupWithManager builds the controller for the manager
    84  func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
    85  	r.Config = cfg
    86  	r.Conditions = Conditions
    87  	r.Metrics = metrics.New(r.Mgr, "install")
    88  	localMember := &v1etcd.EtcdMemberList{}
    89  	for _, member := range initialMembers.Items {
    90  		if member.Name == cfg.NodeName {
    91  			localMember.Items = append(localMember.Items, member)
    92  		}
    93  	}
    94  	r.Metrics.Custom.Run(localMember)
    95  
    96  	return ctrl.NewControllerManagedBy(r.Mgr).
    97  		For(&corev1.Secret{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
    98  		WithEventFilter(r.createEventFilter()).
    99  		Complete(r)
   100  }
   101  
   102  // createEventFilter filters kubernetes events so that we only reconcile on Secret
   103  // create events
   104  func (r *Reconciler) createEventFilter() predicate.Predicate {
   105  	return predicate.Funcs{
   106  		CreateFunc: func(e event.CreateEvent) bool {
   107  			secret := e.Object.(*corev1.Secret)
   108  			handler := resources.NewSecretHandlerBuilder().Build()
   109  			handler.DeepCopyFrom(secret)
   110  			// only reconcile on create events for secrets that are owned by the EtcdMember
   111  			// that represents the current node
   112  			return handler.Name == os.Getenv("NODE_NAME") && handler.OwnedByEtcdMember()
   113  		},
   114  		UpdateFunc: func(_ event.UpdateEvent) bool {
   115  			return false
   116  		},
   117  		DeleteFunc: func(_ event.DeleteEvent) bool {
   118  			return false
   119  		},
   120  	}
   121  }
   122  
   123  // Reconcile is the top-level reconcilliation function for create events for secrets that
   124  // are owned by the EtcdMember that represents the current node.
   125  //
   126  // The required certificates for etcd, as well as the files required for LAN outage mode,
   127  // are extracted from the secret and distributed onto the filesystem. The node is then
   128  // added to the etcd cluster, first as a learner member, before writing the node's etcd
   129  // manifest and promoting to to a full member.
   130  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   131  	log := fog.FromContext(ctx).WithName(r.Name)
   132  	ctx = fog.IntoContext(ctx, log)
   133  	startTime := time.Now()
   134  	log.V(0).Info("started etcd configuration reconciliation loop")
   135  	defer log.V(0).Info("ended etcd configuration reconciliation loop")
   136  
   137  	handlers, err := r.generateHandlers(ctx, req)
   138  	if err != nil {
   139  		return ctrl.Result{}, err
   140  	}
   141  
   142  	pass, err := r.checkPreconditions(ctx, handlers)
   143  	if err != nil {
   144  		return ctrl.Result{}, err
   145  	}
   146  	if !pass {
   147  		return ctrl.Result{}, nil
   148  	}
   149  
   150  	// if the Provisioned condition has not been updated yet, requeue.
   151  	// This is to ensure the ProvisionReconciler has finished updating
   152  	// the EtcdMember conditions, preventing race conditions
   153  	if !handlers.member.IsProvisioned() {
   154  		return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
   155  	}
   156  
   157  	if err := r.setReconciling(ctx, handlers); err != nil {
   158  		return ctrl.Result{}, err
   159  	}
   160  
   161  	patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client())
   162  	defer func() {
   163  		summarizeOpts := summarizeOptions{
   164  			patcher,
   165  			handlers,
   166  			recErr,
   167  		}
   168  		_, recErr = r.summarize(ctx, summarizeOpts)
   169  
   170  		resultOpts := resultOptions{
   171  			startTime,
   172  			handlers,
   173  			recErr,
   174  		}
   175  		r.recordResults(ctx, resultOpts)
   176  	}()
   177  
   178  	if err := r.reconcile(ctx, handlers); err != nil {
   179  		return ctrl.Result{}, err
   180  	}
   181  	log.V(0).Info("etcd installed successfully", "eoaudit", "")
   182  	// set the Installed condition to true once the node has been successfully installed
   183  	// and added to the etcd cluster as a full member
   184  	conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledSuccessReason, "%s", v1etcd.InstalledSuccessMessage)
   185  	return ctrl.Result{}, nil
   186  }
   187  
   188  // setup sets up the Secret and EtcdMember handlers for the reconciler
   189  func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
   190  	handlers := &Handlers{}
   191  	if err := r.setSecretHandler(ctx, req, handlers); err != nil {
   192  		return nil, err
   193  	}
   194  	if err := r.setMemberHandler(ctx, req, handlers); err != nil {
   195  		return nil, err
   196  	}
   197  	return handlers, nil
   198  }
   199  
   200  // setSecretHandler sets the Secret handler for the reconciler and retrieves
   201  // the latest Secret object from the kubernetes API server
   202  func (r *Reconciler) setSecretHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
   203  	handlers.secret = resources.NewSecretHandlerBuilder().
   204  		WithClient(r.KubeRetryClient).
   205  		WithKey(req.NamespacedName).
   206  		HandlesSecret().
   207  		Named(req.Name).
   208  		InNamespace(operatorNamespace).
   209  		Build()
   210  	// reconcile the local copy of the Secret with updated data from the remote copy
   211  	err := handlers.secret.ReconcileLocal(ctx)
   212  	if client.IgnoreNotFound(err) != nil {
   213  		return fmt.Errorf("failed to retrieve node: %w", err)
   214  	}
   215  	// if the Secret exists, set Found to true. This is used as a behavioural flag
   216  	if err == nil {
   217  		handlers.secret.Found = true
   218  	}
   219  	return nil
   220  }
   221  
   222  // setMemberHandler sets the member handler for the reconciler and retrieves
   223  // the latest EtcdMember object from the kubernetes API server
   224  func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error {
   225  	handlers.member = resources.NewEtcdMemberHandlerBuilder().
   226  		WithClient(r.KubeRetryClient).
   227  		WithKey(req.NamespacedName).
   228  		HandlesEtcdMember().
   229  		Named(req.Name).
   230  		Build()
   231  	// reconcile the local copy of the EtcdMember with updated data from the remote copy
   232  	err := handlers.member.ReconcileLocal(ctx)
   233  	if client.IgnoreNotFound(err) != nil {
   234  		return fmt.Errorf("failed to retrieve EtcdMember: %w", err)
   235  	}
   236  	// if the EtcdMember exists, set Found to true. This is used as a behavioural flag
   237  	if err == nil {
   238  		handlers.member.Found = true
   239  	}
   240  	return nil
   241  }
   242  
   243  // checkPreconditions checks the preconditions for the reconciler to ensure the EtcdMember
   244  // and Secret still exists and that the EtcdMember is not suspended.
   245  func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) (bool, error) {
   246  	log := fog.FromContext(ctx)
   247  	exists, err := afero.Exists(r.Fs, lanOutageFlagFile) // TODO: use a method from LAN outage package
   248  	if err != nil {
   249  		return false, err
   250  	}
   251  	if exists {
   252  		log.V(0).Info("node is in LAN outage mode")
   253  		return false, nil
   254  	}
   255  	// if either the Secret or EtcdMember do not exist, we do not want
   256  	// to reconcile
   257  	if !handlers.secret.Found {
   258  		log.V(0).Info("Secret not found")
   259  		return false, nil
   260  	}
   261  	if !handlers.member.Found {
   262  		log.V(0).Info("EtcdMember not found")
   263  		return false, nil
   264  	}
   265  
   266  	if handlers.member.IsSuspended() {
   267  		log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
   268  		return false, nil
   269  	}
   270  
   271  	if handlers.member.IsInstalled() {
   272  		log.V(0).Info("etcd already installed")
   273  		return false, nil
   274  	}
   275  	return true, nil
   276  }
   277  
   278  // setReconciling sets the 'Reconciling' condition on the EtcdMember
   279  func (r *Reconciler) setReconciling(ctx context.Context, handlers *Handlers) error {
   280  	log := fog.FromContext(ctx)
   281  	// set the Reconciling condition to "True". This will be unset after
   282  	// successful reconciliation
   283  	return handlers.member.WithReconcileRemote(ctx, func(e *v1etcd.EtcdMember) {
   284  		if _, ok := e.GetCondition(v1etcd.Reconciling); ok {
   285  			return
   286  		}
   287  		log.V(1).Info("setting 'Reconciling' condition")
   288  		conditions.MarkTrue(e, v1etcd.Reconciling, v1etcd.InstalledReconcilingReason, "%s", v1etcd.InstalledReconcilingMessage)
   289  	})
   290  }
   291  
   292  // reconcile creates the secret containing the required certificates for etcd and then patches
   293  // the EtcdMember conditions with the result of the reconciliation. If a secret already exists,
   294  // it will first be deleted.
   295  func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
   296  	if err := r.reconcileFiles(handlers); err != nil {
   297  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledSecretFailedMessage)
   298  		return fmt.Errorf("%s: %w", v1etcd.InstalledSecretFailedMessage, err)
   299  	}
   300  
   301  	if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
   302  		return fmt.Errorf("failed to setup etcd retry client: %w", err)
   303  	}
   304  	defer r.EtcdRetryClient.Close()
   305  
   306  	if err := handlers.member.ReconcileMembershipStatus(ctx, r.EtcdRetryClient); err != nil {
   307  		return fmt.Errorf("failed to update membership status: %w", err)
   308  	}
   309  
   310  	memberID, err := r.addMemberAsLearner(ctx, handlers)
   311  	if err != nil {
   312  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledLearnerFailedMessage)
   313  		return fmt.Errorf("%s: %w", v1etcd.InstalledLearnerFailedMessage, err)
   314  	}
   315  
   316  	if err := r.withFirewall(ctx, handlers, r.configureEtcd); err != nil {
   317  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledManifestFailedMessage)
   318  		return fmt.Errorf("%s: %w", v1etcd.InstalledManifestFailedMessage, err)
   319  	}
   320  
   321  	if err := r.promoteLearner(ctx, memberID); err != nil {
   322  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledPromoteFailedMessage)
   323  		return fmt.Errorf("%s: %w", v1etcd.InstalledPromoteFailedMessage, err)
   324  	}
   325  	// delete the secret once it has been used to ensure another reconciliation
   326  	// does not take place
   327  	if err := client.IgnoreNotFound(handlers.secret.DeleteRemote(ctx)); err != nil {
   328  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Installed, v1etcd.InstalledFailedReason, "%s", v1etcd.InstalledDeleteFailedMessage)
   329  		return fmt.Errorf("%s: %w", v1etcd.InstalledDeleteFailedMessage, err)
   330  	}
   331  	return nil
   332  }
   333  
   334  // summarize summarizes the result of the reconcile and patches the EtcdMember object
   335  func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) {
   336  	s := edgereconcile.NewSummarizer(opts.patcher)
   337  	return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember,
   338  		edgereconcile.WithConditions(r.Conditions),
   339  		edgereconcile.WithResult(edgereconcile.ResultEmpty),
   340  		edgereconcile.WithError(opts.recErr),
   341  		edgereconcile.WithIgnoreNotFound(),
   342  		edgereconcile.WithProcessors(
   343  			edgereconcile.RecordReconcileReq,
   344  			edgereconcile.RecordResult,
   345  			UnsetReconciling,
   346  			UnsetInProgress,
   347  		),
   348  		edgereconcile.WithFieldOwner(r.Name),
   349  	)
   350  }
   351  
   352  // UnsetInProgress is a ResultProcessor that unsets the InProgress condition
   353  // on an EtcdMember
   354  func UnsetInProgress(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) {
   355  	log := fog.FromContext(ctx)
   356  	etcdMember, ok := obj.(*v1etcd.EtcdMember)
   357  	if !ok {
   358  		return
   359  	}
   360  	if err == nil {
   361  		log.V(1).Info("removing 'InProgress' condition")
   362  		conditions.Delete(etcdMember, v1etcd.InProgress)
   363  	}
   364  }
   365  
   366  // UnsetReconciling is a ResultProcessor that unsets the Reconciling condition
   367  // on an EtcdMember
   368  func UnsetReconciling(ctx context.Context, _ record.EventRecorder, obj conditions.Setter, _ edgereconcile.Result, err error) {
   369  	log := fog.FromContext(ctx)
   370  	etcdMember, ok := obj.(*v1etcd.EtcdMember)
   371  	if !ok {
   372  		return
   373  	}
   374  	if err == nil {
   375  		log.V(1).Info("removing 'Reconciling' condition")
   376  		conditions.Delete(etcdMember, v1etcd.Reconciling)
   377  	}
   378  }
   379  
   380  // recordResults takes the results of the reconcile and records the metrics for them
   381  func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) {
   382  	r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime)
   383  	r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember)
   384  	r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember)
   385  }
   386  

View as plain text