// Package config provides functionality for parsing the command line arguments // and environment variables for the etcdmanager package config import ( "context" "errors" "flag" "fmt" "net" "os" "path/filepath" "strconv" "time" "github.com/peterbourgon/ff/v3" "github.com/spf13/afero" clientv3 "go.etcd.io/etcd/client/v3" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/lib/fog" v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1" "edge-infra.dev/pkg/sds/lib/containerd" "edge-infra.dev/pkg/sds/lib/dbus/systemd" "edge-infra.dev/pkg/sds/lib/etcd/client" etcdretryclient "edge-infra.dev/pkg/sds/lib/etcd/client/retry" kuberetryclient "edge-infra.dev/pkg/sds/lib/k8s/retryclient" kubeclienttypes "edge-infra.dev/pkg/sds/lib/k8s/retryclient/types" "edge-infra.dev/pkg/sds/lib/os/file" ) const ( DefaultContainerdSocket = "/run/containerd/containerd.sock" DefaultKubeconfig = "/etc/kubernetes/admin.conf" DefaultMaxUnhealthyDuration = 10 * time.Minute DefaultLogLevel = "INFO" DefaultControlplaneEtcdHost = "127.0.0.1" DefaultControlplaneEtcdPort = "2379" DefaultKubeClientRequestTimeout = 3 * time.Second DefaultKubeClientInitialBackoff = 2 * time.Second DefaultKubeClientBackoffFactor = 1 DefaultKubeClientMaxRetries = 1 DefaultBlockingKubeClientRequestTimeout = 5 * time.Second DefaultBlockingKubeClientInitialBackoff = 15 * time.Second DefaultBlockingKubeClientBackoffFactor = 1 DefaultBlockingKubeClientMaxRetries = 20 DefaultEtcdClientRequestTimeout = 3 * time.Second DefaultEtcdClientInitialBackoff = 2 * time.Second DefaultEtcdClientBackoffFactor = 1 DefaultEtcdClientMaxRetries = 1 ) var ( // logMap represents the different log levels provided by the fog package logMap = map[string]int{ "DEBUG": fog.DEBUG, "INFO": fog.INFO, "WARN": fog.WARN, "ERROR": fog.ERROR, "DPANIC": 
fog.DPANIC, "PANIC": fog.PANIC, "FATAL": fog.FATAL, } ) //go:generate mockgen -destination=./mocks/mock_config.go -package=mocks edge-infra.dev/pkg/sds/etcd/manager/internal/config Config type Config interface { BlockingKubeRetryClient() (kubeclienttypes.Retrier, error) KubeRetryClient() (kubeclienttypes.Retrier, error) EtcdRetryClient() (etcdretryclient.Retrier, error) ContainerdClient() (containerd.Client, error) SystemdConnection(context.Context) (systemd.Connection, error) Fs() afero.Fs Endpoint() string MaxUnhealthy() time.Duration LogLevel() int } // config represents the configuration for the etcdmanager type config struct { fs afero.Fs settings clients } // settings represents the configuration settings for the etcdmanager type settings struct { kubeconfig string containerdSocket string controlplaneEtcd address kubeClientSettings clientConfig blockingKubeClientSettings clientConfig etcdClientSettings clientConfig maxUnhealthy time.Duration logLevel string } // clientConfig represents the settings for retry clients type clientConfig struct { requestTimeout time.Duration initialBackoff time.Duration backoffFactor float64 maxRetries int } // address represents the address of the client targets type address struct { host string port string } // clients represents the clients used by the etcdmanager type clients struct { kube kubeclienttypes.Retrier blockingKube kubeclienttypes.Retrier } // New creates a new Config and initializes it func New() (Config, error) { cfg := &config{} if err := cfg.Initialize(); err != nil { return nil, err } return cfg, nil } // Initialize updates the config fields with the command line arguments and environment variables func (c *config) Initialize() error { fs := flag.NewFlagSet("run", flag.ExitOnError) c.bindFlags(fs) // parse the config from the command line arguments and environment variables if err := ff.Parse(fs, os.Args[2:], ff.WithEnvVarNoPrefix()); err != nil { return fmt.Errorf("failed to parse command line arguments and 
environment variables: %w", err) } // validate the found config values if err := c.validate(); err != nil { return fmt.Errorf("invalid command line arguments or environment variables: %w", err) } return nil } // bindFlags binds the command line arguments and environment variables to the config fields func (c *config) bindFlags(fs *flag.FlagSet) { fs.StringVar(&c.settings.kubeconfig, "kubeconfig", DefaultKubeconfig, "path to kubeconfig file (must be in /etc/kubernetes)") fs.StringVar(&c.settings.containerdSocket, "containerd-socket", DefaultContainerdSocket, "path to the containerd socket") fs.DurationVar(&c.settings.maxUnhealthy, "max-unhealthy-duration", DefaultMaxUnhealthyDuration, "maximum duration an etcd cluster can be unhealthy before a fix is attempted") fs.StringVar(&c.settings.logLevel, "log-level", DefaultLogLevel, "must be one of DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL") c.bindControlPlaneEtcdFlags(fs) c.bindKubeClientFlags(fs) c.bindBlockingKubeClientFlags(fs) c.bindEtcdClientFlags(fs) } // bindControlPlaneEtcdFlags binds the control plane etcd address variables to the config fields func (c *config) bindControlPlaneEtcdFlags(fs *flag.FlagSet) { fs.StringVar(&c.settings.controlplaneEtcd.host, "controlplane-etcd-host", DefaultControlplaneEtcdHost, "controlplane etcd host") fs.StringVar(&c.settings.controlplaneEtcd.port, "controlplane-etcd-port", DefaultControlplaneEtcdPort, "controlplane etcd port") } // bindKubeClientFlags binds the kube retry client settings to the config fields func (c *config) bindKubeClientFlags(fs *flag.FlagSet) { fs.DurationVar(&c.settings.kubeClientSettings.requestTimeout, "kube-client-request-timeout", DefaultKubeClientRequestTimeout, "timeout for Kubernetes client requests") fs.DurationVar(&c.settings.kubeClientSettings.initialBackoff, "kube-client-initial-backoff", DefaultKubeClientInitialBackoff, "initial backoff period between Kubernetes client request attempts") 
fs.Float64Var(&c.settings.kubeClientSettings.backoffFactor, "kube-client-backoff-factor", DefaultKubeClientBackoffFactor, "factor by which the backoff period increases after each retry") fs.IntVar(&c.settings.kubeClientSettings.maxRetries, "kube-client-max-retries", DefaultKubeClientMaxRetries, "max amount of times to retry a failing Kubernetes client request") } // bindBlockingKubeClientFlags binds the blocking kube retry client settings to the config fields func (c *config) bindBlockingKubeClientFlags(fs *flag.FlagSet) { fs.DurationVar(&c.settings.blockingKubeClientSettings.requestTimeout, "blocking-kube-client-request-timeout", DefaultBlockingKubeClientRequestTimeout, "timeout for blocking Kubernetes client requests") fs.DurationVar(&c.settings.blockingKubeClientSettings.initialBackoff, "blocking-kube-client-initial-backoff", DefaultBlockingKubeClientInitialBackoff, "initial backoff period between blocking Kubernetes client request attempts") fs.Float64Var(&c.settings.blockingKubeClientSettings.backoffFactor, "blocking-kube-client-backoff-factor", DefaultBlockingKubeClientBackoffFactor, "factor by which the backoff period increases after each retry") fs.IntVar(&c.settings.blockingKubeClientSettings.maxRetries, "blocking-kube-client-max-retries", DefaultBlockingKubeClientMaxRetries, "max amount of times to retry a failing blocking Kubernetes client request") } // bindEtcdClientFlags binds the etcd retry client settings to the config fields func (c *config) bindEtcdClientFlags(fs *flag.FlagSet) { fs.DurationVar(&c.settings.etcdClientSettings.requestTimeout, "etcd-client-request-timeout", DefaultEtcdClientRequestTimeout, "timeout for etcd client requests") fs.DurationVar(&c.settings.etcdClientSettings.initialBackoff, "etcd-client-initial-backoff", DefaultEtcdClientInitialBackoff, "initial backoff period between etcd client request attempts") fs.Float64Var(&c.settings.etcdClientSettings.backoffFactor, "etcd-client-backoff-factor", DefaultEtcdClientBackoffFactor, 
"factor by which the backoff period increases after each retry") fs.IntVar(&c.settings.etcdClientSettings.maxRetries, "etcd-client-max-retries", DefaultEtcdClientMaxRetries, "max amount of times to retry a failing etcd client request") } // validate ensures that the config fields are valid for the etcd manager requirements func (c *config) validate() error { if dir := filepath.Dir(c.settings.kubeconfig); dir != "/etc/kubernetes" { return fmt.Errorf("invalid kubeconfig path: %s", dir) } if _, ok := logMap[c.settings.logLevel]; !ok { return fmt.Errorf("invalid log level: %s", c.settings.logLevel) } if _, err := net.LookupIP(c.settings.controlplaneEtcd.host); err != nil { return fmt.Errorf("invalid etcd hostname: %v", err) } port, err := strconv.Atoi(c.settings.controlplaneEtcd.port) if err != nil { return fmt.Errorf("invalid etcd port: %v", err) } if port < 1 || port > 65535 { return errors.New("invalid etcd port: port value must be between 1 and 65535") } return nil } // EtcdRetryClient returns a new etcd retry client. 
func (c *config) EtcdRetryClient() (etcdretryclient.Retrier, error) { client, err := newEtcdClient(c.settings.controlplaneEtcd, c.settings.etcdClientSettings.requestTimeout) if err != nil { return nil, err } config := etcdretryclient.Config{ RequestTimeout: c.settings.etcdClientSettings.requestTimeout, InitialBackoff: c.settings.etcdClientSettings.initialBackoff, BackoffFactor: c.settings.etcdClientSettings.backoffFactor, MaxRetries: c.settings.etcdClientSettings.maxRetries, } return etcdretryclient.New(*client, config), nil } // newEtcdClient creates a new etcd client with the default TLS config func newEtcdClient(addr address, timeout time.Duration) (*clientv3.Client, error) { tlsConfig, err := client.NewTLSConfig(file.New()) if err != nil { return nil, err } clientURL := net.JoinHostPort(addr.host, addr.port) // Setup the etcd client using the controlplane etcd host and port to be used in the retry client operations client, err := client.New(tlsConfig, timeout, clientURL) if err != nil { return nil, fmt.Errorf("failed to create etcd client: %w", err) } return client, nil } // KubeRetryClient creates a kube retry client if one does not already exist and // returns it func (c *config) KubeRetryClient() (kubeclienttypes.Retrier, error) { if c.clients.kube == nil { client, err := newKubeClient(c.settings.kubeconfig) if err != nil { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } config := kuberetryclient.Config{ RequestTimeout: c.settings.kubeClientSettings.requestTimeout, InitialBackoff: c.settings.kubeClientSettings.initialBackoff, BackoffFactor: c.settings.kubeClientSettings.backoffFactor, MaxRetries: c.settings.kubeClientSettings.maxRetries, } c.clients.kube = kuberetryclient.New(client, client, config) } return c.clients.kube, nil } // BlockingKubeRetryClient creates a blocking kube retry client if one does not already exist and // returns it func (c *config) BlockingKubeRetryClient() (kubeclienttypes.Retrier, error) { if 
c.clients.blockingKube == nil { var ( startTime = time.Now() client ctrlclient.Client err error ) for { if client, err = newKubeClient(c.settings.kubeconfig); err == nil || time.Since(startTime) > 5*time.Minute { break } time.Sleep(10 * time.Second) } if err != nil { return nil, fmt.Errorf("failed to create kubernetes client: %w", err) } config := kuberetryclient.Config{ RequestTimeout: c.settings.blockingKubeClientSettings.requestTimeout, InitialBackoff: c.settings.blockingKubeClientSettings.initialBackoff, BackoffFactor: c.settings.blockingKubeClientSettings.backoffFactor, MaxRetries: c.settings.blockingKubeClientSettings.maxRetries, } c.clients.blockingKube = kuberetryclient.New(client, client, config) } return c.clients.blockingKube, nil } // newKubeClient creates a new Kubernetes client with the EtcdMember custom resource // added to the default scheme. func newKubeClient(kubeconfig string) (ctrlclient.Client, error) { config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, fmt.Errorf("failed to create Kubernetes config: %w", err) } opts := ctrlclient.Options{ Scheme: createScheme(), } return ctrlclient.New(config, opts) } // createScheme creates a new scheme with the EtcdMember type registered func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1etcd.AddToScheme(scheme)) return scheme } // ContainerdClient returns a new containerd client. func (c *config) ContainerdClient() (containerd.Client, error) { return containerd.NewClient(c.settings.containerdSocket) } // SystemdConnection returns a new systemd connection. 
func (c *config) SystemdConnection(ctx context.Context) (systemd.Connection, error) {
	return systemd.NewConnection(ctx)
}

// Fs returns the filesystem, creating an OS-backed one on the first call if
// none has been set.
func (c *config) Fs() afero.Fs {
	if c.fs != nil {
		return c.fs
	}
	c.fs = afero.NewOsFs()
	return c.fs
}

// Endpoint returns the host:port endpoint of the controlplane etcd member. It
// is the client target the etcd manager uses to determine the health of the
// cluster from the controlplane's perspective.
func (c *config) Endpoint() string {
	etcd := c.settings.controlplaneEtcd
	return net.JoinHostPort(etcd.host, etcd.port)
}

// MaxUnhealthy returns the maximum duration an etcd cluster can be unhealthy
func (c *config) MaxUnhealthy() time.Duration {
	return c.settings.maxUnhealthy
}

// LogLevel returns the integer log level for the etcdmanager, looked up in
// logMap by the configured level name.
func (c *config) LogLevel() int {
	name := c.settings.logLevel
	return logMap[name]
}