// Package config provides functionality for the configuring the etcd operator's reconcilers package config import ( "context" "fmt" "net" "strings" "time" "github.com/spf13/afero" clientv3 "go.etcd.io/etcd/client/v3" v1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" calico "edge-infra.dev/pkg/k8s/net/calico" nodemeta "edge-infra.dev/pkg/sds/ien/node" "edge-infra.dev/pkg/sds/lib/etcd/client" etcdretryclient "edge-infra.dev/pkg/sds/lib/etcd/client/retry" kuberetryclient "edge-infra.dev/pkg/sds/lib/k8s/retryclient" kubeclienttypes "edge-infra.dev/pkg/sds/lib/k8s/retryclient/types" "edge-infra.dev/pkg/sds/lib/os/file" ) const ( Worker = "worker" ControlPlane = "controlplane" ) // Config contains basic configuration for the reconciler and embeds the clients type Config struct { Name string NodeName string Mgr ctrl.Manager EtcdRetryClient etcdretryclient.Retrier KubeRetryClient kubeclienttypes.Retrier Fs afero.Fs } // WithDefaultKubeRetryClient sets the default retry client for the config with the // manager's client and reader. func (c *Config) WithDefaultKubeRetryClient() { c.KubeRetryClient = kuberetryclient.New(c.Mgr.GetClient(), c.Mgr.GetAPIReader(), kuberetryclient.Config{}) } // WithDefaultEtcdRetryClient sets the default retry client for the config. The // default retry client will retry requests every 2 seconds, each request will // timeout after 2 seconds, and the client will retry for a total of 2 seconds func (c *Config) WithDefaultEtcdRetryClient(ctx context.Context) error { client, err := createEtcdClient(ctx, c.Mgr.GetClient()) if err != nil { return err } config := etcdretryclient.Config{} c.EtcdRetryClient = etcdretryclient.New(*client, config) return nil } // createEtcdClient creates a new etcd client using the manager's kubernetes client // to get the etcd IP address func createEtcdClient(ctx context.Context, kubeClient k8sclient.Client) (*clientv3.Client, error) { config, err := client.NewTLSConfig(file.New()) if err != nil { return nil, fmt.Errorf("failed to create etcd tls config: %w", err) } address, err := getEtcdAddress(ctx, kubeClient) if err != nil { return nil, fmt.Errorf("failed to get etcd IP address: %w", err) } clientURL := net.JoinHostPort(address, "2379") return client.New(config, 5*time.Second, clientURL) } // getEtcdAddress gets the IP address of the controlplane's etcd pod func getEtcdAddress(ctx context.Context, client k8sclient.Client) (string, error) { // ([]*net.IP, error) { nodeList := &v1.NodeList{} if err := client.List(ctx, nodeList); err != nil { return "", err } // find the controlplane node and return the IP address for _, node := range nodeList.Items { if strings.Contains(node.GetLabels()[nodemeta.RoleLabel], ControlPlane) { IPAddress, err := calico.ParseNodeIPs(node) if err != nil { return "", err } return IPAddress[0].String(), nil } } return "", fmt.Errorf("failed to fine a node with label %v: %v", nodemeta.RoleLabel, ControlPlane) }