package couchctl import ( "context" "fmt" "os" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/edge/constants/api/cluster" "edge-infra.dev/pkg/edge/constants/api/fleet" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2" "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" "edge-infra.dev/pkg/k8s/runtime/events" "edge-infra.dev/pkg/lib/fog" ) // +kubebuilder:rbac:groups="policy.linkerd.io",resources=servers;serverauthorizations,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="",resources=serviceaccounts;secrets;configmaps;services;namespaces,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="",resources=services/status;namespaces/status,verbs=get // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;rolebindings;clusterroles;clusterrolebindings,verbs=create;get;list;update;patch;watch // +kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=roles;`clusterroles`,verbs=bind;escalate // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;patch;update // +kubebuilder:rbac:groups="datasync.edge.ncr.com",resources=*,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups="datasync.edge.ncr.com",resources=couchdbpersistences/status;couchdbservers/status;couchdbusers/status;couchdbdatabases/status;couchdbreplicationsets/status,verbs=get;update;patch;watch // +kubebuilder:rbac:groups=datasync.edge.ncr.com,resources=couchdbusers/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence,verbs=create;get;list;update;patch;watch;delete // +kubebuilder:rbac:groups=edge.ncr.com,resources=persistence/status,verbs=get;watch // +kubebuilder:rbac:groups="networking.gke.io",resources=managedcertificates,verbs=get;create;list;watch;patch;update // +kubebuilder:rbac:groups="networking.gke.io",resources=managedcertificates/status,verbs=get;watch // +kubebuilder:rbac:groups="networking.gke.io",resources=frontendconfigs,verbs=get;create;list;watch;patch;update // +kubebuilder:rbac:groups="cloud.google.com",resources=backendconfigs,verbs=get;create;list;watch;patch;update // +kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses,verbs=get;create;list;watch;patch;update // +kubebuilder:rbac:groups="networking.k8s.io",resources=ingresses/status,verbs=get;watch // +kubebuilder:rbac:groups="external-secrets.io",resources=externalsecrets,verbs=get;watch;create;patch;update // +kubebuilder:rbac:groups="",resources=pods;persistentvolumeclaims;persistentvolumes,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=pods/status;persistentvolumeclaims/status;persistentvolumes/status,verbs=get;watch // +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch // +kubebuilder:rbac:groups="apps",resources=statefulsets/status,verbs=get;watch // +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // +kubebuilder:rbac:groups="coordination.k8s.io",resources=leases,verbs=get;list;watch;create;update;patch;delete const ( ControllerNamespace = "couchctl" ) // Run creates the manager, sets up the controller, and then starts // everything. It returns the created manager for testing purposes func Run(o ...controller.Option) error { ctrl.SetLogger(fog.New().WithValues("HOSTNAME", os.Getenv("HOSTNAME"))) log := ctrl.Log.WithName("couchctl") config, err := NewConfig() if err != nil { log.Error(err, "fail to build configuration") os.Exit(1) } ctxMgr := contextAwareManager{cfg: config} mgr, err := ctxMgr.createManager(o...) if err != nil { log.Error(err, "failed to setup controllers") return err } ple, err := persistenceLeaderElector(mgr, config) if err != nil { return fmt.Errorf("failed to create persistence leader election: %w", err) } nrp := NodeResourcePredicateFunc(func(cfg *Config, o client.Object) bool { return sameNode(cfg, o) }) replicationEvent := NewReplicationEvent(config) err = setupControllers(mgr, config, &gcpSecretManager{}, ple, nrp, replicationEvent) if err != nil { log.Error(err, "failed to setup controllers") return err } log.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { log.Error(err, "problem running manager") return err } return nil } // contextAwareManager facilitates testing between multiple environments type contextAwareManager struct { cfg *Config } func (c *contextAwareManager) Valid() error { if c.cfg.FleetType == "" || c.cfg.ClusterType == "" { return fmt.Errorf("fleet type and cluster type must be provided") } if err := fleet.IsValid(c.cfg.FleetType); err != nil { return err } return cluster.Type(c.cfg.ClusterType).IsValid() } func (c *contextAwareManager) createManager(o ...controller.Option) (ctrl.Manager, error) { if err := c.Valid(); err != nil { return nil, err } cfg, opts := controller.ProcessOptions(o...) opts.LeaderElectionID = "couchctl.edge.ncr.com" opts.Scheme = createScheme() return ctrl.NewManager(cfg, opts) } func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(dsapi.AddToScheme(scheme)) utilruntime.Must(whv1.AddToScheme(scheme)) // add only for testing return scheme } // setupControllers setup controllers with manager func setupControllers(mgr ctrl.Manager, config *Config, sm secretManager, ple PersistenceLeaderElector, nrp NodeResourcePredicate, replEvent *ReplicationEvent) error { eventRecorder := events.NewRecorder(mgr, ctrl.Log, "couchctl") metrics := metrics.New( mgr, "couchctl", metrics.WithSuspend(), metrics.WithCollectors(DatabaseReplicationStatus, DatabaseDocumentCount, DatabaseDiffDocumentCount)) if err := (&CouchServerReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "servercontroller", Config: config, Metrics: metrics, NodeResourcePredicate: nrp, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchServerReconciler and set up with manager: %w", err) } if err := (&CouchDatabaseReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "databasecontroller", Config: config, Metrics: metrics, NodeResourcePredicate: nrp, ReconcileConcurrency: config.ReconcileConcurrency, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchDatabaseReconciler and set up with manager: %w", err) } if err := (&CouchUserReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, SecretManager: sm, Name: "usercontroller", Config: config, Metrics: metrics, NodeResourcePredicate: nrp, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchUserReconciler and set up with manager: %w", err) } if config.IsStore() { //nolint complexity if err := (&CouchDBDesignDocReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "designdoccontroller", Metrics: metrics, Config: config, NodeResourcePredicate: nrp, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchDBDesignDocReconciler and set up with manager: %w", err) } if err := (&CouchIndexReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "indexcontroller", Metrics: metrics, Config: config, NodeResourcePredicate: nrp, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchIndexReconciler and set up with manager: %w", err) } if err := (&CouchReplicationReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "replicationcontroller", Metrics: metrics, Config: config, NodeResourcePredicate: nrp, replicationEvent: replEvent, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchReplicationReconciler and set up with manager: %w", err) } if config.IsGeneric() || (config.IsDSDS() && config.IsCPNode()) { if err := (&CouchDBPersistenceReconciler{ Client: mgr.GetClient(), EventRecorder: eventRecorder, Name: "persistencecontroller", Config: config, Metrics: metrics, LeaderElector: NodeMemoryElector{}, PersistenceLeaderElector: ple, }).SetupWithManager(mgr); err != nil { return fmt.Errorf("failed to create CouchDBPersistenceReconciler and set up with manager: %w", err) } } } return nil } // persistenceLeaderElector is a helper function to create a PersistenceLeaderElector func persistenceLeaderElector(mgr ctrl.Manager, cfg *Config) (PersistenceLeaderElector, error) { needLeaderElection := cfg.IsDSDS() if needLeaderElection && cfg.IsCPNode() { cc, err := coordinationv1client.NewForConfig(mgr.GetConfig()) if err != nil { return nil, err } le := NewLeaderElection(cc, cfg.CouchCTLNamespace, "persistence.couchctl.edge.ncr.com", cfg.NodeUID) go func() { le.OnNewLeader(context.Background(), nil) }() return le, nil } // No leader election needed, if not dsds or not cp node return PersistenceLeaderElectorFunc(func() bool { return !needLeaderElection }), nil }