...

Source file src/edge-infra.dev/pkg/sds/etcd/operator/controller.go

Documentation: edge-infra.dev/pkg/sds/etcd/operator

     1  // Package operator configures and manages etcd members
     2  package operator
     3  
     4  import (
     5  	"context"
     6  	"fmt"
     7  	"os"
     8  	"os/signal"
     9  
    10  	"github.com/spf13/afero"
    11  	"golang.org/x/sys/unix"
    12  	appsv1 "k8s.io/api/apps/v1"
    13  	corev1 "k8s.io/api/core/v1"
    14  	kruntime "k8s.io/apimachinery/pkg/runtime"
    15  	k8stypes "k8s.io/apimachinery/pkg/types"
    16  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    17  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    18  	ctrl "sigs.k8s.io/controller-runtime"
    19  	"sigs.k8s.io/controller-runtime/pkg/client"
    20  	k8smanager "sigs.k8s.io/controller-runtime/pkg/manager"
    21  
    22  	edgecontroller "edge-infra.dev/pkg/k8s/runtime/controller"
    23  	"edge-infra.dev/pkg/lib/fog"
    24  	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
    25  	"edge-infra.dev/pkg/sds/etcd/operator/constants"
    26  	"edge-infra.dev/pkg/sds/etcd/operator/internal/config"
    27  	"edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/inform"
    28  	"edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install"
    29  	"edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/lifecycle"
    30  	"edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/provision"
    31  	v1ien "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    32  	nodemeta "edge-infra.dev/pkg/sds/ien/node"
    33  )
    34  
    35  // +kubebuilder:rbac:groups="",namespace=etcd-operator,resources=secrets,verbs=get;create;delete
    36  // +kubebuilder:rbac:groups="",namespace=etcd-operator,resources=configmaps,verbs=get;create
    37  
    38  // +kubebuilder:rbac:groups=resilience.edge.ncr.com,resources=etcdmembers,verbs=get;list;watch;create;patch;delete
    39  // +kubebuilder:rbac:groups=resilience.edge.ncr.com,resources=etcdmembers/status,verbs=get;create;patch;delete
    40  // +kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
    41  // +kubebuilder:rbac:groups="",resources=secrets;configmaps,verbs=list;watch
    42  // +kubebuilder:rbac:groups="",resourceNames=kubeadm-config,resources=configmaps,verbs=get
    43  
    44  // +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch
    45  
    46  // TODO: Remove this permission when removing tigera-operator node affinity change
    47  // +kubebuilder:rbac:groups="apps",resourceNames=tigera-operator,namespace=tigera-operator,resources=deployments,verbs=get;update
    48  
// Node role label values. These must match the values carried by the
// nodemeta.RoleLabel node label (see getNodeRole) and key reconcilerSet.
const (
	worker       = "worker"
	controlPlane = "controlplane"
)
    53  
// Names of the reconcilers registered with the controller manager. They
// key the NodeReconcilerSet maps and are passed as the reconciler config
// name in createDefaultCfg.
const (
	MemberLifecycle = "MemberLifecycleReconciler"
	MemberInformer  = "MemberInformerReconciler"
	MemberProvision = "MemberProvisionReconciler"
	Secret          = "SecretReconciler"
)
    60  
// ReconcilerSet maps a node role ("controlplane" or "worker") to the set
// of reconcilers that should run on nodes with that role.
type ReconcilerSet map[string]NodeReconcilerSet

// NodeReconcilerSet maps a reconciler name to its implementation.
type NodeReconcilerSet map[string]Reconciler

// Reconciler is the minimum implementation to be a reconciler.
type Reconciler interface {
	// SetupWithManager registers the reconciler using the given config,
	// seeded with the EtcdMembers present at startup.
	SetupWithManager(config.Config, *v1etcd.EtcdMemberList) error
	// Reconcile performs a single reconciliation for the given request.
	Reconcile(context.Context, ctrl.Request) (ctrl.Result, error)
}
    72  
