1
16
17 package etcd
18
19 import (
20 "context"
21 "crypto/tls"
22 "fmt"
23 "net"
24 "net/url"
25 "path/filepath"
26 "strconv"
27 "strings"
28 "time"
29
30 "github.com/pkg/errors"
31 "go.etcd.io/etcd/api/v3/etcdserverpb"
32 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
33 "go.etcd.io/etcd/client/pkg/v3/transport"
34 clientv3 "go.etcd.io/etcd/client/v3"
35 "google.golang.org/grpc"
36
37 corev1 "k8s.io/api/core/v1"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/util/wait"
40 clientset "k8s.io/client-go/kubernetes"
41 "k8s.io/klog/v2"
42
43 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
44 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
45 )
46
// etcdTimeout bounds a single etcd dial (DialTimeout) and each individual
// RPC attempt made inside the retry loops of this package.
const etcdTimeout time.Duration = 2 * time.Second
48
49
50
// ErrNoMemberIDForPeerURL is returned by GetMemberID when no member of the
// etcd cluster matches the requested peer URL.
var ErrNoMemberIDForPeerURL error = errors.New("no member id found for peer URL")
52
53
54 type ClusterInterrogator interface {
55 CheckClusterHealth() error
56 WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
57 Sync() error
58 ListMembers() ([]Member, error)
59 AddMember(name string, peerAddrs string) ([]Member, error)
60 AddMemberAsLearner(name string, peerAddrs string) ([]Member, error)
61 MemberPromote(learnerID uint64) error
62 GetMemberID(peerURL string) (uint64, error)
63 RemoveMember(id uint64) ([]Member, error)
64 }
65
66 type etcdClient interface {
67
68 Close() error
69
70
71 Endpoints() []string
72
73
74 MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
75
76
77 MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
78
79
80 MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
81
82
83 MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
84
85
86 MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
87
88
89 Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
90
91
92 Sync(ctx context.Context) error
93 }
94
95
96 type Client struct {
97 Endpoints []string
98
99 newEtcdClient func(endpoints []string) (etcdClient, error)
100
101 listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
102 }
103
104
105 func New(endpoints []string, ca, cert, key string) (*Client, error) {
106 client := Client{Endpoints: endpoints}
107
108 var err error
109 var tlsConfig *tls.Config
110 if ca != "" || cert != "" || key != "" {
111 tlsInfo := transport.TLSInfo{
112 CertFile: cert,
113 KeyFile: key,
114 TrustedCAFile: ca,
115 }
116 tlsConfig, err = tlsInfo.ClientConfig()
117 if err != nil {
118 return nil, err
119 }
120 }
121
122 client.newEtcdClient = func(endpoints []string) (etcdClient, error) {
123 return clientv3.New(clientv3.Config{
124 Endpoints: endpoints,
125 DialTimeout: etcdTimeout,
126 DialOptions: []grpc.DialOption{
127 grpc.WithBlock(),
128 },
129 TLS: tlsConfig,
130 })
131 }
132
133 client.listMembersFunc = client.listMembers
134
135 return &client, nil
136 }
137
138
139
140
141 func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
142
143
144
145 endpoints, err := getEtcdEndpoints(client)
146 if err != nil {
147 return nil, err
148 }
149 klog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
150
151
152 etcdClient, err := New(
153 endpoints,
154 filepath.Join(certificatesDir, constants.EtcdCACertName),
155 filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
156 filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
157 )
158 if err != nil {
159 return nil, errors.Wrapf(err, "error creating etcd client for %v endpoints", endpoints)
160 }
161
162
163 err = etcdClient.Sync()
164 if err != nil {
165 return nil, errors.Wrap(err, "error syncing endpoints with etcd")
166 }
167 klog.V(1).Infof("update etcd endpoints: %s", strings.Join(etcdClient.Endpoints, ","))
168
169 return etcdClient, nil
170 }
171
172
173 func getEtcdEndpoints(client clientset.Interface) ([]string, error) {
174 return getEtcdEndpointsWithRetry(client,
175 constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration)
176 }
177
178 func getEtcdEndpointsWithRetry(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
179 return getRawEtcdEndpointsFromPodAnnotation(client, interval, timeout)
180 }
181
182
183 func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
184 etcdEndpoints := []string{}
185 var lastErr error
186
187
188 err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true,
189 func(_ context.Context) (bool, error) {
190 var overallEtcdPodCount int
191 if etcdEndpoints, overallEtcdPodCount, lastErr = getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client); lastErr != nil {
192 return false, nil
193 }
194 if len(etcdEndpoints) == 0 || overallEtcdPodCount != len(etcdEndpoints) {
195 klog.V(4).Infof("found a total of %d etcd pods and the following endpoints: %v; retrying",
196 overallEtcdPodCount, etcdEndpoints)
197 return false, nil
198 }
199 return true, nil
200 })
201 if err != nil {
202 const message = "could not retrieve the list of etcd endpoints"
203 if lastErr != nil {
204 return []string{}, errors.Wrap(lastErr, message)
205 }
206 return []string{}, errors.Wrap(err, message)
207 }
208 return etcdEndpoints, nil
209 }
210
211
212
213
214 func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface) ([]string, int, error) {
215 klog.V(3).Infof("retrieving etcd endpoints from %q annotation in etcd Pods", constants.EtcdAdvertiseClientUrlsAnnotationKey)
216 podList, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(
217 context.TODO(),
218 metav1.ListOptions{
219 LabelSelector: fmt.Sprintf("component=%s,tier=%s", constants.Etcd, constants.ControlPlaneTier),
220 },
221 )
222 if err != nil {
223 return []string{}, 0, err
224 }
225 etcdEndpoints := []string{}
226 for _, pod := range podList.Items {
227 podIsReady := false
228 for _, c := range pod.Status.Conditions {
229 if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
230 podIsReady = true
231 break
232 }
233 }
234 if !podIsReady {
235 klog.V(3).Infof("etcd pod %q is not ready", pod.ObjectMeta.Name)
236 }
237 etcdEndpoint, ok := pod.ObjectMeta.Annotations[constants.EtcdAdvertiseClientUrlsAnnotationKey]
238 if !ok {
239 klog.V(3).Infof("etcd Pod %q is missing the %q annotation; cannot infer etcd advertise client URL using the Pod annotation", pod.ObjectMeta.Name, constants.EtcdAdvertiseClientUrlsAnnotationKey)
240 continue
241 }
242 etcdEndpoints = append(etcdEndpoints, etcdEndpoint)
243 }
244 return etcdEndpoints, len(podList.Items), nil
245 }
246
247
248 func (c *Client) Sync() error {
249
250 var cli etcdClient
251 var lastError error
252 err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
253 true, func(_ context.Context) (bool, error) {
254 var err error
255 cli, err = c.newEtcdClient(c.Endpoints)
256 if err != nil {
257 lastError = err
258 return false, nil
259 }
260 defer func() { _ = cli.Close() }()
261 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
262 err = cli.Sync(ctx)
263 cancel()
264 if err == nil {
265 return true, nil
266 }
267 klog.V(5).Infof("Failed to sync etcd endpoints: %v", err)
268 lastError = err
269 return false, nil
270 })
271 if err != nil {
272 return lastError
273 }
274 klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
275
276 c.Endpoints = cli.Endpoints()
277 return nil
278 }
279
280
281
// Member is this package's summary of an etcd cluster member: its name and
// one peer URL.
type Member struct {
	// Name is the member name as reported by etcd, or a substitute chosen by
	// the caller that produced this value.
	Name string
	// PeerURL is a URL the member uses for peer traffic.
	PeerURL string
}
286
287 func (c *Client) listMembers(timeout time.Duration) (*clientv3.MemberListResponse, error) {
288
289 var lastError error
290 var resp *clientv3.MemberListResponse
291 if timeout == 0 {
292 timeout = constants.EtcdAPICallTimeout
293 }
294 err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, timeout,
295 true, func(_ context.Context) (bool, error) {
296 cli, err := c.newEtcdClient(c.Endpoints)
297 if err != nil {
298 lastError = err
299 return false, nil
300 }
301 defer func() { _ = cli.Close() }()
302
303 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
304 resp, err = cli.MemberList(ctx)
305 cancel()
306 if err == nil {
307 return true, nil
308 }
309 klog.V(5).Infof("Failed to get etcd member list: %v", err)
310 lastError = err
311 return false, nil
312 })
313 if err != nil {
314 return nil, lastError
315 }
316 return resp, nil
317 }
318
319
320 func (c *Client) GetMemberID(peerURL string) (uint64, error) {
321 resp, err := c.listMembersFunc(0)
322 if err != nil {
323 return 0, err
324 }
325
326 for _, member := range resp.Members {
327 if member.GetPeerURLs()[0] == peerURL {
328 return member.GetID(), nil
329 }
330 }
331 return 0, ErrNoMemberIDForPeerURL
332 }
333
334
335 func (c *Client) ListMembers() ([]Member, error) {
336 resp, err := c.listMembersFunc(0)
337 if err != nil {
338 return nil, err
339 }
340
341 ret := make([]Member, 0, len(resp.Members))
342 for _, m := range resp.Members {
343 ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
344 }
345 return ret, nil
346 }
347
348
349 func (c *Client) RemoveMember(id uint64) ([]Member, error) {
350
351 var lastError error
352 var resp *clientv3.MemberRemoveResponse
353 err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
354 true, func(_ context.Context) (bool, error) {
355 cli, err := c.newEtcdClient(c.Endpoints)
356 if err != nil {
357 lastError = err
358 return false, nil
359 }
360 defer func() { _ = cli.Close() }()
361
362 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
363 resp, err = cli.MemberRemove(ctx, id)
364 cancel()
365 if err == nil {
366 return true, nil
367 }
368 if errors.Is(rpctypes.ErrMemberNotFound, err) {
369 klog.V(5).Infof("Member was already removed, because member %s was not found", strconv.FormatUint(id, 16))
370 return true, nil
371 }
372 klog.V(5).Infof("Failed to remove etcd member: %v", err)
373 lastError = err
374 return false, nil
375 })
376 if err != nil {
377 return nil, lastError
378 }
379
380
381 ret := []Member{}
382 if resp != nil {
383 for _, m := range resp.Members {
384 ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
385 }
386
387 }
388
389 return ret, nil
390 }
391
392
393 func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
394 return c.addMember(name, peerAddrs, false)
395 }
396
397
398 func (c *Client) AddMemberAsLearner(name string, peerAddrs string) ([]Member, error) {
399 return c.addMember(name, peerAddrs, true)
400 }
401
402
403
404
405 func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Member, error) {
406
407
408
409 parsedPeerAddrs, err := url.Parse(peerAddrs)
410 if err != nil {
411 return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
412 }
413
414 cli, err := c.newEtcdClient(c.Endpoints)
415 if err != nil {
416 return nil, err
417 }
418 defer func() { _ = cli.Close() }()
419
420
421 var (
422 lastError error
423 respMembers []*etcdserverpb.Member
424 learnerID uint64
425 resp *clientv3.MemberAddResponse
426 )
427 err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
428 true, func(_ context.Context) (bool, error) {
429 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
430 defer cancel()
431 if isLearner {
432
433 if learnerID == 0 {
434 klog.V(1).Info("[etcd] Adding etcd member as learner")
435 resp, err = cli.MemberAddAsLearner(ctx, []string{peerAddrs})
436 if err != nil {
437 lastError = err
438 return false, nil
439 }
440 learnerID = resp.Member.ID
441 }
442 respMembers = resp.Members
443 return true, nil
444 }
445
446 resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
447 if err == nil {
448 respMembers = resp.Members
449 return true, nil
450 }
451
452
453
454 if errors.Is(err, rpctypes.ErrPeerURLExist) {
455 klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members")
456 var listResp *clientv3.MemberListResponse
457 listResp, err = cli.MemberList(ctx)
458 if err == nil {
459 respMembers = listResp.Members
460 return true, nil
461 }
462 }
463
464 klog.V(5).Infof("Failed to add etcd member: %v", err)
465 lastError = err
466 return false, nil
467 })
468 if err != nil {
469 return nil, lastError
470 }
471
472
473 ret := []Member{}
474 for _, m := range respMembers {
475
476
477 if peerAddrs == m.PeerURLs[0] {
478 ret = append(ret, Member{Name: name, PeerURL: peerAddrs})
479 continue
480 }
481
482 memberName := m.Name
483
484
485 if len(memberName) == 0 {
486 memberName = strconv.FormatUint(m.ID, 16)
487 }
488 ret = append(ret, Member{Name: memberName, PeerURL: m.PeerURLs[0]})
489 }
490
491
492 c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
493
494 return ret, nil
495 }
496
497
498 func (c *Client) isLearner(memberID uint64) (bool, error) {
499 resp, err := c.listMembersFunc(0)
500 if err != nil {
501 return false, err
502 }
503
504 for _, member := range resp.Members {
505 if member.ID == memberID && member.IsLearner {
506 return true, nil
507 }
508 }
509 return false, nil
510 }
511
512
513
514 func (c *Client) MemberPromote(learnerID uint64) error {
515 isLearner, err := c.isLearner(learnerID)
516 if err != nil {
517 return err
518 }
519 if !isLearner {
520 klog.V(1).Infof("[etcd] Member %s already promoted.", strconv.FormatUint(learnerID, 16))
521 return nil
522 }
523
524 klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %s", strconv.FormatUint(learnerID, 16))
525 cli, err := c.newEtcdClient(c.Endpoints)
526 if err != nil {
527 return err
528 }
529 defer func() { _ = cli.Close() }()
530
531
532
533
534
535
536
537 var (
538 lastError error
539 )
540 err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
541 true, func(_ context.Context) (bool, error) {
542 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
543 defer cancel()
544
545 _, err = cli.MemberPromote(ctx, learnerID)
546 if err == nil {
547 klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", strconv.FormatUint(learnerID, 16))
548 return true, nil
549 }
550 klog.V(5).Infof("[etcd] Promoting the learner %s failed: %v", strconv.FormatUint(learnerID, 16), err)
551 lastError = err
552 return false, nil
553 })
554 if err != nil {
555 return lastError
556 }
557 return nil
558 }
559
560
561 func (c *Client) CheckClusterHealth() error {
562 _, err := c.getClusterStatus()
563 return err
564 }
565
566
567 func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
568 clusterStatus := make(map[string]*clientv3.StatusResponse)
569 for _, ep := range c.Endpoints {
570
571 var lastError error
572 var resp *clientv3.StatusResponse
573 err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
574 true, func(_ context.Context) (bool, error) {
575 cli, err := c.newEtcdClient(c.Endpoints)
576 if err != nil {
577 lastError = err
578 return false, nil
579 }
580 defer func() { _ = cli.Close() }()
581
582 ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
583 resp, err = cli.Status(ctx, ep)
584 cancel()
585 if err == nil {
586 return true, nil
587 }
588 klog.V(5).Infof("Failed to get etcd status for %s: %v", ep, err)
589 lastError = err
590 return false, nil
591 })
592 if err != nil {
593 return nil, lastError
594 }
595
596 clusterStatus[ep] = resp
597 }
598 return clusterStatus, nil
599 }
600
601
602 func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
603 for i := 0; i < retries; i++ {
604 if i > 0 {
605 klog.V(1).Infof("[etcd] Waiting %v until next retry\n", retryInterval)
606 time.Sleep(retryInterval)
607 }
608 klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
609 _, err := c.getClusterStatus()
610 if err != nil {
611 switch err {
612 case context.DeadlineExceeded:
613 klog.V(1).Infof("[etcd] Attempt timed out")
614 default:
615 klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
616 }
617 continue
618 }
619 return true, nil
620 }
621 return false, errors.New("timeout waiting for etcd cluster to be available")
622 }
623
624
625
626 func GetClientURL(localEndpoint *kubeadmapi.APIEndpoint) string {
627 return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenClientPort))
628 }
629
630
631
632 func GetPeerURL(localEndpoint *kubeadmapi.APIEndpoint) string {
633 return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenPeerPort))
634 }
635
636
637
638 func GetClientURLByIP(ip string) string {
639 return "https://" + net.JoinHostPort(ip, strconv.Itoa(constants.EtcdListenClientPort))
640 }
641
View as plain text