...

Source file src/edge-infra.dev/pkg/sds/etcd/manager/internal/config/config.go

Documentation: edge-infra.dev/pkg/sds/etcd/manager/internal/config

     1  // Package config provides functionality for parsing the command line arguments
     2  // and environment variables for the etcdmanager
     3  package config
     4  
     5  import (
     6  	"context"
     7  	"errors"
     8  	"flag"
     9  	"fmt"
    10  	"net"
    11  	"os"
    12  	"path/filepath"
    13  	"strconv"
    14  	"time"
    15  
    16  	"github.com/peterbourgon/ff/v3"
    17  	"github.com/spf13/afero"
    18  	clientv3 "go.etcd.io/etcd/client/v3"
    19  	"k8s.io/apimachinery/pkg/runtime"
    20  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    21  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    22  	"k8s.io/client-go/tools/clientcmd"
    23  	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
    24  
    25  	"edge-infra.dev/pkg/lib/fog"
    26  	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
    27  	"edge-infra.dev/pkg/sds/lib/containerd"
    28  	"edge-infra.dev/pkg/sds/lib/dbus/systemd"
    29  	"edge-infra.dev/pkg/sds/lib/etcd/client"
    30  	etcdretryclient "edge-infra.dev/pkg/sds/lib/etcd/client/retry"
    31  	kuberetryclient "edge-infra.dev/pkg/sds/lib/k8s/retryclient"
    32  	kubeclienttypes "edge-infra.dev/pkg/sds/lib/k8s/retryclient/types"
    33  	"edge-infra.dev/pkg/sds/lib/os/file"
    34  )
    35  
const (
	// DefaultContainerdSocket is the default value of the containerd-socket flag.
	DefaultContainerdSocket = "/run/containerd/containerd.sock"

	// Defaults for the kubeconfig, max-unhealthy-duration and log-level flags.
	DefaultKubeconfig           = "/etc/kubernetes/admin.conf"
	DefaultMaxUnhealthyDuration = 10 * time.Minute
	DefaultLogLevel             = "INFO"

	// Defaults for the controlplane-etcd-host and controlplane-etcd-port flags.
	DefaultControlplaneEtcdHost = "127.0.0.1"
	DefaultControlplaneEtcdPort = "2379"

	// Defaults for the kube-client-* flags (kube retry client).
	DefaultKubeClientRequestTimeout = 3 * time.Second
	DefaultKubeClientInitialBackoff = 2 * time.Second
	DefaultKubeClientBackoffFactor  = 1
	DefaultKubeClientMaxRetries     = 1

	// Defaults for the blocking-kube-client-* flags (blocking kube retry client).
	DefaultBlockingKubeClientRequestTimeout = 5 * time.Second
	DefaultBlockingKubeClientInitialBackoff = 15 * time.Second
	DefaultBlockingKubeClientBackoffFactor  = 1
	DefaultBlockingKubeClientMaxRetries     = 20

	// Defaults for the etcd-client-* flags (etcd retry client).
	DefaultEtcdClientRequestTimeout = 3 * time.Second
	DefaultEtcdClientInitialBackoff = 2 * time.Second
	DefaultEtcdClientBackoffFactor  = 1
	DefaultEtcdClientMaxRetries     = 1
)
    61  
var (
	// logMap maps the log level names accepted by the log-level flag to the
	// integer log levels provided by the fog package. It is used both to
	// validate the configured level and to resolve it in LogLevel.
	logMap = map[string]int{
		"DEBUG":  fog.DEBUG,
		"INFO":   fog.INFO,
		"WARN":   fog.WARN,
		"ERROR":  fog.ERROR,
		"DPANIC": fog.DPANIC,
		"PANIC":  fog.PANIC,
		"FATAL":  fog.FATAL,
	}
)
    74  