// reconcilerSet is the set of reconcilers for each node type.
// Control-plane nodes run the full member lifecycle/inform/provision
// reconcilers; worker nodes only run the secret installer.
var reconcilerSet = ReconcilerSet{
	controlPlane: NodeReconcilerSet{
		MemberLifecycle: &lifecycle.Reconciler{},
		MemberInformer:  &inform.Reconciler{},
		MemberProvision: &provision.Reconciler{},
	},
	worker: NodeReconcilerSet{
		Secret: &install.Reconciler{},
	},
}
    84  
    85  // Run is the main entrypoint to the etcd operator
    86  func Run(opts ...edgecontroller.Option) error {
    87  	log := fog.New().WithName("etcdoperator")
    88  	ctx := context.Background()
    89  	ctx = fog.IntoContext(ctx, log)
    90  
    91  	if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil {
    92  		fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown")
    93  	}
    94  
    95  	mgr, err := createControllerManager(ctx, reconcilerSet, opts...)
    96  	if err != nil {
    97  		return fmt.Errorf("error creating controller manager: %v", err)
    98  	}
    99  
   100  	if err := runTigeraOperatorOnControlplane(ctx, mgr); err != nil {
   101  		return err
   102  	}
   103  	return mgr.Start(SetupSignalHandler(ctx))
   104  }
   105  
   106  // createControllerManager creates a new controller manager with the
   107  // given reconcilers
   108  func createControllerManager(ctx context.Context, reconcilerSet ReconcilerSet, o ...edgecontroller.Option) (ctrl.Manager, error) {
   109  	log := fog.FromContext(ctx)
   110  	ctlCfg, opts := edgecontroller.ProcessOptions(o...)
   111  	opts.Scheme = createScheme()
   112  
   113  	mgr, err := ctrl.NewManager(ctlCfg, opts)
   114  	if err != nil {
   115  		return nil, fmt.Errorf("failed to create controller manager: %v", err)
   116  	}
   117  
   118  	roleLabel, err := getNodeRole(mgr)
   119  	if err != nil {
   120  		return nil, fmt.Errorf("failed to get node role: %v", err)
   121  	}
   122  	// retrieve only the reconcilers for this node role
   123  	reconcilers, ok := reconcilerSet[roleLabel]
   124  	if !ok {
   125  		return nil, fmt.Errorf("unsupported node role: %v", roleLabel)
   126  	}
   127  
   128  	log.V(0).Info(fmt.Sprintf("registering %s controllers", roleLabel))
   129  	if err := registerReconcilers(ctx, reconcilers, mgr); err != nil {
   130  		return nil, fmt.Errorf("failed to register controllers: %v", err)
   131  	}
   132  
   133  	return mgr, nil
   134  }
   135  
   136  // getNodeRole gets the node role from the node label
   137  func getNodeRole(mgr k8smanager.Manager) (string, error) {
   138  	node := &corev1.Node{}
   139  	// get the current node name from the environment
   140  	key := k8stypes.NamespacedName{
   141  		Name: os.Getenv("NODE_NAME"),
   142  	}
   143  	if err := mgr.GetAPIReader().Get(context.Background(), key, node); err != nil {
   144  		return "", err
   145  	}
   146  	// get the node role label. Should be one of 'controlplane' or 'worker'
   147  	label, ok := node.GetLabels()[nodemeta.RoleLabel]
   148  	if !ok {
   149  		return "", fmt.Errorf("node %v does not have label %v", node.GetName(), nodemeta.RoleLabel)
   150  	}
   151  
   152  	return label, nil
   153  }
   154  
   155  // registerControllers registers the controllers with the manager
   156  func registerReconcilers(ctx context.Context, reconcilers NodeReconcilerSet, mgr ctrl.Manager) error {
   157  	log := fog.FromContext(ctx)
   158  	initialMembers := &v1etcd.EtcdMemberList{}
   159  	if err := mgr.GetAPIReader().List(ctx, initialMembers); err != nil {
   160  		return err
   161  	}
   162  
   163  	for name, reconciler := range reconcilers {
   164  		cfg := createDefaultCfg(name, mgr)
   165  
   166  		log.V(0).Info("registering controller", "name", name)
   167  		if err := reconciler.SetupWithManager(cfg, initialMembers); err != nil {
   168  			return fmt.Errorf("failed to register %s: %v", name, err)
   169  		}
   170  	}
   171  	return nil
   172  }
   173  
   174  // createDefaultCfg creates a default reconciler config
   175  func createDefaultCfg(name string, mgr ctrl.Manager) config.Config {
   176  	cfg := config.Config{
   177  		Name:     name,
   178  		NodeName: os.Getenv("NODE_NAME"),
   179  		Mgr:      mgr,
   180  		Fs:       afero.NewOsFs(),
   181  	}
   182  	cfg.WithDefaultKubeRetryClient()
   183  
   184  	return cfg
   185  }
   186  
   187  // createScheme creates a new scheme with the EtcdMember type registered
   188  func createScheme() *kruntime.Scheme {
   189  	scheme := kruntime.NewScheme()
   190  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   191  	// register the EtcdMember type
   192  	utilruntime.Must(v1etcd.AddToScheme(scheme))
   193  	return scheme
   194  }
   195  
   196  // TODO: Monitor requirement.
   197  // runTigeraOperatorOnControlplane updates the tigera-operator Deployment so
   198  // that it runs on the controlplane only.
   199  func runTigeraOperatorOnControlplane(ctx context.Context, mgr ctrl.Manager) error {
   200  	cfg := createDefaultCfg("tigeraOperatorNodeAffinity", mgr)
   201  	key := k8stypes.NamespacedName{
   202  		Name:      "tigera-operator",
   203  		Namespace: "tigera-operator",
   204  	}
   205  	deployment := &appsv1.Deployment{}
   206  	return cfg.KubeRetryClient.IgnoreCache().SafeUpdate(ctx, key, deployment, addControlplaneNodeAffinity)
   207  }
   208  
   209  // TODO: Monitor requirement.
   210  // addControlplaneNodeAffinity adds a node affinity to the Deployment so that it
   211  // only schedules pods to the controlplane.
   212  func addControlplaneNodeAffinity(_ context.Context, obj client.Object) error {
   213  	deployment := obj.(*appsv1.Deployment)
   214  	deployment.Spec.Template.Spec.Affinity = &corev1.Affinity{
   215  		NodeAffinity: &corev1.NodeAffinity{
   216  			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
   217  				NodeSelectorTerms: []corev1.NodeSelectorTerm{
   218  					{
   219  						MatchExpressions: []corev1.NodeSelectorRequirement{
   220  							{
   221  								Key:      nodemeta.RoleLabel,
   222  								Operator: corev1.NodeSelectorOpIn,
   223  								Values:   []string{string(v1ien.ControlPlane)},
   224  							},
   225  						},
   226  					},
   227  				},
   228  			},
   229  		},
   230  	}
   231  	return nil
   232  }
   233  
