...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/server_controller.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  
    11  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    12  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    13  	"edge-infra.dev/pkg/k8s/meta/status"
    14  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    15  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    16  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    17  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    18  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    19  	"edge-infra.dev/pkg/k8s/runtime/patch"
    20  	"edge-infra.dev/pkg/k8s/runtime/sap"
    21  	unstructuredutil "edge-infra.dev/pkg/k8s/unstructured"
    22  
    23  	corev1 "k8s.io/api/core/v1"
    24  	netv1 "k8s.io/api/networking/v1"
    25  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	"k8s.io/client-go/dynamic"
    28  	kuberecorder "k8s.io/client-go/tools/record"
    29  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    30  	ctrl "sigs.k8s.io/controller-runtime"
    31  	"sigs.k8s.io/controller-runtime/pkg/builder"
    32  	"sigs.k8s.io/controller-runtime/pkg/client"
    33  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    34  )
    35  
    36  type CouchServerReconciler struct {
    37  	client.Client
    38  	NodeResourcePredicate
    39  	kuberecorder.EventRecorder
    40  	ResourceManager *sap.ResourceManager
    41  	Name            string
    42  	Config          *Config
    43  	Metrics         metrics.Metrics
    44  	patchOptions    []patch.Option
    45  }
    46  
    47  type Payload struct {
    48  	Action string `json:"action"`
    49  }
    50  
    51  type ServerSetupResponse struct {
    52  	State  string `json:"state,omitempty"`
    53  	Error  string `json:"error,omitempty"`
    54  	Reason string `json:"reason,omitempty"`
    55  }
    56  
    57  var (
    58  	ErrPodsNotReady = errors.New("pods arent ready")
    59  
    60  	serverConditions = reconcile.Conditions{
    61  		Target: status.ReadyCondition,
    62  		Owned: []string{
    63  			dsapi.ServerSetupSucceededReason,
    64  			status.ReadyCondition,
    65  			status.ReconcilingCondition,
    66  			status.StalledCondition,
    67  		},
    68  		Summarize: []string{
    69  			dsapi.ServerSetupSucceededReason,
    70  			status.StalledCondition,
    71  		},
    72  		NegativePolarity: []string{
    73  			status.ReconcilingCondition,
    74  			status.StalledCondition,
    75  		},
    76  	}
    77  )
    78  
    79  // SetupWithManager sets up CouchServerReconciler with the manager
    80  func (r *CouchServerReconciler) SetupWithManager(mgr ctrl.Manager) error {
    81  	r.patchOptions = getPatchOptions(serverConditions.Owned, r.Name)
    82  	d, err := dynamic.NewForConfig(mgr.GetConfig())
    83  	if err != nil {
    84  		return fmt.Errorf("fail to create dynamic client: %w", err)
    85  	}
    86  	r.ResourceManager = sap.NewResourceManager(
    87  		r.Client,
    88  		watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
    89  		sap.Owner{Field: r.Name},
    90  	)
    91  
    92  	b := ctrl.NewControllerManagedBy(mgr).
    93  		For(&dsapi.CouchDBServer{}, r.serverPredicates())
    94  	return b.Owns(&corev1.Secret{}).Complete(r)
    95  }
    96  
    97  func (r *CouchServerReconciler) serverPredicates() builder.Predicates {
    98  	return builder.WithPredicates(
    99  		predicate.GenerationChangedPredicate{},
   100  		predicate.NewPredicateFuncs(func(obj client.Object) bool {
   101  			if r.Config.IsDSDS() {
   102  				return r.ShouldReconcile(r.Config, obj)
   103  			}
   104  			return true
   105  		}))
   106  }
   107  
   108  func (r *CouchServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
   109  	reconcileStart := time.Now()
   110  
   111  	log := ctrl.LoggerFrom(ctx)
   112  
   113  	server := &dsapi.CouchDBServer{}
   114  	if err := r.Get(ctx, req.NamespacedName, server); err != nil {
   115  		return ctrl.Result{}, client.IgnoreNotFound(err)
   116  	}
   117  	server.WithRetry(r.Config.RequeueTime)
   118  	server.WithInterval(r.Config.PollingInterval)
   119  
   120  	log = log.WithValues("type", server.Type())
   121  	ctx = logr.NewContext(ctx, log)
   122  
   123  	patcher := patch.NewSerialPatcher(server, r.Client)
   124  
   125  	if err := reconcile.Progressing(ctx, server, patcher, r.patchOptions...); err != nil {
   126  		log.Error(err, "unable to update status")
   127  		return ctrl.Result{}, err
   128  	}
   129  
   130  	recResult := reconcile.ResultEmpty
   131  	var recErr recerr.Error
   132  
   133  	defer func() {
   134  		summarizer := reconcile.NewSummarizer(patcher)
   135  		res, err = summarizer.SummarizeAndPatch(ctx, server, []reconcile.SummarizeOption{
   136  			reconcile.WithConditions(serverConditions),
   137  			reconcile.WithResult(recResult),
   138  			reconcile.WithError(recErr),
   139  			reconcile.WithIgnoreNotFound(),
   140  			reconcile.WithProcessors(
   141  				reconcile.RecordResult,
   142  			),
   143  			reconcile.WithFieldOwner(r.Name),
   144  			reconcile.WithEventRecorder(r.EventRecorder),
   145  		}...)
   146  		r.Metrics.RecordDuration(ctx, server, reconcileStart)
   147  		r.Metrics.RecordReadiness(ctx, server)
   148  	}()
   149  
   150  	if recErr = r.reconcile(ctx, server); recErr != nil {
   151  		recErr.ToCondition(server, dsapi.ServerSetupSucceededReason)
   152  		err = recErr
   153  		return
   154  	}
   155  	recResult = reconcile.ResultSuccess
   156  	conditions.MarkTrue(server, dsapi.ServerSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDB server")
   157  	log.Info("Successfully set up CouchDB Server")
   158  
   159  	return
   160  }
   161  
   162  // reconcile handles couchdb server set up
   163  func (r *CouchServerReconciler) reconcile(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
   164  	log := logr.FromContextOrDiscard(ctx)
   165  
   166  	changeSet := sap.NewChangeSet()
   167  
   168  	if server.IsCloud() { // nolint
   169  		err := r.waitForNetworkingSetup(ctx, server)
   170  		if err != nil {
   171  			log.Error(err, "waiting for network setup")
   172  			return err
   173  		}
   174  
   175  		cs, err := r.applyServerManifests(ctx, server)
   176  		if err != nil {
   177  			log.Error(err, "fail to apply generated manifests from CouchDBServer")
   178  			return err
   179  		}
   180  		changeSet.Add(*cs)
   181  
   182  		// check that the admin secret exists or create a new admin secret
   183  		cs, creds, err := r.reconcileAdminCreds(ctx, server)
   184  		if err != nil {
   185  			log.Error(err, "fail to reconcile admin creds")
   186  			return err
   187  		}
   188  		changeSet.Add(*cs)
   189  
   190  		err = r.couchDBPodsReady(ctx, server)
   191  		if err != nil {
   192  			// the cloud servers should always be up
   193  			log.Error(err, "couchdb cloud server pods fail to be ready")
   194  			return err
   195  		}
   196  
   197  		// send the finish_cluster command to complete the server creation for non-single node clusters
   198  		if err := r.sendFinishClusterCommand(server, creds); err != nil {
   199  			return recerr.New(fmt.Errorf("failed to setup cluster: %w", err), dsapi.FinishClusterFailedReason)
   200  		}
   201  	} else {
   202  		cs, err := r.applyServerManifests(ctx, server)
   203  		if err != nil {
   204  			log.Error(err, "fail to apply generated manifests from CouchDBServer")
   205  			return err
   206  		}
   207  		changeSet.Add(*cs)
   208  
   209  		// TODO create a new secret per server
   210  		cs, _, err = r.reconcileAdminCreds(ctx, server)
   211  		if err != nil {
   212  			log.Error(err, "fail to reconcile store admin creds")
   213  			return err
   214  		}
   215  		changeSet.Add(*cs)
   216  
   217  		err = r.couchDBPodsReady(ctx, server)
   218  		if err != nil {
   219  			if !couchDBNotReadyOrNotFound(err) {
   220  				log.Error(err, "couchdb pods for store server fail to be ready")
   221  			}
   222  			return err
   223  		}
   224  	}
   225  	i := inventory.New(inventory.FromSapChangeSet(changeSet))
   226  	if err := r.prune(ctx, server, i); err != nil {
   227  		return recerr.New(err, dsapi.PruneFailed)
   228  	}
   229  	server.Status.Inventory = i
   230  	return nil
   231  }
   232  
   233  // prune delete resources in inventory that are no longer created
   234  func (r *CouchServerReconciler) prune(ctx context.Context, server *dsapi.CouchDBServer, i *inventory.ResourceInventory) error {
   235  	log := logr.FromContextOrDiscard(ctx)
   236  	if server.Status.Inventory != nil {
   237  		diff, err := inventory.Diff(server.Status.Inventory, i)
   238  		if err != nil {
   239  			return nil
   240  		}
   241  		if len(diff) > 0 {
   242  			opt := sap.DefaultDeleteOptions()
   243  			opt.Exclusions = map[string]string{
   244  				couchdb.IgnoreDeletionLabel: couchdb.LabelValueTrue,
   245  				// Resources with SubstitutionLabel are managed by CouchDBPersistence
   246  				couchdb.SubstitutionLabel: couchdb.LabelValueTrue,
   247  			}
   248  			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, opt)
   249  			if err != nil {
   250  				return err
   251  			}
   252  			log.Info("pruned objects", "changeset", changeSet.ToMap())
   253  		}
   254  	}
   255  	return nil
   256  }
   257  
   258  // applyServerManifests dynamically generate couchdb server ini configuration as configmap
   259  func (r *CouchServerReconciler) applyServerManifests(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, recerr.Error) {
   260  	log := logr.FromContextOrDiscard(ctx)
   261  
   262  	serverCopy := server.DeepCopy()
   263  	// cloud couch server UUIDs are unique per banner
   264  	if !server.IsCloud() { //nolint complexity
   265  		uid, err := server.GetNodeUID()
   266  		if err != nil {
   267  			log.Error(err, "could not retrieve/create couchdb uuid")
   268  			return nil, recerr.New(err, status.DependencyInvalidReason)
   269  		}
   270  		uid = dsapi.CleanUID(uid)
   271  		serverCopy.Spec.Base.UUID = &uid
   272  	}
   273  
   274  	configmap, err := ConfigMap(*serverCopy)
   275  	if err != nil {
   276  		log.Error(err, "unable to build couchdb manifests")
   277  		return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
   278  	}
   279  	couchdbManifests, err := unstructuredutil.ToUnstructured(configmap)
   280  	if err != nil {
   281  		log.Error(err, "unable to build couchdb manifests")
   282  		return nil, recerr.NewStalled(fmt.Errorf("unable to build couchdb manifests"), dsapi.ServerSetupFailedReason)
   283  	}
   284  
   285  	opt := sap.DefaultApplyOptions()
   286  	opt.Force = true
   287  	changeSet, err := r.ResourceManager.Apply(ctx, couchdbManifests, opt)
   288  	if err != nil {
   289  		log.Error(err, "resource manager unable to apply couchdb manifests")
   290  		return nil, recerr.New(err, dsapi.ServerSetupFailedReason)
   291  	}
   292  
   293  	return changeSet, nil
   294  }
   295  
   296  // reconcileAdminCreds creates the sever admin credentials for couchdb servers
   297  func (r *CouchServerReconciler) reconcileAdminCreds(ctx context.Context, server *dsapi.CouchDBServer) (*sap.ChangeSetEntry, *couchdb.AdminCredentials, recerr.Error) {
   298  	// do nothing if creds not given in spec
   299  	if server.Spec.Admin.Credentials == (corev1.SecretReference{}) {
   300  		return nil, nil, recerr.NewStalled(fmt.Errorf("Admin.Credentials missing, cannot start server"), dsapi.CookieCreationFailedReason)
   301  	}
   302  
   303  	// get or generate credentials from/to specified secret if missing
   304  	c := server.Spec.Admin.Credentials
   305  	nsn := types.NamespacedName{
   306  		Name:      c.Name,
   307  		Namespace: server.Namespace,
   308  	}
   309  	var cs *sap.ChangeSetEntry
   310  	creds := &couchdb.AdminCredentials{}
   311  	secret, err := creds.FromSecret(ctx, r.Client, nsn)
   312  	if err != nil { //nolint: nestif
   313  		secret, err := creds.ToSecret(ctx, r.Client, nsn)
   314  		if err != nil {
   315  			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
   316  		}
   317  		un, err := unstructuredutil.ToUnstructured(secret)
   318  		if err != nil {
   319  			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
   320  		}
   321  		cs, err = r.ResourceManager.Apply(ctx, un, sap.ApplyOptions{})
   322  		if err != nil {
   323  			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
   324  		}
   325  	} else {
   326  		// server admin secret can't be deleted, so update it
   327  		if secret.Labels == nil {
   328  			secret.Labels = map[string]string{}
   329  		}
   330  		if secret.Labels[couchdb.IgnoreDeletionLabel] != couchdb.LabelValueTrue {
   331  			p := client.MergeFrom(secret.DeepCopy())
   332  			secret.Labels[couchdb.IgnoreDeletionLabel] = couchdb.LabelValueTrue
   333  			err = r.Client.Patch(ctx, secret, p)
   334  			if err != nil {
   335  				return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
   336  			}
   337  		}
   338  		cs, err = ExistingChangeSetEntry(secret)
   339  		if err != nil {
   340  			return nil, nil, recerr.New(err, dsapi.AdminCreationFailedReason)
   341  		}
   342  	}
   343  	return cs, creds, nil
   344  }
   345  
   346  // couchDBPodsReady checks that couchdb StatefulSet pods are ready
   347  func (r *CouchServerReconciler) couchDBPodsReady(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
   348  	// wait for the pods to come up
   349  	// TODO: poll for these pods via the statefulset
   350  	// https://github.com/kubernetes-sigs/cli-utils/tree/master/pkg/kstatus/polling
   351  	// https://pkg.go.dev/sigs.k8s.io/cli-utils/pkg/kstatus
   352  	// https://pkg.go.dev/sigs.k8s.io/cli-utils@v0.29.2/pkg/kstatus/polling
   353  
   354  	sts, err := server.GetStatefulSetName()
   355  	if err != nil {
   356  		return recerr.New(err, status.DependencyInvalidReason)
   357  	}
   358  
   359  	podNum := server.Spec.Cluster.Nodes
   360  	if podNum == 0 {
   361  		podNum = 1
   362  	}
   363  
   364  	for i := 0; i < podNum; i++ {
   365  		podName := fmt.Sprintf("%s-%d", sts, i)
   366  		ready, err := CheckPod(ctx, r.Client, server.Namespace, podName)
   367  		switch {
   368  		case err != nil && kerrors.IsNotFound(err):
   369  			return recerr.NewWait(ErrPodsNotReady, status.DependencyNotFoundReason, r.Config.ServerNotReady)
   370  		case err != nil:
   371  			return recerr.New(err, status.DependencyInvalidReason)
   372  		}
   373  		if !ready {
   374  			return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   375  		}
   376  	}
   377  	cc, err := couchDBServerClient(ctx, r.Client, r.Config, server)
   378  	if err != nil {
   379  		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   380  	}
   381  	defer cc.Close(ctx)
   382  	pong, err := cc.Client.Ping(ctx)
   383  	if err != nil {
   384  		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   385  	}
   386  	if !pong {
   387  		return recerr.NewWait(ErrPodsNotReady, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   388  	}
   389  	return nil
   390  }
   391  
   392  // sendFinishClusterCommand for cloud couchdb in cluster mode send `finish_cluster` command
   393  func (r *CouchServerReconciler) sendFinishClusterCommand(server *dsapi.CouchDBServer, c *couchdb.AdminCredentials) error {
   394  	if !server.Spec.Cluster.AutoFinish {
   395  		return nil
   396  	}
   397  	// https://docs.couchdb.org/en/3.2.0/api/server/common.html#cluster-setup
   398  
   399  	url := couchdb.FormatFinishClusterURI(string(c.Username), string(c.Password), server.Spec.URI, fmt.Sprint(r.Config.CouchDBPort))
   400  
   401  	// check if the server was already finished
   402  	finished, err := checkFinishStatus(url)
   403  	if err != nil {
   404  		return err
   405  	}
   406  	// if it is already finished return
   407  	if finished {
   408  		return nil
   409  	}
   410  
   411  	// attempt to finish the server
   412  	resp := &ServerSetupResponse{}
   413  	err = httpRequest("POST", url, Payload{Action: "finish_cluster"}, &resp)
   414  	if err != nil {
   415  		return err
   416  	}
   417  
   418  	// check if it was finished
   419  	finished, err = checkFinishStatus(url)
   420  	if err != nil {
   421  		return err
   422  	}
   423  	// if its not finished return a new error so we requeue and try again
   424  	if !finished {
   425  		return fmt.Errorf("error finishing the server")
   426  	}
   427  
   428  	// otherwise its finished so return without error
   429  	return nil
   430  }
   431  
   432  // waitForNetworkingSetup check for ingress in ready state for cloud couchdb
   433  func (r *CouchServerReconciler) waitForNetworkingSetup(ctx context.Context, server *dsapi.CouchDBServer) recerr.Error {
   434  	// skip if empty
   435  	if server.Spec.Ingress == (dsapi.Ingress{}) {
   436  		return nil
   437  	}
   438  
   439  	log := ctrl.LoggerFrom(ctx)
   440  
   441  	ingress := &netv1.Ingress{}
   442  	err := r.Client.Get(ctx, types.NamespacedName{
   443  		Name:      couchdb.CouchIngressName, // TODO add server.Spec.Ingress.Name
   444  		Namespace: server.Namespace,
   445  	}, ingress)
   446  	if err != nil {
   447  		log.Error(err, "failed to get Ingress")
   448  		return recerr.New(fmt.Errorf("failed to get Ingress: %w", err), dsapi.ServerSetupFailedReason)
   449  	}
   450  
   451  	for _, ing := range ingress.Status.LoadBalancer.Ingress {
   452  		if ing.IP != "" {
   453  			return nil
   454  		}
   455  	}
   456  
   457  	return recerr.NewWait(fmt.Errorf("ingress not ready, IP address not found"), status.DependencyNotReadyReason, r.Config.IngressNotReady)
   458  }
   459  

View as plain text