1
16
17 package etcd
18
19 import (
20 "fmt"
21 "net"
22 "os"
23 "path/filepath"
24 "strconv"
25 "strings"
26 "time"
27
28 "github.com/pkg/errors"
29
30 v1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/resource"
32 clientset "k8s.io/client-go/kubernetes"
33 "k8s.io/klog/v2"
34 utilsnet "k8s.io/utils/net"
35
36 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
37 kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
38 "k8s.io/kubernetes/cmd/kubeadm/app/features"
39 "k8s.io/kubernetes/cmd/kubeadm/app/images"
40 kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
41 etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
42 staticpodutil "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
43 "k8s.io/kubernetes/cmd/kubeadm/app/util/users"
44 )
45
const (
	// etcdVolumeName is the name of the pod volume (and matching volume mount)
	// that is backed by the etcd data directory.
	etcdVolumeName = "etcd-data"
	// certsVolumeName is the name of the pod volume (and matching volume mount)
	// that is backed by the "etcd" subdirectory of the certificates directory.
	certsVolumeName = "etcd-certs"
	// etcdHealthyCheckInterval is the interval handed to WaitForClusterAvailable
	// when waiting for a newly added etcd member.
	etcdHealthyCheckInterval = 5 * time.Second
	// etcdHealthyCheckRetries is the retry count handed to WaitForClusterAvailable;
	// multiplied by the interval it gives the maximum wait reported to the user.
	etcdHealthyCheckRetries = 8
)
52
53
54
55
56 func CreateLocalEtcdStaticPodManifestFile(manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, isDryRun bool) error {
57 if cfg.Etcd.External != nil {
58 return errors.New("etcd static pod manifest cannot be generated for cluster using external etcd")
59 }
60
61 if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, []etcdutil.Member{}, isDryRun); err != nil {
62 return err
63 }
64
65 klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
66 return nil
67 }
68
69
70 func CheckLocalEtcdClusterStatus(client clientset.Interface, certificatesDir string) error {
71 klog.V(1).Info("[etcd] Checking etcd cluster health")
72
73
74 klog.V(1).Info("creating etcd client that connects to etcd pods")
75 etcdClient, err := etcdutil.NewFromCluster(client, certificatesDir)
76 if err != nil {
77 return err
78 }
79
80
81 err = etcdClient.CheckClusterHealth()
82 if err != nil {
83 return errors.Wrap(err, "etcd cluster is not healthy")
84 }
85
86 return nil
87 }
88
89
90
91 func RemoveStackedEtcdMemberFromCluster(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
92
93 klog.V(1).Info("[etcd] creating etcd client that connects to etcd pods")
94 etcdClient, err := etcdutil.NewFromCluster(client, cfg.CertificatesDir)
95 if err != nil {
96 return err
97 }
98
99 members, err := etcdClient.ListMembers()
100 if err != nil {
101 return err
102 }
103
104
105 if len(members) == 1 {
106 etcdClientAddress := etcdutil.GetClientURL(&cfg.LocalAPIEndpoint)
107 for _, endpoint := range etcdClient.Endpoints {
108 if endpoint == etcdClientAddress {
109 klog.V(1).Info("[etcd] This is the only remaining etcd member in the etcd cluster, skip removing it")
110 return nil
111 }
112 }
113 }
114
115
116 etcdPeerAddress := etcdutil.GetPeerURL(&cfg.LocalAPIEndpoint)
117
118 klog.V(2).Infof("[etcd] get the member id from peer: %s", etcdPeerAddress)
119 id, err := etcdClient.GetMemberID(etcdPeerAddress)
120 if err != nil {
121 if errors.Is(etcdutil.ErrNoMemberIDForPeerURL, err) {
122 klog.V(5).Infof("[etcd] member was already removed, because no member id exists for peer %s", etcdPeerAddress)
123 return nil
124 }
125 return err
126 }
127
128 klog.V(1).Infof("[etcd] removing etcd member: %s, id: %d", etcdPeerAddress, id)
129 members, err = etcdClient.RemoveMember(id)
130 if err != nil {
131 return err
132 }
133 klog.V(1).Infof("[etcd] Updated etcd member list: %v", members)
134
135 return nil
136 }
137
138
139
140
141 func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifestDir, patchesDir string, nodeName string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, isDryRun bool, certificatesDir string) error {
142
143 klog.V(1).Info("creating etcd client that connects to etcd pods")
144 etcdClient, err := etcdutil.NewFromCluster(client, certificatesDir)
145 if err != nil {
146 return err
147 }
148
149 etcdPeerAddress := etcdutil.GetPeerURL(endpoint)
150
151 var cluster []etcdutil.Member
152 if isDryRun {
153 fmt.Printf("[etcd] Would add etcd member: %s\n", etcdPeerAddress)
154 } else {
155 klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
156 if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
157 cluster, err = etcdClient.AddMemberAsLearner(nodeName, etcdPeerAddress)
158 } else {
159 cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
160 }
161 if err != nil {
162 return err
163 }
164 fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster")
165 klog.V(1).Infof("Updated etcd member list: %v", cluster)
166 }
167
168 fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd)
169
170 if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil {
171 return err
172 }
173
174 if isDryRun {
175 fmt.Println("[etcd] Would wait for the new etcd member to join the cluster")
176 return nil
177 }
178
179 if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
180 learnerID, err := etcdClient.GetMemberID(etcdPeerAddress)
181 if err != nil {
182 return err
183 }
184 err = etcdClient.MemberPromote(learnerID)
185 if err != nil {
186 return err
187 }
188 }
189
190 fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
191 if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
192 return err
193 }
194
195 return nil
196 }
197
198
199
200 func GetEtcdPodSpec(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) v1.Pod {
201 pathType := v1.HostPathDirectoryOrCreate
202 etcdMounts := map[string]v1.Volume{
203 etcdVolumeName: staticpodutil.NewVolume(etcdVolumeName, cfg.Etcd.Local.DataDir, &pathType),
204 certsVolumeName: staticpodutil.NewVolume(certsVolumeName, cfg.CertificatesDir+"/etcd", &pathType),
205 }
206 componentHealthCheckTimeout := kubeadmapi.GetActiveTimeouts().ControlPlaneComponentHealthCheck
207
208
209 probeHostname, probePort, probeScheme := staticpodutil.GetEtcdProbeEndpoint(&cfg.Etcd, utilsnet.IsIPv6String(endpoint.AdvertiseAddress))
210 return staticpodutil.ComponentPod(
211 v1.Container{
212 Name: kubeadmconstants.Etcd,
213 Command: getEtcdCommand(cfg, endpoint, nodeName, initialCluster),
214 Image: images.GetEtcdImage(cfg),
215 ImagePullPolicy: v1.PullIfNotPresent,
216
217 VolumeMounts: []v1.VolumeMount{
218 staticpodutil.NewVolumeMount(etcdVolumeName, cfg.Etcd.Local.DataDir, false),
219 staticpodutil.NewVolumeMount(certsVolumeName, cfg.CertificatesDir+"/etcd", false),
220 },
221 Resources: v1.ResourceRequirements{
222 Requests: v1.ResourceList{
223 v1.ResourceCPU: resource.MustParse("100m"),
224 v1.ResourceMemory: resource.MustParse("100Mi"),
225 },
226 },
227 LivenessProbe: staticpodutil.LivenessProbe(probeHostname, "/health?exclude=NOSPACE&serializable=true", probePort, probeScheme),
228 StartupProbe: staticpodutil.StartupProbe(probeHostname, "/health?serializable=false", probePort, probeScheme, componentHealthCheckTimeout),
229 Env: kubeadmutil.MergeKubeadmEnvVars(cfg.Etcd.Local.ExtraEnvs),
230 },
231 etcdMounts,
232
233 map[string]string{kubeadmconstants.EtcdAdvertiseClientUrlsAnnotationKey: etcdutil.GetClientURL(endpoint)},
234 )
235 }
236
237
238 func getEtcdCommand(cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member) []string {
239
240 etcdLocalhostAddress := "127.0.0.1"
241 if utilsnet.IsIPv6String(endpoint.AdvertiseAddress) {
242 etcdLocalhostAddress = "::1"
243 }
244 defaultArguments := []kubeadmapi.Arg{
245 {Name: "name", Value: nodeName},
246
247
248 {Name: "experimental-initial-corrupt-check", Value: "true"},
249 {Name: "listen-client-urls", Value: fmt.Sprintf("%s,%s", etcdutil.GetClientURLByIP(etcdLocalhostAddress), etcdutil.GetClientURL(endpoint))},
250 {Name: "advertise-client-urls", Value: etcdutil.GetClientURL(endpoint)},
251 {Name: "listen-peer-urls", Value: etcdutil.GetPeerURL(endpoint)},
252 {Name: "initial-advertise-peer-urls", Value: etcdutil.GetPeerURL(endpoint)},
253 {Name: "data-dir", Value: cfg.Etcd.Local.DataDir},
254 {Name: "cert-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerCertName)},
255 {Name: "key-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdServerKeyName)},
256 {Name: "trusted-ca-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName)},
257 {Name: "client-cert-auth", Value: "true"},
258 {Name: "peer-cert-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdPeerCertName)},
259 {Name: "peer-key-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdPeerKeyName)},
260 {Name: "peer-trusted-ca-file", Value: filepath.Join(cfg.CertificatesDir, kubeadmconstants.EtcdCACertName)},
261 {Name: "peer-client-cert-auth", Value: "true"},
262 {Name: "snapshot-count", Value: "10000"},
263 {Name: "listen-metrics-urls", Value: fmt.Sprintf("http://%s", net.JoinHostPort(etcdLocalhostAddress, strconv.Itoa(kubeadmconstants.EtcdMetricsPort)))},
264 {Name: "experimental-watch-progress-notify-interval", Value: "5s"},
265 }
266
267 if len(initialCluster) == 0 {
268 defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster", fmt.Sprintf("%s=%s", nodeName, etcdutil.GetPeerURL(endpoint)), 1)
269 } else {
270
271 endpoints := []string{}
272 for _, member := range initialCluster {
273 endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL))
274 }
275
276 defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster", strings.Join(endpoints, ","), 1)
277 defaultArguments = kubeadmapi.SetArgValues(defaultArguments, "initial-cluster-state", "existing", 1)
278 }
279
280 command := []string{"etcd"}
281 command = append(command, kubeadmutil.ArgumentsToCommand(defaultArguments, cfg.Etcd.Local.ExtraArgs)...)
282 return command
283 }
284
285 func prepareAndWriteEtcdStaticPod(manifestDir string, patchesDir string, cfg *kubeadmapi.ClusterConfiguration, endpoint *kubeadmapi.APIEndpoint, nodeName string, initialCluster []etcdutil.Member, isDryRun bool) error {
286
287 spec := GetEtcdPodSpec(cfg, endpoint, nodeName, initialCluster)
288
289 var usersAndGroups *users.UsersAndGroups
290 var err error
291 if features.Enabled(cfg.FeatureGates, features.RootlessControlPlane) {
292 if isDryRun {
293 fmt.Printf("[etcd] Would create users and groups for %q to run as non-root\n", kubeadmconstants.Etcd)
294 fmt.Printf("[etcd] Would update static pod manifest for %q to run run as non-root\n", kubeadmconstants.Etcd)
295 } else {
296 usersAndGroups, err = staticpodutil.GetUsersAndGroups()
297 if err != nil {
298 return errors.Wrap(err, "failed to create users and groups")
299 }
300
301 if usersAndGroups != nil {
302 if err := staticpodutil.RunComponentAsNonRoot(kubeadmconstants.Etcd, &spec, usersAndGroups, cfg); err != nil {
303 return errors.Wrapf(err, "failed to run component %q as non-root", kubeadmconstants.Etcd)
304 }
305 }
306 }
307 }
308
309
310 if patchesDir != "" {
311 patchedSpec, err := staticpodutil.PatchStaticPod(&spec, patchesDir, os.Stdout)
312 if err != nil {
313 return errors.Wrapf(err, "failed to patch static Pod manifest file for %q", kubeadmconstants.Etcd)
314 }
315 spec = *patchedSpec
316 }
317
318
319 if err := staticpodutil.WriteStaticPodToDisk(kubeadmconstants.Etcd, manifestDir, spec); err != nil {
320 return err
321 }
322
323 return nil
324 }
325
View as plain text