// onlyOneSignalHandler guards against SetupSignalHandler being called more
// than once per process (closing an already-closed channel panics).
var onlyOneSignalHandler = make(chan struct{})

// shutdownSignals are the signals that trigger a graceful shutdown.
var shutdownSignals = []os.Signal{os.Interrupt, unix.SIGTERM}
   236  
// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned
// which is canceled on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
//
// Must be called at most once per process; a second call panics via
// onlyOneSignalHandler.
func SetupSignalHandler(ctx context.Context) context.Context {
	close(onlyOneSignalHandler) // panics when called twice

	ctx, cancel := context.WithCancel(ctx)

	// buffer of 2 so neither signal delivery is dropped while we handle
	// the first one
	c := make(chan os.Signal, 2)
	signal.Notify(c, shutdownSignals...)
	go func() {
		// first signal: cancel the context to begin graceful shutdown
		<-c
		cancel()
		// second signal: force exit, removing the API server firewall
		// state first since no other code will run after os.Exit.
		// NOTE(review): this cleanup only runs on the forced-exit path;
		// confirm the graceful path (manager stopping) cleans up elsewhere.
		<-c
		if err := afero.NewOsFs().RemoveAll(constants.OperatorFilewallFilepath); err != nil {
			fog.FromContext(ctx).Error(err, "failed to cleanup Kubernetes API Server firewall on shutdown")
		}
		os.Exit(1) // second signal. Exit directly.
	}()

	return ctx
}
   259  

View as plain text