1 package install
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "path/filepath"
8 "strings"
9 "time"
10
11 "github.com/containerd/containerd/namespaces"
12 "github.com/spf13/afero"
13 "gopkg.in/yaml.v3"
14 corev1 "k8s.io/api/core/v1"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/apimachinery/pkg/runtime"
17 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
18 kubeadmscheme "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/scheme"
19 kubeadmapiv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1beta3"
20 kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
21 etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
22 "k8s.io/kubernetes/cmd/kubeadm/app/util/config/strict"
23 etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
24 "sigs.k8s.io/controller-runtime/pkg/client"
25
26 "edge-infra.dev/pkg/lib/fog"
27 "edge-infra.dev/pkg/sds/etcd/operator/constants"
28 "edge-infra.dev/pkg/sds/etcd/operator/internal/tar"
29 "edge-infra.dev/pkg/sds/lib/containerd"
30 "edge-infra.dev/pkg/sds/lib/dbus/systemd"
31 "edge-infra.dev/pkg/sds/lib/etcd/server"
32 "edge-infra.dev/pkg/sds/lib/k8s/manifest"
33 )
34
35
36
37 func (r *Reconciler) reconcileFiles(handlers *Handlers) error {
38 reader := tar.NewReader()
39 content := handlers.secret.Data["etcd-tar-gz"]
40 if err := reader.SetBytes(content); err != nil {
41 return fmt.Errorf("failed to set bytes on tar reader: %w", err)
42 }
43
44 files, err := reader.Extract()
45 if err != nil {
46 return fmt.Errorf("failed to extract tar archive: %w", err)
47 }
48
49 for _, file := range files {
50 if err := r.writeFile(file); err != nil {
51 return fmt.Errorf("failed to write file (%s): %w", file.Name, err)
52 }
53 }
54
55 return nil
56 }
57
58
59 func (r *Reconciler) writeFile(file tar.File) error {
60 dir := filepath.Dir(file.Name)
61 if err := r.Fs.MkdirAll(dir, 0755); err != nil {
62 return fmt.Errorf("failed to create directory (%s): %w", dir, err)
63 }
64
65 return afero.WriteFile(r.Fs, file.Name, file.Bytes, file.Mode)
66 }
67
68 func (r *Reconciler) withFirewall(ctx context.Context, handlers *Handlers, fn func(context.Context, *Handlers) error) (retErr error) {
69 fs := afero.NewOsFs()
70 err := afero.WriteFile(fs, constants.OperatorFilewallFilepath, []byte(OperatorFilewall), 0644)
71 if err != nil {
72 return err
73 }
74 defer func() {
75 time.Sleep(20 * time.Second)
76 if err := fs.RemoveAll(constants.OperatorFilewallFilepath); err != nil {
77 if retErr == nil {
78 retErr = err
79 return
80 }
81 fog.FromContext(ctx).Error(err, "failed to remove etcd operator firewall rule")
82 }
83 }()
84 time.Sleep(5 * time.Second)
85 return fn(ctx, handlers)
86 }
87
88
89
90 func (r *Reconciler) configureEtcd(ctx context.Context, handlers *Handlers) error {
91 configMap := &corev1.ConfigMap{}
92 clusterConfKey := client.ObjectKey{Name: kubeadmconstants.KubeadmConfigConfigMap, Namespace: metav1.NamespaceSystem}
93 if err := r.KubeRetryClient.SafeGet(ctx, clusterConfKey, configMap); err != nil {
94 return fmt.Errorf("failed to retrieve kubeadm config map: %w", err)
95 }
96 manifestPath := filepath.Join(os.Getenv("STATIC_MANIFEST_PATH"), os.Getenv("ETCD_MANIFEST_FILENAME"))
97
98 client, err := containerd.NewClient(os.Getenv("CONTAINERD_SOCK_PATH"))
99 if err != nil {
100 return fmt.Errorf("failed to created containerd client: %w", err)
101 }
102 defer client.Close()
103 ctx = namespaces.WithNamespace(ctx, "k8s.io")
104
105 conn, err := systemd.NewConnection(ctx)
106 if err != nil {
107 return fmt.Errorf("failed to open systemd connection: %w", err)
108 }
109 defer conn.Close()
110
111 m := manifest.New(r.Fs, manifestPath, &corev1.Pod{}, 0)
112
113
114
115 return server.WithRestart(ctx, conn, client, func() error {
116 return m.WithCreate(func(obj runtime.Object) error {
117 pod, ok := obj.(*corev1.Pod)
118 if !ok {
119 return fmt.Errorf("current content of the manifest is not a valid pod")
120 }
121 return r.newEtcd(ctx, configMap, pod, handlers)
122 })
123 })
124 }
125
126
127
128 func (r *Reconciler) newEtcd(ctx context.Context, configMap *corev1.ConfigMap, pod *corev1.Pod, handlers *Handlers) error {
129 initCfg, err := initConfiguration(configMap)
130 if err != nil {
131 return fmt.Errorf("failed to retrieve init configuration: %w", err)
132 }
133
134 members, err := r.initialCluster(ctx, handlers)
135 if err != nil {
136 return fmt.Errorf("failed to retrieve current etcd members: %w", err)
137 }
138
139
140 initCfg.ClusterConfiguration.Etcd.Local.ImageMeta.ImageTag, err = r.etcdImageTag()
141 if err != nil {
142 return err
143 }
144
145 endpoint := &kubeadmapi.APIEndpoint{AdvertiseAddress: handlers.member.Spec.Address.Host}
146 manifest := etcdphase.GetEtcdPodSpec(&initCfg.ClusterConfiguration, endpoint, handlers.member.Name, members)
147 manifest.DeepCopyInto(pod)
148
149
150
151 if !handlers.member.IsMember {
152 if err := r.Fs.RemoveAll(filepath.Join(initCfg.ClusterConfiguration.Etcd.Local.DataDir, os.Getenv("ETCD_DATA_DIRNAME"))); err != nil {
153 return fmt.Errorf("failed to remove old etcd data directory: %w", err)
154 }
155 }
156 return nil
157 }
158
159
160 func (r *Reconciler) etcdImageTag() (string, error) {
161 containerVersions, err := afero.ReadFile(r.Fs, os.Getenv("CONTAINER_VERSIONS_PATH"))
162 if err != nil {
163 return "", fmt.Errorf("failed to read %s: %w", os.Getenv("CONTAINER_VERSIONS_PATH"), err)
164 }
165 var imageVersions containerImageVersions
166 if err := yaml.Unmarshal(containerVersions, &imageVersions); err != nil {
167 return "", fmt.Errorf("failed to parse container versions: %w", err)
168 }
169
170 etcdImage, ok := imageVersions.Containers["etcd"]
171 if !ok {
172 return "", fmt.Errorf("failed to find etcd image version in container versions: path checked: %s, found: %s", os.Getenv("CONTAINER_VERSIONS_PATH"), etcdImage)
173 }
174
175 imageParts := strings.Split(etcdImage, ":")
176 if len(imageParts) != 2 {
177 return "", fmt.Errorf("failed to parse etcd image version")
178 }
179 return imageParts[1], nil
180 }
181
182
183
184 func initConfiguration(configMap *corev1.ConfigMap) (*kubeadmapi.InitConfiguration, error) {
185 versionedInitcfg := &kubeadmapiv1.InitConfiguration{}
186
187 kubeadmscheme.Scheme.Default(versionedInitcfg)
188 initCfg := &kubeadmapi.InitConfiguration{}
189
190 if err := kubeadmscheme.Scheme.Convert(versionedInitcfg, initCfg, nil); err != nil {
191 return nil, fmt.Errorf("failed to convert InitConfiguration: %w", err)
192 }
193
194 clusterConfigurationData, ok := configMap.Data[kubeadmconstants.ClusterConfigurationConfigMapKey]
195 if !ok {
196 return nil, fmt.Errorf("unexpected error when reading kubeadm-config ConfigMap: %s key value pair missing", kubeadmconstants.ClusterConfigurationConfigMapKey)
197 }
198
199 if err := strict.VerifyUnmarshalStrict([]*runtime.Scheme{kubeadmscheme.Scheme},
200 kubeadmapiv1.SchemeGroupVersion.WithKind(kubeadmconstants.ClusterConfigurationKind),
201 []byte(clusterConfigurationData)); err != nil {
202 return nil, fmt.Errorf("failed to verify ClusterConfiguration: %w", err)
203 }
204
205 if err := runtime.DecodeInto(kubeadmscheme.Codecs.UniversalDecoder(), []byte(clusterConfigurationData), &initCfg.ClusterConfiguration); err != nil {
206 return nil, fmt.Errorf("failed to decode ClusterConfiguration: %w", err)
207 }
208 return initCfg, nil
209 }
210
211
212
213 func (r *Reconciler) initialCluster(ctx context.Context, handlers *Handlers) ([]etcdutil.Member, error) {
214 resp, err := r.EtcdRetryClient.SafeMemberList(ctx)
215 if err != nil {
216 return nil, fmt.Errorf("failed to retrieve etcd members: %w", err)
217 }
218
219 var members []etcdutil.Member
220
221 for _, member := range resp.Members {
222
223
224 if member.Name != "" && member.Name != handlers.member.Name {
225 members = append(members, etcdutil.Member{
226 Name: member.Name,
227 PeerURL: member.PeerURLs[0],
228 })
229 }
230 }
231
232 members = append(members, etcdutil.Member{
233 Name: handlers.member.Name,
234 PeerURL: handlers.member.PeerURL(),
235 })
236 return members, nil
237 }
238
View as plain text