...

Source file src/edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/inform/inform.go

Documentation: edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/inform

     1  package inform
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	corev1 "k8s.io/api/core/v1"
     8  	ctrl "sigs.k8s.io/controller-runtime"
     9  	"sigs.k8s.io/controller-runtime/pkg/builder"
    10  	"sigs.k8s.io/controller-runtime/pkg/client"
    11  	"sigs.k8s.io/controller-runtime/pkg/controller"
    12  	"sigs.k8s.io/controller-runtime/pkg/event"
    13  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    14  
    15  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    16  	edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    17  	"edge-infra.dev/pkg/k8s/runtime/patch"
    18  	"edge-infra.dev/pkg/lib/fog"
    19  	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
    20  	"edge-infra.dev/pkg/sds/etcd/operator/internal/config"
    21  	"edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
    22  	"edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
    23  )
    24  
    25  var operatorNamespace = "etcd-operator"
    26  
    27  // Conditions defines the relation between conditions and the
    28  // Reconciler
    29  var Conditions = edgereconcile.Conditions{
    30  	Target: v1etcd.Health,
    31  	Owned: []string{
    32  		v1etcd.Health,
    33  	},
    34  	Summarize: []string{
    35  		v1etcd.Health,
    36  	},
    37  	NegativePolarity: []string{},
    38  }
    39  
    40  type Reconciler struct {
    41  	config.Config
    42  	edgereconcile.Conditions
    43  	*metrics.Metrics
    44  }
    45  
    46  type summarizeOptions struct {
    47  	patcher  *patch.SerialPatcher
    48  	handlers *Handlers
    49  	recErr   error
    50  }
    51  
    52  type resultOptions struct {
    53  	handlers *Handlers
    54  }
    55  
    56  // handlers holds the handlers to manage the kubernetes resources
    57  // that the Reconciler depends on
    58  type Handlers struct {
    59  	member *resources.EtcdMemberHandler
    60  }
    61  
    62  // SetupWithManager builds the controller for the manager
    63  func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
    64  	r.Config = cfg
    65  	r.Conditions = Conditions
    66  	r.Metrics = metrics.New(r.Mgr, "inform")
    67  	r.Metrics.Custom.Run(initialMembers)
    68  	opts := controller.Options{
    69  		MaxConcurrentReconciles: 2,
    70  	}
    71  
    72  	return ctrl.NewControllerManagedBy(r.Mgr).
    73  		For(&v1etcd.EtcdMember{}, builder.WithPredicates(predicate.ResourceVersionChangedPredicate{})).
    74  		WithEventFilter(r.createEventFilter()).
    75  		WithOptions(opts).
    76  		Complete(r)
    77  }
    78  
    79  // createEventFilter filters kubernetes events so that we only reconcile on EtcdMember
    80  // create events
    81  func (r *Reconciler) createEventFilter() predicate.Predicate {
    82  	return predicate.Funcs{
    83  		CreateFunc: func(_ event.CreateEvent) bool {
    84  			return true
    85  		},
    86  		UpdateFunc: func(_ event.UpdateEvent) bool {
    87  			return false
    88  		},
    89  		DeleteFunc: func(_ event.DeleteEvent) bool {
    90  			return false
    91  		},
    92  	}
    93  }
    94  
    95  // Reconcile is the top-level reconciliation function for EtcdMember object create events.
    96  // The setup is carried out and then the EtcdMember health state is reconciled with the
    97  // actual health of the etcd member.
    98  //
    99  // The EtcdMemberHealthy condition will be set to false when the EtcdMember is unhealthy.
   100  // If the EtcdMember is still unhealthy after the max unhealthy duration specified in the
   101  // EtcdMember spec has passed, then the ReconfigurationRequired condition will be set to
   102  // true.
   103  //
   104  // The reconciler will requeue every 60s to keep the EtcdMember resource in sync with the
   105  // etcd member. If the EtcdMember no longer exists, then the reconciliation will terminate
   106  func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   107  	log := fog.FromContext(ctx).WithName(r.Name)
   108  	ctx = fog.IntoContext(ctx, log)
   109  	log.V(1).Info("reconciling EtcdMember health status")
   110  
   111  	if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
   112  		return ctrl.Result{}, fmt.Errorf("failed to setup etcd retry client: %w", err)
   113  	}
   114  	defer r.EtcdRetryClient.Close()
   115  
   116  	handlers, err := r.generateHandlers(ctx, req)
   117  	if err != nil {
   118  		return ctrl.Result{}, err
   119  	}
   120  
   121  	if pass := r.checkPreconditions(ctx, handlers); !pass {
   122  		log.V(0).Info("preconditions not met")
   123  		return ctrl.Result{}, nil
   124  	}
   125  
   126  	patcher := patch.NewSerialPatcher(handlers.member.EtcdMember, handlers.member.Client.Client())
   127  	defer func() {
   128  		summarizeOpts := summarizeOptions{
   129  			patcher,
   130  			handlers,
   131  			recErr,
   132  		}
   133  		_, recErr = r.summarize(ctx, summarizeOpts)
   134  
   135  		resultOpts := resultOptions{
   136  			handlers,
   137  		}
   138  		r.recordResults(resultOpts)
   139  	}()
   140  
   141  	if err := r.reconcile(ctx, handlers); err != nil {
   142  		log.Error(err, "failure in EtcdMember informer reconciliation")
   143  	}
   144  
   145  	// requeue often to keep the EtcdMember resource in sync with the etcd member.
   146  	// Will default to 60s if not set in EtcdMember Spec
   147  	return ctrl.Result{RequeueAfter: handlers.member.RequeueAfter()}, nil
   148  }
   149  
   150  // setMemberHandler sets the member handler for the reconciler and retrieves
   151  // the latest EtcdMember object from the kubernetes API server
   152  func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) {
   153  	handlers := &Handlers{}
   154  	handlers.member = resources.NewEtcdMemberHandlerBuilder().
   155  		WithClient(r.KubeRetryClient).
   156  		WithKey(req.NamespacedName).
   157  		HandlesEtcdMember().
   158  		Named(req.Name).
   159  		Build()
   160  	// reconcile the local copy of the EtcdMember with updated data from the remote copy
   161  	err := handlers.member.ReconcileLocal(ctx)
   162  	if client.IgnoreNotFound(err) != nil {
   163  		return nil, fmt.Errorf("failed to retrieve EtcdMember: %w", err)
   164  	}
   165  	// if the EtcdMember exists, set Found to true. This is used as a behavioural flag
   166  	if err == nil {
   167  		handlers.member.Found = true
   168  	}
   169  	return handlers, nil
   170  }
   171  
   172  // checkPreconditions checks the preconditions for the reconciler to ensure the EtcdMember
   173  // still exists and is not suspended
   174  func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool {
   175  	log := fog.FromContext(ctx)
   176  	// If the EtcdMember no longer exists, then we do not need to reconcile
   177  	if !handlers.member.Found {
   178  		return false
   179  	}
   180  
   181  	if handlers.member.IsSuspended() {
   182  		log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true")
   183  		return false
   184  	}
   185  	return true
   186  }
   187  
   188  // reconcile reconciles the EtcdMember health state with the actual health of the etcd member
   189  func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error {
   190  	if !r.isEtcdHealthy(ctx, handlers) && handlers.member.ReconfigurationRequired() {
   191  		return r.KubeRetryClient.SafeDelete(ctx, handlers.member.EtcdMember)
   192  	}
   193  
   194  	secretLost, err := r.isSecretLost(ctx, handlers)
   195  	if err != nil {
   196  		return err
   197  	}
   198  	if secretLost {
   199  		return r.KubeRetryClient.SafeDelete(ctx, handlers.member.EtcdMember)
   200  	}
   201  
   202  	return nil
   203  }
   204  
   205  func (r *Reconciler) isEtcdHealthy(ctx context.Context, handlers *Handlers) bool {
   206  	resp, err := r.EtcdRetryClient.SafeStatus(ctx, handlers.member.EtcdMember.Spec.ClientURL())
   207  	if err != nil {
   208  		// if the etcd member is not reachable, then mark it as unhealthy.
   209  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthFailedReason, "%s", v1etcd.HealthNoResponseMessage)
   210  		return false
   211  	}
   212  	if len(resp.Errors) != 0 {
   213  		// if the etcd member reports back with an error, then mark it as unhealthy
   214  		conditions.MarkFalse(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthFailedReason, "%s", v1etcd.HealthFailedResponseMessage)
   215  		return false
   216  	}
   217  
   218  	conditions.MarkTrue(handlers.member.EtcdMember, v1etcd.Health, v1etcd.HealthSuccessReason, "%s", v1etcd.HealthSuccessMessage)
   219  	return true
   220  }
   221  
   222  func (r *Reconciler) isSecretLost(ctx context.Context, handlers *Handlers) (bool, error) {
   223  	if !conditions.IsTrue(handlers.member, v1etcd.Provisioned) {
   224  		return false, nil
   225  	}
   226  	if conditions.IsTrue(handlers.member, v1etcd.Installed) {
   227  		return false, nil
   228  	}
   229  
   230  	key := handlers.member.Key
   231  	key.Namespace = operatorNamespace
   232  	err := r.KubeRetryClient.SafeGet(ctx, key, &corev1.Secret{})
   233  	if client.IgnoreNotFound(err) != nil {
   234  		return false, err
   235  	}
   236  	if err != nil {
   237  		return true, nil
   238  	}
   239  	return false, nil
   240  }
   241  
   242  // summarize summarizes the result of the reconcile and patches the EtcdMember object
   243  func (r *Reconciler) summarize(ctx context.Context, opts summarizeOptions) (ctrl.Result, error) {
   244  	s := edgereconcile.NewSummarizer(opts.patcher)
   245  	return s.SummarizeAndPatch(ctx, opts.handlers.member.EtcdMember,
   246  		edgereconcile.WithConditions(r.Conditions),
   247  		edgereconcile.WithResult(edgereconcile.ResultRequeue),
   248  		edgereconcile.WithError(opts.recErr),
   249  		edgereconcile.WithIgnoreNotFound(),
   250  		edgereconcile.WithProcessors(
   251  			edgereconcile.RecordReconcileReq,
   252  			edgereconcile.RecordResult,
   253  		),
   254  		edgereconcile.WithFieldOwner(r.Name),
   255  	)
   256  }
   257  
   258  // recordResults takes the results of the reconcile and records the metrics for them
   259  func (r *Reconciler) recordResults(opts resultOptions) {
   260  	conditions := []string{
   261  		v1etcd.Health,
   262  		v1etcd.Provisioned,
   263  		v1etcd.Installed,
   264  		v1etcd.Ready,
   265  	}
   266  	for _, c := range conditions {
   267  		condition, _ := opts.handlers.member.EtcdMember.GetCondition(c)
   268  		r.Metrics.RecordCondition(opts.handlers.member.EtcdMember, c, condition.Status, !opts.handlers.member.EtcdMember.GetDeletionTimestamp().IsZero())
   269  	}
   270  }
   271  

View as plain text