// Package operator configures and manages etcd members package operator import ( "context" "fmt" "os" "os/signal" "github.com/spf13/afero" "golang.org/x/sys/unix" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" kruntime "k8s.io/apimachinery/pkg/runtime" k8stypes "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" k8smanager "sigs.k8s.io/controller-runtime/pkg/manager" edgecontroller "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/lib/fog" v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1" "edge-infra.dev/pkg/sds/etcd/operator/constants" "edge-infra.dev/pkg/sds/etcd/operator/internal/config" "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/inform" "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install" "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/lifecycle" "edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/provision" v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" nodemeta "edge-infra.dev/pkg/sds/ien/node" ) // +kubebuilder:rbac:groups="",namespace=etcd-operator,resources=secrets,verbs=get;create;delete // +kubebuilder:rbac:groups="",namespace=etcd-operator,resources=configmaps,verbs=get;create // +kubebuilder:rbac:groups=resilience.edge.ncr.com,resources=etcdmembers,verbs=get;list;watch;create;patch;delete // +kubebuilder:rbac:groups=resilience.edge.ncr.com,resources=etcdmembers/status,verbs=get;create;patch;delete // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=secrets;configmaps,verbs=list;watch // +kubebuilder:rbac:groups="",resourceNames=kubeadm-config,resources=configmaps,verbs=get // +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch // TODO: Remove this permission when removing tigera-operator node affinity change // +kubebuilder:rbac:groups="apps",resourceNames=tigera-operator,namespace=tigera-operator,resources=deployments,verbs=get;update const ( worker = "worker" controlPlane = "controlplane" ) const ( MemberLifecycle = "MemberLifecycleReconciler" MemberInformer = "MemberInformerReconciler" MemberProvision = "MemberProvisionReconciler" Secret = "SecretReconciler" ) // ReconcilerSet is a map of reconcilers sets for different node types type ReconcilerSet map[string]NodeReconcilerSet // NodeReconcilerSet is a map of reconcilers type NodeReconcilerSet map[string]Reconciler // Reconciler is the minimum implementation to be a reconciler type Reconciler interface { SetupWithManager(config.Config, *v1etcd.EtcdMemberList) error Reconcile(context.Context, ctrl.Request) (ctrl.Result, error) } // reconcilerSet is the set of reconcilers for each node type var reconcilerSet = ReconcilerSet{ controlPlane: NodeReconcilerSet{ MemberLifecycle: &lifecycle.Reconciler{}, MemberInformer: &inform.Reconciler{}, MemberProvision: &provision.Reconciler{}, }, worker: NodeReconcilerSet{ Secret: &install.Reconciler{}, }, } // Run is the main entrypoint to the etcd operator func Run(opts ...edgecontroller.Option) error { log := fog.New().WithName("etcdoperator") ctx := context.Background() ctx = fog.IntoContext(ctx, log) if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil { fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown") } mgr, err := createControllerManager(ctx, reconcilerSet, opts...) if err != nil { return fmt.Errorf("error creating controller manager: %v", err) } if err := runTigeraOperatorOnControlplane(ctx, mgr); err != nil { return err } return mgr.Start(SetupSignalHandler(ctx)) } // createControllerManager creates a new controller manager with the // given reconcilers func createControllerManager(ctx context.Context, reconcilerSet ReconcilerSet, o ...edgecontroller.Option) (ctrl.Manager, error) { log := fog.FromContext(ctx) ctlCfg, opts := edgecontroller.ProcessOptions(o...) opts.Scheme = createScheme() mgr, err := ctrl.NewManager(ctlCfg, opts) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } roleLabel, err := getNodeRole(mgr) if err != nil { return nil, fmt.Errorf("failed to get node role: %v", err) } // retrieve only the reconcilers for this node role reconcilers, ok := reconcilerSet[roleLabel] if !ok { return nil, fmt.Errorf("unsupported node role: %v", roleLabel) } log.V(0).Info(fmt.Sprintf("registering %s controllers", roleLabel)) if err := registerReconcilers(ctx, reconcilers, mgr); err != nil { return nil, fmt.Errorf("failed to register controllers: %v", err) } return mgr, nil } // getNodeRole gets the node role from the node label func getNodeRole(mgr k8smanager.Manager) (string, error) { node := &corev1.Node{} // get the current node name from the environment key := k8stypes.NamespacedName{ Name: os.Getenv("NODE_NAME"), } if err := mgr.GetAPIReader().Get(context.Background(), key, node); err != nil { return "", err } // get the node role label. Should be one of 'controlplane' or 'worker' label, ok := node.GetLabels()[nodemeta.RoleLabel] if !ok { return "", fmt.Errorf("node %v does not have label %v", node.GetName(), nodemeta.RoleLabel) } return label, nil } // registerControllers registers the controllers with the manager func registerReconcilers(ctx context.Context, reconcilers NodeReconcilerSet, mgr ctrl.Manager) error { log := fog.FromContext(ctx) initialMembers := &v1etcd.EtcdMemberList{} if err := mgr.GetAPIReader().List(ctx, initialMembers); err != nil { return err } for name, reconciler := range reconcilers { cfg := createDefaultCfg(name, mgr) log.V(0).Info("registering controller", "name", name) if err := reconciler.SetupWithManager(cfg, initialMembers); err != nil { return fmt.Errorf("failed to register %s: %v", name, err) } } return nil } // createDefaultCfg creates a default reconciler config func createDefaultCfg(name string, mgr ctrl.Manager) config.Config { cfg := config.Config{ Name: name, NodeName: os.Getenv("NODE_NAME"), Mgr: mgr, Fs: afero.NewOsFs(), } cfg.WithDefaultKubeRetryClient() return cfg } // createScheme creates a new scheme with the EtcdMember type registered func createScheme() *kruntime.Scheme { scheme := kruntime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) // register the EtcdMember type utilruntime.Must(v1etcd.AddToScheme(scheme)) return scheme } // TODO: Monitor requirement. // runTigeraOperatorOnControlplane updates the tigera-operator Deployment so // that it runs on the controlplane only. func runTigeraOperatorOnControlplane(ctx context.Context, mgr ctrl.Manager) error { cfg := createDefaultCfg("tigeraOperatorNodeAffinity", mgr) key := k8stypes.NamespacedName{ Name: "tigera-operator", Namespace: "tigera-operator", } deployment := &appsv1.Deployment{} return cfg.KubeRetryClient.IgnoreCache().SafeUpdate(ctx, key, deployment, addControlplaneNodeAffinity) } // TODO: Monitor requirement. // addControlplaneNodeAffinity adds a node affinity to the Deployment so that it // only schedules pods to the controlplane. func addControlplaneNodeAffinity(_ context.Context, obj client.Object) error { deployment := obj.(*appsv1.Deployment) deployment.Spec.Template.Spec.Affinity = &corev1.Affinity{ NodeAffinity: &corev1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ NodeSelectorTerms: []corev1.NodeSelectorTerm{ { MatchExpressions: []corev1.NodeSelectorRequirement{ { Key: nodemeta.RoleLabel, Operator: corev1.NodeSelectorOpIn, Values: []string{string(v1ien.ControlPlane)}, }, }, }, }, }, }, } return nil } var onlyOneSignalHandler = make(chan struct{}) var shutdownSignals = []os.Signal{os.Interrupt, unix.SIGTERM} // SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned // which is canceled on one of these signals. If a second signal is caught, the program // is terminated with exit code 1. func SetupSignalHandler(ctx context.Context) context.Context { close(onlyOneSignalHandler) // panics when called twice ctx, cancel := context.WithCancel(ctx) c := make(chan os.Signal, 2) signal.Notify(c, shutdownSignals...) go func() { <-c cancel() <-c if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil { fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown") } os.Exit(1) // second signal. Exit directly. }() return ctx }