package install

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/containerd/containerd/namespaces"
	"github.com/spf13/afero"
	"gopkg.in/yaml.v3"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
	kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
	kubeadmapiv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
	etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
	"k8s.io/kubernetes/cmd/kubeadm/app/util/config/strict"
	etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/lib/fog"
	"edge-infra.dev/pkg/sds/etcd/operator/constants"
	"edge-infra.dev/pkg/sds/etcd/operator/internal/tar"
	"edge-infra.dev/pkg/sds/lib/containerd"
	"edge-infra.dev/pkg/sds/lib/dbus/systemd"
	"edge-infra.dev/pkg/sds/lib/etcd/server"
	"edge-infra.dev/pkg/sds/lib/k8s/manifest"
)

// reconcileFiles extracts the required certificates from the secret and writes them to the
// filesystem.
//
// The secret is expected to carry a gzipped tar archive under the "etcd-tar-gz" key;
// every file entry in that archive is written to its recorded path.
func (r *Reconciler) reconcileFiles(handlers *Handlers) error {
	reader := tar.NewReader()
	content := handlers.secret.Data["etcd-tar-gz"]
	if err := reader.SetBytes(content); err != nil {
		return fmt.Errorf("failed to set bytes on tar reader: %w", err)
	}
	files, err := reader.Extract()
	if err != nil {
		return fmt.Errorf("failed to extract tar archive: %w", err)
	}
	// write each file found in the Secret tar archive to the filesystem
	for _, file := range files {
		if err := r.writeFile(file); err != nil {
			return fmt.Errorf("failed to write file (%s): %w", file.Name, err)
		}
	}
	return nil
}

// writeFile writes the given file to the filesystem, creating any missing parent
// directories first and preserving the file mode recorded in the archive entry.
func (r *Reconciler) writeFile(file tar.File) error {
	dir := filepath.Dir(file.Name)
	if err := r.Fs.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("failed to create directory (%s): %w", dir, err)
	}
	return afero.WriteFile(r.Fs, file.Name, file.Bytes, file.Mode)
}

// withFirewall installs the operator firewall rule file, runs fn, and removes the rule
// again afterwards. A removal failure is returned when fn itself succeeded, and only
// logged when fn already produced an error (so fn's error is never masked).
//
// NOTE(review): the fixed 5s/20s sleeps presumably give the host time to apply the
// firewall rule before/after use — TODO confirm these timings against the firewall
// reload mechanism.
func (r *Reconciler) withFirewall(ctx context.Context, handlers *Handlers, fn func(context.Context, *Handlers) error) (retErr error) {
	fs := afero.NewOsFs()
	err := afero.WriteFile(fs, constants.OperatorFilewallFilepath, []byte(OperatorFilewall), 0644)
	if err != nil {
		return err
	}
	defer func() {
		time.Sleep(20 * time.Second)
		if err := fs.RemoveAll(constants.OperatorFilewallFilepath); err != nil {
			if retErr == nil {
				retErr = err
				return
			}
			fog.FromContext(ctx).Error(err, "failed to remove etcd operator firewall rule")
		}
	}()
	time.Sleep(5 * time.Second)
	return fn(ctx, handlers)
}

// configureEtcd configures the etcd manifest using the kubeadm-config ConfigMap and
// creates the manifest file on the filesystem.
func (r *Reconciler) configureEtcd(ctx context.Context, handlers *Handlers) error {
	configMap := &corev1.ConfigMap{}
	clusterConfKey := client.ObjectKey{Name: kubeadmconstants.KubeadmConfigConfigMap, Namespace: metav1.NamespaceSystem}
	if err := r.KubeRetryClient.SafeGet(ctx, clusterConfKey, configMap); err != nil {
		return fmt.Errorf("failed to retrieve kubeadm config map: %w", err)
	}

	manifestPath := filepath.Join(os.Getenv("STATIC_MANIFEST_PATH"), os.Getenv("ETCD_MANIFEST_FILENAME"))

	// named ctrdClient so it does not shadow the controller-runtime client package
	// used above
	ctrdClient, err := containerd.NewClient(os.Getenv("CONTAINERD_SOCK_PATH"))
	if err != nil {
		return fmt.Errorf("failed to create containerd client: %w", err)
	}
	defer ctrdClient.Close()

	// kubelet-managed containers live in containerd's k8s.io namespace
	ctx = namespaces.WithNamespace(ctx, "k8s.io")

	conn, err := systemd.NewConnection(ctx)
	if err != nil {
		return fmt.Errorf("failed to open systemd connection: %w", err)
	}
	defer conn.Close()

	m := manifest.New(r.Fs, manifestPath, &corev1.Pod{}, 0)

	// create the etcd manifest, while stopping the current etcd pod container
	// if there is one and restarting the kubelet. This ensures the kubelet
	// picks up the new manifest
	return server.WithRestart(ctx, conn, ctrdClient, func() error {
		return m.WithCreate(func(obj runtime.Object) error {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				return fmt.Errorf("current content of the manifest is not a valid pod")
			}
			return r.newEtcd(ctx, configMap, pod, handlers)
		})
	})
}