// Config exposes the clients, filesystem and settings that the etcdmanager is
// configured with.
//
//go:generate mockgen -destination=./mocks/mock_config.go -package=mocks edge-infra.dev/pkg/sds/etcd/manager/internal/config Config
type Config interface {
	// BlockingKubeRetryClient returns the kube retry client built from the
	// blocking-kube-client-* settings.
	BlockingKubeRetryClient() (kubeclienttypes.Retrier, error)
	// KubeRetryClient returns the kube retry client built from the
	// kube-client-* settings.
	KubeRetryClient() (kubeclienttypes.Retrier, error)
	// EtcdRetryClient returns an etcd retry client targeting the controlplane
	// etcd member.
	EtcdRetryClient() (etcdretryclient.Retrier, error)
	// ContainerdClient returns a containerd client for the configured socket.
	ContainerdClient() (containerd.Client, error)
	// SystemdConnection returns a systemd connection created with the given context.
	SystemdConnection(context.Context) (systemd.Connection, error)
	// Fs returns the filesystem used by the etcdmanager.
	Fs() afero.Fs
	// Endpoint returns the host:port of the controlplane etcd member.
	Endpoint() string
	// MaxUnhealthy returns the maximum duration an etcd cluster can be unhealthy.
	MaxUnhealthy() time.Duration
	// LogLevel returns the configured log level as a fog integer level.
	LogLevel() int
}
    87  
// config is the Config implementation for the etcdmanager. It embeds the
// parsed settings and the cached kube clients, and holds the filesystem that
// Fs creates on first use.
type config struct {
	fs afero.Fs
	settings
	clients
}
    94  
// settings holds the configuration values for the etcdmanager. Each field is
// bound to a flag by bindFlags and its helpers: the kubeconfig path, the
// containerd socket path, the controlplane etcd address, the retry settings
// for the kube, blocking kube and etcd clients, the maximum unhealthy duration
// and the log level name.
type settings struct {
	kubeconfig                 string
	containerdSocket           string
	controlplaneEtcd           address
	kubeClientSettings         clientConfig
	blockingKubeClientSettings clientConfig
	etcdClientSettings         clientConfig
	maxUnhealthy               time.Duration
	logLevel                   string
}
   106  
// clientConfig holds the settings handed to the retry clients: the request
// timeout, the initial backoff between attempts, the factor applied to the
// backoff after each retry, and the maximum number of retries.
type clientConfig struct {
	requestTimeout time.Duration
	initialBackoff time.Duration
	backoffFactor  float64
	maxRetries     int
}
   114  
// address is the host and port of a client target. Both parts are kept as
// strings and joined with net.JoinHostPort when an endpoint is needed.
type address struct {
	host string
	port string
}
   120  
