...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/couchctl.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  
     8  	"k8s.io/apimachinery/pkg/runtime"
     9  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    10  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    11  	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
    12  	ctrl "sigs.k8s.io/controller-runtime"
    13  	"sigs.k8s.io/controller-runtime/pkg/client"
    14  
    15  	"edge-infra.dev/pkg/edge/constants/api/cluster"
    16  	"edge-infra.dev/pkg/edge/constants/api/fleet"
    17  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    18  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    19  	"edge-infra.dev/pkg/k8s/runtime/controller"
    20  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    21  	"edge-infra.dev/pkg/k8s/runtime/events"
    22  	"edge-infra.dev/pkg/lib/fog"
    23  )
    24  
    25  // +kubebuilder:rbac:groups="policy.linkerd.io",resources=servers;serverauthorizations,verbs=create;get;list;update;patch;watch;delete
    26  // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=create;get;list;update;patch;watch;delete
    27  // +kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get
    28  // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=create;get;list;update;patch;watch;delete
    29  // +kubebuilder:rbac:groups="",resources=serviceaccounts;secrets;configmaps;services;namespaces,verbs=create;get;list;update;patch;watch;delete
    30  // +kubebuilder:rbac:groups="",resources=services/status;namespaces/status,verbs=get
    31  // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;rolebindings;clusterroles;clusterrolebindings,verbs=create;get;list;update;patch;watch
    32  // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;`clusterroles`,verbs=bind;escalate
    33  // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;patch;update
    34  // +kubebuilder:rbac:groups="datasync.edge.ncr.com",resources=*,verbs=create;get;list;update;patch;watch;delete
    35  // +kubebuilder:rbac:groups="datasync.edge.ncr.com",resources=couchdbpersistences/status;couchdbservers/status;couchdbusers/status;couchdbdatabases/status;couchdbreplicationsets/status,verbs=get;update;patch;watch
    36  // +kubebuilder:rbac:groups=datasync.edge.ncr.com,resources=couchdbusers/finalizers,verbs=get;create;update;patch;delete
    37  // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence,verbs=create;get;list;update;patch;watch;delete
    38  // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence/status,verbs=get;watch
    39  // +kubebuilder:rbac:groups="networking.gke.io",resources=managedcertificates,verbs=get;create;list;watch;patch;update
    40  // +kubebuilder:rbac:groups="networking.gke.io",resources=managedcertificates/status,verbs=get;watch
    41  // +kubebuilder:rbac:groups="networking.gke.io",resources=frontendconfigs,verbs=get;create;list;watch;patch;update
    42  // +kubebuilder:rbac:groups="cloud.google.com",resources=backendconfigs,verbs=get;create;list;watch;patch;update
    43  // +kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses,verbs=get;create;list;watch;patch;update
    44  // +kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses/status,verbs=get;watch
    45  // +kubebuilder:rbac:groups="external-secrets.io",resources=externalsecrets,verbs=get;watch;create;patch;update
    46  // +kubebuilder:rbac:groups="",resources=pods;persistentvolumeclaims;persistentvolumes,verbs=get;list;watch
    47  // +kubebuilder:rbac:groups="",resources=pods/status;persistentvolumeclaims/status;persistentvolumes/status,verbs=get;watch
    48  // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch
    49  // +kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get;watch
    50  // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create
    51  // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
    52  // +kubebuilder:rbac:groups="coordination.k8s.io",resources=leases,verbs=get;list;watch;create;update;patch;delete
    53  
    54  const (
    55  	ControllerNamespace = "couchctl"
    56  )
    57  
    58  // Run creates the manager, sets up the controller, and then starts
    59  // everything. It returns the created manager for testing purposes
    60  func Run(o ...controller.Option) error {
    61  	ctrl.SetLogger(fog.New().WithValues("HOSTNAME", os.Getenv("HOSTNAME")))
    62  	log := ctrl.Log.WithName("couchctl")
    63  
    64  	config, err := NewConfig()
    65  	if err != nil {
    66  		log.Error(err, "fail to build configuration")
    67  		os.Exit(1)
    68  	}
    69  
    70  	ctxMgr := contextAwareManager{cfg: config}
    71  	mgr, err := ctxMgr.createManager(o...)
    72  	if err != nil {
    73  		log.Error(err, "failed to setup controllers")
    74  		return err
    75  	}
    76  
    77  	ple, err := persistenceLeaderElector(mgr, config)
    78  	if err != nil {
    79  		return fmt.Errorf("failed to create persistence leader election: %w", err)
    80  	}
    81  
    82  	nrp := NodeResourcePredicateFunc(func(cfg *Config, o client.Object) bool { return sameNode(cfg, o) })
    83  	replicationEvent := NewReplicationEvent(config)
    84  	err = setupControllers(mgr, config, &gcpSecretManager{}, ple, nrp, replicationEvent)
    85  	if err != nil {
    86  		log.Error(err, "failed to setup controllers")
    87  		return err
    88  	}
    89  
    90  	log.Info("starting manager")
    91  	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    92  		log.Error(err, "problem running manager")
    93  		return err
    94  	}
    95  
    96  	return nil
    97  }
    98  
    99  // contextAwareManager facilitates testing between multiple environments
   100  type contextAwareManager struct {
   101  	cfg *Config
   102  }
   103  
   104  func (c *contextAwareManager) Valid() error {
   105  	if c.cfg.FleetType == "" || c.cfg.ClusterType == "" {
   106  		return fmt.Errorf("fleet type and cluster type must be provided")
   107  	}
   108  	if err := fleet.IsValid(c.cfg.FleetType); err != nil {
   109  		return err
   110  	}
   111  	return cluster.Type(c.cfg.ClusterType).IsValid()
   112  }
   113  
   114  func (c *contextAwareManager) createManager(o ...controller.Option) (ctrl.Manager, error) {
   115  	if err := c.Valid(); err != nil {
   116  		return nil, err
   117  	}
   118  	cfg, opts := controller.ProcessOptions(o...)
   119  	opts.LeaderElectionID = "couchctl.edge.ncr.com"
   120  	opts.Scheme = createScheme()
   121  	return ctrl.NewManager(cfg, opts)
   122  }
   123  
   124  func createScheme() *runtime.Scheme {
   125  	scheme := runtime.NewScheme()
   126  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   127  	utilruntime.Must(dsapi.AddToScheme(scheme))
   128  	utilruntime.Must(whv1.AddToScheme(scheme)) // add only for testing
   129  	return scheme
   130  }
   131  
   132  // setupControllers setup controllers with manager
   133  func setupControllers(mgr ctrl.Manager, config *Config, sm secretManager, ple PersistenceLeaderElector, nrp NodeResourcePredicate, replEvent *ReplicationEvent) error {
   134  	eventRecorder := events.NewRecorder(mgr, ctrl.Log, "couchctl")
   135  	metrics := metrics.New(
   136  		mgr,
   137  		"couchctl",
   138  		metrics.WithSuspend(),
   139  		metrics.WithCollectors(DatabaseReplicationStatus, DatabaseDocumentCount, DatabaseDiffDocumentCount))
   140  
   141  	if err := (&CouchServerReconciler{
   142  		Client:                mgr.GetClient(),
   143  		EventRecorder:         eventRecorder,
   144  		Name:                  "servercontroller",
   145  		Config:                config,
   146  		Metrics:               metrics,
   147  		NodeResourcePredicate: nrp,
   148  	}).SetupWithManager(mgr); err != nil {
   149  		return fmt.Errorf("failed to create CouchServerReconciler and set up with manager: %w", err)
   150  	}
   151  
   152  	if err := (&CouchDatabaseReconciler{
   153  		Client:                mgr.GetClient(),
   154  		EventRecorder:         eventRecorder,
   155  		Name:                  "databasecontroller",
   156  		Config:                config,
   157  		Metrics:               metrics,
   158  		NodeResourcePredicate: nrp,
   159  		ReconcileConcurrency:  config.ReconcileConcurrency,
   160  	}).SetupWithManager(mgr); err != nil {
   161  		return fmt.Errorf("failed to create CouchDatabaseReconciler and set up with manager: %w", err)
   162  	}
   163  
   164  	if err := (&CouchUserReconciler{
   165  		Client:                mgr.GetClient(),
   166  		EventRecorder:         eventRecorder,
   167  		SecretManager:         sm,
   168  		Name:                  "usercontroller",
   169  		Config:                config,
   170  		Metrics:               metrics,
   171  		NodeResourcePredicate: nrp,
   172  	}).SetupWithManager(mgr); err != nil {
   173  		return fmt.Errorf("failed to create CouchUserReconciler and set up with manager: %w", err)
   174  	}
   175  
   176  	if config.IsStore() { //nolint complexity
   177  		if err := (&CouchDBDesignDocReconciler{
   178  			Client:                mgr.GetClient(),
   179  			EventRecorder:         eventRecorder,
   180  			Name:                  "designdoccontroller",
   181  			Metrics:               metrics,
   182  			Config:                config,
   183  			NodeResourcePredicate: nrp,
   184  		}).SetupWithManager(mgr); err != nil {
   185  			return fmt.Errorf("failed to create CouchDBDesignDocReconciler and set up with manager: %w", err)
   186  		}
   187  
   188  		if err := (&CouchIndexReconciler{
   189  			Client:                mgr.GetClient(),
   190  			EventRecorder:         eventRecorder,
   191  			Name:                  "indexcontroller",
   192  			Metrics:               metrics,
   193  			Config:                config,
   194  			NodeResourcePredicate: nrp,
   195  		}).SetupWithManager(mgr); err != nil {
   196  			return fmt.Errorf("failed to create CouchIndexReconciler and set up with manager: %w", err)
   197  		}
   198  
   199  		if err := (&CouchReplicationReconciler{
   200  			Client:                mgr.GetClient(),
   201  			EventRecorder:         eventRecorder,
   202  			Name:                  "replicationcontroller",
   203  			Metrics:               metrics,
   204  			Config:                config,
   205  			NodeResourcePredicate: nrp,
   206  			replicationEvent:      replEvent,
   207  		}).SetupWithManager(mgr); err != nil {
   208  			return fmt.Errorf("failed to create CouchReplicationReconciler and set up with manager: %w", err)
   209  		}
   210  
   211  		if config.IsGeneric() || (config.IsDSDS() && config.IsCPNode()) {
   212  			if err := (&CouchDBPersistenceReconciler{
   213  				Client:                   mgr.GetClient(),
   214  				EventRecorder:            eventRecorder,
   215  				Name:                     "persistencecontroller",
   216  				Config:                   config,
   217  				Metrics:                  metrics,
   218  				LeaderElector:            NodeMemoryElector{},
   219  				PersistenceLeaderElector: ple,
   220  			}).SetupWithManager(mgr); err != nil {
   221  				return fmt.Errorf("failed to create CouchDBPersistenceReconciler and set up with manager: %w", err)
   222  			}
   223  		}
   224  	}
   225  	return nil
   226  }
   227  
   228  // persistenceLeaderElector is a helper function to create a PersistenceLeaderElector
   229  func persistenceLeaderElector(mgr ctrl.Manager, cfg *Config) (PersistenceLeaderElector, error) {
   230  	needLeaderElection := cfg.IsDSDS()
   231  	if needLeaderElection && cfg.IsCPNode() {
   232  		cc, err := coordinationv1client.NewForConfig(mgr.GetConfig())
   233  		if err != nil {
   234  			return nil, err
   235  		}
   236  		le := NewLeaderElection(cc,
   237  			cfg.CouchCTLNamespace,
   238  			"persistence.couchctl.edge.ncr.com",
   239  			cfg.NodeUID)
   240  		go func() {
   241  			le.OnNewLeader(context.Background(), nil)
   242  		}()
   243  		return le, nil
   244  	}
   245  	// No leader election needed, if not dsds or not cp node
   246  	return PersistenceLeaderElectorFunc(func() bool { return !needLeaderElection }), nil
   247  }
   248  

View as plain text