...

Source file src/edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install/files.go

Documentation: edge-infra.dev/pkg/sds/etcd/operator/internal/reconcilers/install

     1  package install
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"path/filepath"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/containerd/containerd/namespaces"
    12  	"github.com/spf13/afero"
    13  	"gopkg.in/yaml.v3"
    14  	corev1 "k8s.io/api/core/v1"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/apimachinery/pkg/runtime"
    17  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    18  	kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
    19  	kubeadmapiv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
    20  	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
    21  	etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
    22  	"k8s.io/kubernetes/cmd/kubeadm/app/util/config/strict"
    23  	etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
    24  	"sigs.k8s.io/controller-runtime/pkg/client"
    25  
    26  	"edge-infra.dev/pkg/lib/fog"
    27  	"edge-infra.dev/pkg/sds/etcd/operator/constants"
    28  	"edge-infra.dev/pkg/sds/etcd/operator/internal/tar"
    29  	"edge-infra.dev/pkg/sds/lib/containerd"
    30  	"edge-infra.dev/pkg/sds/lib/dbus/systemd"
    31  	"edge-infra.dev/pkg/sds/lib/etcd/server"
    32  	"edge-infra.dev/pkg/sds/lib/k8s/manifest"
    33  )
    34  
    35  // reconcileFiles extracts the required certificates from the secret and writes them to the
    36  // filesystem
    37  func (r *Reconciler) reconcileFiles(handlers *Handlers) error {
    38  	reader := tar.NewReader()
    39  	content := handlers.secret.Data["etcd-tar-gz"]
    40  	if err := reader.SetBytes(content); err != nil {
    41  		return fmt.Errorf("failed to set bytes on tar reader: %w", err)
    42  	}
    43  
    44  	files, err := reader.Extract()
    45  	if err != nil {
    46  		return fmt.Errorf("failed to extract tar archive: %w", err)
    47  	}
    48  	// write each file found in the Secret tar archive to the filesystem
    49  	for _, file := range files {
    50  		if err := r.writeFile(file); err != nil {
    51  			return fmt.Errorf("failed to write file (%s): %w", file.Name, err)
    52  		}
    53  	}
    54  
    55  	return nil
    56  }
    57  
    58  // writeFile writes the given file to the filesystem
    59  func (r *Reconciler) writeFile(file tar.File) error {
    60  	dir := filepath.Dir(file.Name)
    61  	if err := r.Fs.MkdirAll(dir, 0755); err != nil {
    62  		return fmt.Errorf("failed to create directory (%s): %w", dir, err)
    63  	}
    64  
    65  	return afero.WriteFile(r.Fs, file.Name, file.Bytes, file.Mode)
    66  }
    67  
    68  func (r *Reconciler) withFirewall(ctx context.Context, handlers *Handlers, fn func(context.Context, *Handlers) error) (retErr error) {
    69  	fs := afero.NewOsFs()
    70  	err := afero.WriteFile(fs, constants.OperatorFilewallFilepath, []byte(OperatorFilewall), 0644)
    71  	if err != nil {
    72  		return err
    73  	}
    74  	defer func() {
    75  		time.Sleep(20 * time.Second)
    76  		if err := fs.RemoveAll(constants.OperatorFilewallFilepath); err != nil {
    77  			if retErr == nil {
    78  				retErr = err
    79  				return
    80  			}
    81  			fog.FromContext(ctx).Error(err, "failed to remove etcd operator firewall rule")
    82  		}
    83  	}()
    84  	time.Sleep(5 * time.Second)
    85  	return fn(ctx, handlers)
    86  }
    87  
    88  // configureEtcd configures the etcd manifest using the kubeadm-config ConfigMap and
    89  // creates the manifest file on the filesystem
    90  func (r *Reconciler) configureEtcd(ctx context.Context, handlers *Handlers) error {
    91  	configMap := &corev1.ConfigMap{}
    92  	clusterConfKey := client.ObjectKey{Name: kubeadmconstants.KubeadmConfigConfigMap, Namespace: metav1.NamespaceSystem}
    93  	if err := r.KubeRetryClient.SafeGet(ctx, clusterConfKey, configMap); err != nil {
    94  		return fmt.Errorf("failed to retrieve kubeadm config map: %w", err)
    95  	}
    96  	manifestPath := filepath.Join(os.Getenv("STATIC_MANIFEST_PATH"), os.Getenv("ETCD_MANIFEST_FILENAME"))
    97  
    98  	client, err := containerd.NewClient(os.Getenv("CONTAINERD_SOCK_PATH"))
    99  	if err != nil {
   100  		return fmt.Errorf("failed to created containerd client: %w", err)
   101  	}
   102  	defer client.Close()
   103  	ctx = namespaces.WithNamespace(ctx, "k8s.io")
   104  
   105  	conn, err := systemd.NewConnection(ctx)
   106  	if err != nil {
   107  		return fmt.Errorf("failed to open systemd connection: %w", err)
   108  	}
   109  	defer conn.Close()
   110  
   111  	m := manifest.New(r.Fs, manifestPath, &corev1.Pod{}, 0)
   112  	// create the etcd manifest, while stopping the current etcd pod container
   113  	// if there is one and restarting the kubelet. This ensures the kubelet
   114  	// picks up the new manifest
   115  	return server.WithRestart(ctx, conn, client, func() error {
   116  		return m.WithCreate(func(obj runtime.Object) error {
   117  			pod, ok := obj.(*corev1.Pod)
   118  			if !ok {
   119  				return fmt.Errorf("current content of the manifest is not a valid pod")
   120  			}
   121  			return r.newEtcd(ctx, configMap, pod, handlers)
   122  		})
   123  	})
   124  }
   125  
   126  // newEtcd creates a new etcd pod manifest using the kubeadm-config ConfigMap and the
   127  // current etcd members
   128  func (r *Reconciler) newEtcd(ctx context.Context, configMap *corev1.ConfigMap, pod *corev1.Pod, handlers *Handlers) error {
   129  	initCfg, err := initConfiguration(configMap)
   130  	if err != nil {
   131  		return fmt.Errorf("failed to retrieve init configuration: %w", err)
   132  	}
   133  
   134  	members, err := r.initialCluster(ctx, handlers)
   135  	if err != nil {
   136  		return fmt.Errorf("failed to retrieve current etcd members: %w", err)
   137  	}
   138  
   139  	// replace the etcd image tag with the one found in container-versions.yaml
   140  	initCfg.ClusterConfiguration.Etcd.Local.ImageMeta.ImageTag, err = r.etcdImageTag()
   141  	if err != nil {
   142  		return err
   143  	}
   144  
   145  	endpoint := &kubeadmapi.APIEndpoint{AdvertiseAddress: handlers.member.Spec.Address.Host}
   146  	manifest := etcdphase.GetEtcdPodSpec(&initCfg.ClusterConfiguration, endpoint, handlers.member.Name, members)
   147  	manifest.DeepCopyInto(pod)
   148  	// if the EtcdMember was not already a member of the etcd cluster when
   149  	// reconciliation began, remove the old etcd data directory to ensure
   150  	// no old cluster state is persisted
   151  	if !handlers.member.IsMember {
   152  		if err := r.Fs.RemoveAll(filepath.Join(initCfg.ClusterConfiguration.Etcd.Local.DataDir, os.Getenv("ETCD_DATA_DIRNAME"))); err != nil {
   153  			return fmt.Errorf("failed to remove old etcd data directory: %w", err)
   154  		}
   155  	}
   156  	return nil
   157  }
   158  
   159  // etcdImageTag returns the image tag for the etcd image
   160  func (r *Reconciler) etcdImageTag() (string, error) {
   161  	containerVersions, err := afero.ReadFile(r.Fs, os.Getenv("CONTAINER_VERSIONS_PATH"))
   162  	if err != nil {
   163  		return "", fmt.Errorf("failed to read %s: %w", os.Getenv("CONTAINER_VERSIONS_PATH"), err)
   164  	}
   165  	var imageVersions containerImageVersions
   166  	if err := yaml.Unmarshal(containerVersions, &imageVersions); err != nil {
   167  		return "", fmt.Errorf("failed to parse container versions: %w", err)
   168  	}
   169  
   170  	etcdImage, ok := imageVersions.Containers["etcd"]
   171  	if !ok {
   172  		return "", fmt.Errorf("failed to find etcd image version in container versions: path checked: %s, found: %s", os.Getenv("CONTAINER_VERSIONS_PATH"), etcdImage)
   173  	}
   174  	// retrieve the tag only from the image version
   175  	imageParts := strings.Split(etcdImage, ":")
   176  	if len(imageParts) != 2 {
   177  		return "", fmt.Errorf("failed to parse etcd image version")
   178  	}
   179  	return imageParts[1], nil
   180  }
   181  
   182  // initConfiguration retrieves the kubeadm-config ConfigMap and converts it to an
   183  // InitConfiguration
   184  func initConfiguration(configMap *corev1.ConfigMap) (*kubeadmapi.InitConfiguration, error) {
   185  	versionedInitcfg := &kubeadmapiv1.InitConfiguration{}
   186  	// create default kubeadm InitConfiguration
   187  	kubeadmscheme.Scheme.Default(versionedInitcfg)
   188  	initCfg := &kubeadmapi.InitConfiguration{}
   189  	// convert the InitConfiguration to the internal type
   190  	if err := kubeadmscheme.Scheme.Convert(versionedInitcfg, initCfg, nil); err != nil {
   191  		return nil, fmt.Errorf("failed to convert InitConfiguration: %w", err)
   192  	}
   193  
   194  	clusterConfigurationData, ok := configMap.Data[kubeadmconstants.ClusterConfigurationConfigMapKey]
   195  	if !ok {
   196  		return nil, fmt.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", kubeadmconstants.ClusterConfigurationConfigMapKey)
   197  	}
   198  	// verify the ClusterConfiguration
   199  	if err := strict.VerifyUnmarshalStrict([]*runtime.Scheme{kubeadmscheme.Scheme},
   200  		kubeadmapiv1.SchemeGroupVersion.WithKind(kubeadmconstants.ClusterConfigurationKind),
   201  		[]byte(clusterConfigurationData)); err != nil {
   202  		return nil, fmt.Errorf("failed to verify ClusterConfiguration: %w", err)
   203  	}
   204  	// decode the ClusterConfiguration into the InitConfiguration
   205  	if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterConfigurationData), &initCfg.ClusterConfiguration); err != nil {
   206  		return nil, fmt.Errorf("failed to decode ClusterConfiguration: %w", err)
   207  	}
   208  	return initCfg, nil
   209  }
   210  
   211  // initialCluster retrieves the current etcd members and adds the current node to the
   212  // slice of members
   213  func (r *Reconciler) initialCluster(ctx context.Context, handlers *Handlers) ([]etcdutil.Member, error) {
   214  	resp, err := r.EtcdRetryClient.SafeMemberList(ctx)
   215  	if err != nil {
   216  		return nil, fmt.Errorf("failed to retrieve etcd members: %w", err)
   217  	}
   218  
   219  	var members []etcdutil.Member
   220  	// for each member in the etcd cluster
   221  	for _, member := range resp.Members {
   222  		// if the member name is not empty, and the member name does not match
   223  		// the current EtcdMember's name, add the member to the slice of members
   224  		if member.Name != "" && member.Name != handlers.member.Name {
   225  			members = append(members, etcdutil.Member{
   226  				Name:    member.Name,
   227  				PeerURL: member.PeerURLs[0],
   228  			})
   229  		}
   230  	}
   231  	// add the current EtcdMember to the member list
   232  	members = append(members, etcdutil.Member{
   233  		Name:    handlers.member.Name,
   234  		PeerURL: handlers.member.PeerURL(),
   235  	})
   236  	return members, nil
   237  }
   238  

View as plain text