// clients caches the kube retry clients used by the etcdmanager. Each field
// stays nil until the matching accessor (KubeRetryClient or
// BlockingKubeRetryClient) creates the client, after which it is reused.
type clients struct {
	kube         kubeclienttypes.Retrier
	blockingKube kubeclienttypes.Retrier
}
   126  
   127  // New creates a new Config and initializes it
   128  func New() (Config, error) {
   129  	cfg := &config{}
   130  	if err := cfg.Initialize(); err != nil {
   131  		return nil, err
   132  	}
   133  
   134  	return cfg, nil
   135  }
   136  
   137  // Initialize updates the config fields with the command line arguments and environment variables
   138  func (c *config) Initialize() error {
   139  	fs := flag.NewFlagSet("run", flag.ExitOnError)
   140  	c.bindFlags(fs)
   141  	// parse the config from the command line arguments and environment variables
   142  	if err := ff.Parse(fs, os.Args[2:], ff.WithEnvVarNoPrefix()); err != nil {
   143  		return fmt.Errorf("failed to parse command line arguments and environment variables: %w", err)
   144  	}
   145  	// validate the found config values
   146  	if err := c.validate(); err != nil {
   147  		return fmt.Errorf("invalid command line arguments or environment variables: %w", err)
   148  	}
   149  	return nil
   150  }
   151  
   152  // bindFlags binds the command line arguments and environment variables to the config fields
   153  func (c *config) bindFlags(fs *flag.FlagSet) {
   154  	fs.StringVar(&c.settings.kubeconfig,
   155  		"kubeconfig",
   156  		DefaultKubeconfig,
   157  		"path to kubeconfig file (must be in /etc/kubernetes)")
   158  	fs.StringVar(&c.settings.containerdSocket,
   159  		"containerd-socket",
   160  		DefaultContainerdSocket,
   161  		"path to the containerd socket")
   162  	fs.DurationVar(&c.settings.maxUnhealthy,
   163  		"max-unhealthy-duration",
   164  		DefaultMaxUnhealthyDuration,
   165  		"maximum duration an etcd cluster can be unhealthy before a fix is attempted")
   166  	fs.StringVar(&c.settings.logLevel,
   167  		"log-level",
   168  		DefaultLogLevel,
   169  		"must be one of DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL")
   170  
   171  	c.bindControlPlaneEtcdFlags(fs)
   172  	c.bindKubeClientFlags(fs)
   173  	c.bindBlockingKubeClientFlags(fs)
   174  	c.bindEtcdClientFlags(fs)
   175  }
   176  
   177  // bindControlPlaneEtcdFlags binds the control plane etcd address variables to the config fields
   178  func (c *config) bindControlPlaneEtcdFlags(fs *flag.FlagSet) {
   179  	fs.StringVar(&c.settings.controlplaneEtcd.host,
   180  		"controlplane-etcd-host",
   181  		DefaultControlplaneEtcdHost,
   182  		"controlplane etcd host")
   183  	fs.StringVar(&c.settings.controlplaneEtcd.port,
   184  		"controlplane-etcd-port",
   185  		DefaultControlplaneEtcdPort,
   186  		"controlplane etcd port")
   187  }
   188  
   189  // bindKubeClientFlags binds the kube retry client settings to the config fields
   190  func (c *config) bindKubeClientFlags(fs *flag.FlagSet) {
   191  	fs.DurationVar(&c.settings.kubeClientSettings.requestTimeout,
   192  		"kube-client-request-timeout",
   193  		DefaultKubeClientRequestTimeout,
   194  		"timeout for Kubernetes client requests")
   195  	fs.DurationVar(&c.settings.kubeClientSettings.initialBackoff,
   196  		"kube-client-initial-backoff",
   197  		DefaultKubeClientInitialBackoff,
   198  		"initial backoff period between Kubernetes client request attempts")
   199  	fs.Float64Var(&c.settings.kubeClientSettings.backoffFactor,
   200  		"kube-client-backoff-factor",
   201  		DefaultKubeClientBackoffFactor,
   202  		"factor by which the backoff period increases after each retry")
   203  	fs.IntVar(&c.settings.kubeClientSettings.maxRetries,
   204  		"kube-client-max-retries",
   205  		DefaultKubeClientMaxRetries,
   206  		"max amount of times to retry a failing Kubernetes client request")
   207  }
   208  
   209  // bindBlockingKubeClientFlags binds the blocking kube retry client settings to the config fields
   210  func (c *config) bindBlockingKubeClientFlags(fs *flag.FlagSet) {
   211  	fs.DurationVar(&c.settings.blockingKubeClientSettings.requestTimeout,
   212  		"blocking-kube-client-request-timeout",
   213  		DefaultBlockingKubeClientRequestTimeout,
   214  		"timeout for blocking Kubernetes client requests")
   215  	fs.DurationVar(&c.settings.blockingKubeClientSettings.initialBackoff,
   216  		"blocking-kube-client-initial-backoff",
   217  		DefaultBlockingKubeClientInitialBackoff,
   218  		"initial backoff period between blocking Kubernetes client request attempts")
   219  	fs.Float64Var(&c.settings.blockingKubeClientSettings.backoffFactor,
   220  		"blocking-kube-client-backoff-factor",
   221  		DefaultBlockingKubeClientBackoffFactor,
   222  		"factor by which the backoff period increases after each retry")
   223  	fs.IntVar(&c.settings.blockingKubeClientSettings.maxRetries,
   224  		"blocking-kube-client-max-retries",
   225  		DefaultBlockingKubeClientMaxRetries,
   226  		"max amount of times to retry a failing blocking Kubernetes client request")
   227  }
   228  
   229  // bindEtcdClientFlags binds the etcd retry client settings to the config fields
   230  func (c *config) bindEtcdClientFlags(fs *flag.FlagSet) {
   231  	fs.DurationVar(&c.settings.etcdClientSettings.requestTimeout,
   232  		"etcd-client-request-timeout",
   233  		DefaultEtcdClientRequestTimeout,
   234  		"timeout for etcd client requests")
   235  	fs.DurationVar(&c.settings.etcdClientSettings.initialBackoff,
   236  		"etcd-client-initial-backoff",
   237  		DefaultEtcdClientInitialBackoff,
   238  		"initial backoff period between etcd client request attempts")
   239  	fs.Float64Var(&c.settings.etcdClientSettings.backoffFactor,
   240  		"etcd-client-backoff-factor",
   241  		DefaultEtcdClientBackoffFactor,
   242  		"factor by which the backoff period increases after each retry")
   243  	fs.IntVar(&c.settings.etcdClientSettings.maxRetries,
   244  		"etcd-client-max-retries",
   245  		DefaultEtcdClientMaxRetries,
   246  		"max amount of times to retry a failing etcd client request")
   247  }
   248  
   249  // validate ensures that the config fields are valid for the etcd manager requirements
   250  func (c *config) validate() error {
   251  	if dir := filepath.Dir(c.settings.kubeconfig); dir != "/etc/kubernetes" {
   252  		return fmt.Errorf("invalid kubeconfig path: %s", dir)
   253  	}
   254  
   255  	if _, ok := logMap[c.settings.logLevel]; !ok {
   256  		return fmt.Errorf("invalid log level: %s", c.settings.logLevel)
   257  	}
   258  
   259  	if _, err := net.LookupIP(c.settings.controlplaneEtcd.host); err != nil {
   260  		return fmt.Errorf("invalid etcd hostname: %v", err)
   261  	}
   262  
   263  	port, err := strconv.Atoi(c.settings.controlplaneEtcd.port)
   264  	if err != nil {
   265  		return fmt.Errorf("invalid etcd port: %v", err)
   266  	}
   267  	if port < 1 || port > 65535 {
   268  		return errors.New("invalid etcd port: port value must be between 1 and 65535")
   269  	}
   270  
   271  	return nil
   272  }
   273  
   274  // EtcdRetryClient returns a new etcd retry client.
   275  func (c *config) EtcdRetryClient() (etcdretryclient.Retrier, error) {
   276  	client, err := newEtcdClient(c.settings.controlplaneEtcd, c.settings.etcdClientSettings.requestTimeout)
   277  	if err != nil {
   278  		return nil, err
   279  	}
   280  	config := etcdretryclient.Config{
   281  		RequestTimeout: c.settings.etcdClientSettings.requestTimeout,
   282  		InitialBackoff: c.settings.etcdClientSettings.initialBackoff,
   283  		BackoffFactor:  c.settings.etcdClientSettings.backoffFactor,
   284  		MaxRetries:     c.settings.etcdClientSettings.maxRetries,
   285  	}
   286  
   287  	return etcdretryclient.New(*client, config), nil
   288  }
   289  
   290  // newEtcdClient creates a new etcd client with the default TLS config
   291  func newEtcdClient(addr address, timeout time.Duration) (*clientv3.Client, error) {
   292  	tlsConfig, err := client.NewTLSConfig(file.New())
   293  	if err != nil {
   294  		return nil, err
   295  	}
   296  
   297  	clientURL := net.JoinHostPort(addr.host, addr.port)
   298  	// Setup the etcd client using the controlplane etcd host and port to be used in the retry client operations
   299  	client, err := client.New(tlsConfig, timeout, clientURL)
   300  	if err != nil {
   301  		return nil, fmt.Errorf("failed to create etcd client: %w", err)
   302  	}
   303  
   304  	return client, nil
   305  }
   306  
   307  // KubeRetryClient creates a kube retry client if one does not already exist and
   308  // returns it
   309  func (c *config) KubeRetryClient() (kubeclienttypes.Retrier, error) {
   310  	if c.clients.kube == nil {
   311  		client, err := newKubeClient(c.settings.kubeconfig)
   312  		if err != nil {
   313  			return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
   314  		}
   315  
   316  		config := kuberetryclient.Config{
   317  			RequestTimeout: c.settings.kubeClientSettings.requestTimeout,
   318  			InitialBackoff: c.settings.kubeClientSettings.initialBackoff,
   319  			BackoffFactor:  c.settings.kubeClientSettings.backoffFactor,
   320  			MaxRetries:     c.settings.kubeClientSettings.maxRetries,
   321  		}
   322  		c.clients.kube = kuberetryclient.New(client, client, config)
   323  	}
   324  	return c.clients.kube, nil
   325  }
   326  
   327  // BlockingKubeRetryClient creates a blocking kube retry client if one does not already exist and
   328  // returns it
   329  func (c *config) BlockingKubeRetryClient() (kubeclienttypes.Retrier, error) {
   330  	if c.clients.blockingKube == nil {
   331  		var (
   332  			startTime = time.Now()
   333  			client    ctrlclient.Client
   334  			err       error
   335  		)
   336  		for {
   337  			if client, err = newKubeClient(c.settings.kubeconfig); err == nil || time.Since(startTime) > 5*time.Minute {
   338  				break
   339  			}
   340  			time.Sleep(10 * time.Second)
   341  		}
   342  		if err != nil {
   343  			return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
   344  		}
   345  
   346  		config := kuberetryclient.Config{
   347  			RequestTimeout: c.settings.blockingKubeClientSettings.requestTimeout,
   348  			InitialBackoff: c.settings.blockingKubeClientSettings.initialBackoff,
   349  			BackoffFactor:  c.settings.blockingKubeClientSettings.backoffFactor,
   350  			MaxRetries:     c.settings.blockingKubeClientSettings.maxRetries,
   351  		}
   352  		c.clients.blockingKube = kuberetryclient.New(client, client, config)
   353  	}
   354  	return c.clients.blockingKube, nil
   355  }
   356  
   357  // newKubeClient creates a new Kubernetes client with the EtcdMember custom resource
   358  // added to the default scheme.
   359  func newKubeClient(kubeconfig string) (ctrlclient.Client, error) {
   360  	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
   361  	if err != nil {
   362  		return nil, fmt.Errorf("failed to create Kubernetes config: %w", err)
   363  	}
   364  
   365  	opts := ctrlclient.Options{
   366  		Scheme: createScheme(),
   367  	}
   368  	return ctrlclient.New(config, opts)
   369  }
   370  
   371  // createScheme creates a new scheme with the EtcdMember type registered
   372  func createScheme() *runtime.Scheme {
   373  	scheme := runtime.NewScheme()
   374  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   375  	utilruntime.Must(v1etcd.AddToScheme(scheme))
   376  	return scheme
   377  }
   378  
   379  // ContainerdClient returns a new containerd client.
   380  func (c *config) ContainerdClient() (containerd.Client, error) {
   381  	return containerd.NewClient(c.settings.containerdSocket)
   382  }
   383  
   384  // SystemdConnection returns a new systemd connection.
   385  func (c *config) SystemdConnection(ctx context.Context) (systemd.Connection, error) {
   386  	return systemd.NewConnection(ctx)
   387  }
   388  
   389  // Fs creates the filesystem if one has does not already exist and returns it
   390  func (c *config) Fs() afero.Fs {
   391  	if c.fs == nil {
   392  		c.fs = afero.NewOsFs()
   393  	}
   394  
   395  	return c.fs
   396  }
   397  
   398  // Endpoint returns the endpoint of the controlplane etcd member, which acts as the
   399  // client target for the etcd manager to determine the health of the cluster from
   400  // the controlplane's perspective
   401  func (c *config) Endpoint() string {
   402  	return net.JoinHostPort(c.settings.controlplaneEtcd.host, c.settings.controlplaneEtcd.port)
   403  }
   404  
   405  // MaxUnhealthy returns the maximum duration an etcd cluster can be unhealthy
   406  func (c *config) MaxUnhealthy() time.Duration {
   407  	return c.settings.maxUnhealthy
   408  }
   409  
   410  // LogLevel returns the integer log level for the etcdmanager
   411  func (c *config) LogLevel() int {
   412  	return logMap[c.settings.logLevel]
   413  }
   414  

View as plain text