// newEtcd creates a new etcd pod manifest using the kubeadm-config ConfigMap and the
// current etcd members, writing the generated spec into pod in place.
func (r *Reconciler) newEtcd(ctx context.Context, configMap *corev1.ConfigMap, pod *corev1.Pod, handlers *Handlers) error {
	initCfg, err := initConfiguration(configMap)
	if err != nil {
		return fmt.Errorf("failed to retrieve init configuration: %w", err)
	}

	members, err := r.initialCluster(ctx, handlers)
	if err != nil {
		return fmt.Errorf("failed to retrieve current etcd members: %w", err)
	}

	// replace the etcd image tag with the one found in container-versions.yaml
	initCfg.ClusterConfiguration.Etcd.Local.ImageMeta.ImageTag, err = r.etcdImageTag()
	if err != nil {
		return err
	}

	endpoint := &kubeadmapi.APIEndpoint{AdvertiseAddress: handlers.member.Spec.Address.Host}
	// named etcdPod so it does not shadow the manifest package imported by this file
	etcdPod := etcdphase.GetEtcdPodSpec(&initCfg.ClusterConfiguration, endpoint, handlers.member.Name, members)
	etcdPod.DeepCopyInto(pod)

	// if the EtcdMember was not already a member of the etcd cluster when
	// reconciliation began, remove the old etcd data directory to ensure
	// no old cluster state is persisted
	if !handlers.member.IsMember {
		if err := r.Fs.RemoveAll(filepath.Join(initCfg.ClusterConfiguration.Etcd.Local.DataDir, os.Getenv("ETCD_DATA_DIRNAME"))); err != nil {
			return fmt.Errorf("failed to remove old etcd data directory: %w", err)
		}
	}
	return nil
}

// etcdImageTag returns the image tag for the etcd image, read from the container
// versions file at CONTAINER_VERSIONS_PATH (key "etcd", value "<image>:<tag>").
func (r *Reconciler) etcdImageTag() (string, error) {
	containerVersions, err := afero.ReadFile(r.Fs, os.Getenv("CONTAINER_VERSIONS_PATH"))
	if err != nil {
		return "", fmt.Errorf("failed to read %s: %w", os.Getenv("CONTAINER_VERSIONS_PATH"), err)
	}
	var imageVersions containerImageVersions
	if err := yaml.Unmarshal(containerVersions, &imageVersions); err != nil {
		return "", fmt.Errorf("failed to parse container versions: %w", err)
	}
	etcdImage, ok := imageVersions.Containers["etcd"]
	if !ok {
		// previously this message printed the (always empty) etcdImage value
		return "", fmt.Errorf("failed to find etcd image version in container versions: path checked: %s", os.Getenv("CONTAINER_VERSIONS_PATH"))
	}
	// retrieve the tag only from the image version; use the LAST colon so that
	// registry hosts with ports (e.g. registry:5000/etcd:v3.5) parse correctly
	idx := strings.LastIndex(etcdImage, ":")
	if idx < 0 || idx == len(etcdImage)-1 {
		return "", fmt.Errorf("failed to parse etcd image version")
	}
	return etcdImage[idx+1:], nil
}

// initConfiguration retrieves the kubeadm-config ConfigMap and converts it to an
// InitConfiguration. The defaulted versioned InitConfiguration is converted to the
// internal type, then the ClusterConfiguration stored in the ConfigMap is strictly
// verified and decoded into it.
func initConfiguration(configMap *corev1.ConfigMap) (*kubeadmapi.InitConfiguration, error) {
	versionedInitcfg := &kubeadmapiv1.InitConfiguration{}
	// create default kubeadm InitConfiguration
	kubeadmscheme.Scheme.Default(versionedInitcfg)
	initCfg := &kubeadmapi.InitConfiguration{}
	// convert the InitConfiguration to the internal type
	if err := kubeadmscheme.Scheme.Convert(versionedInitcfg, initCfg, nil); err != nil {
		return nil, fmt.Errorf("failed to convert InitConfiguration: %w", err)
	}
	clusterConfigurationData, ok := configMap.Data[kubeadmconstants.ClusterConfigurationConfigMapKey]
	if !ok {
		return nil, fmt.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", kubeadmconstants.ClusterConfigurationConfigMapKey)
	}
	// verify the ClusterConfiguration
	if err := strict.VerifyUnmarshalStrict([]*runtime.Scheme{kubeadmscheme.Scheme},
		kubeadmapiv1.SchemeGroupVersion.WithKind(kubeadmconstants.ClusterConfigurationKind),
		[]byte(clusterConfigurationData)); err != nil {
		return nil, fmt.Errorf("failed to verify ClusterConfiguration: %w", err)
	}
	// decode the ClusterConfiguration into the InitConfiguration
	if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterConfigurationData), &initCfg.ClusterConfiguration); err != nil {
		return nil, fmt.Errorf("failed to decode ClusterConfiguration: %w", err)
	}
	return initCfg, nil
}

// initialCluster retrieves the current etcd members and adds the current node to the
// slice of members. Existing members matching the current EtcdMember's name (or with
// an empty name, i.e. not yet started) are excluded so the current node appears
// exactly once.
func (r *Reconciler) initialCluster(ctx context.Context, handlers *Handlers) ([]etcdutil.Member, error) {
	resp, err := r.EtcdRetryClient.SafeMemberList(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve etcd members: %w", err)
	}
	var members []etcdutil.Member
	// for each member in the etcd cluster
	for _, member := range resp.Members {
		// if the member name is not empty, and the member name does not match
		// the current EtcdMember's name, add the member to the slice of members
		if member.Name != "" && member.Name != handlers.member.Name {
			members = append(members, etcdutil.Member{
				Name:    member.Name,
				PeerURL: member.PeerURLs[0],
			})
		}
	}
	// add the current EtcdMember to the member list
	members = append(members, etcdutil.Member{
		Name:    handlers.member.Name,
		PeerURL: handlers.member.PeerURL(),
	})
	return members, nil
}