...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/etcd/etcd.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/etcd

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package etcd
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"fmt"
    23  	"net"
    24  	"net/url"
    25  	"path/filepath"
    26  	"strconv"
    27  	"strings"
    28  	"time"
    29  
    30  	"github.com/pkg/errors"
    31  	"go.etcd.io/etcd/api/v3/etcdserverpb"
    32  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    33  	"go.etcd.io/etcd/client/pkg/v3/transport"
    34  	clientv3 "go.etcd.io/etcd/client/v3"
    35  	"google.golang.org/grpc"
    36  
    37  	corev1 "k8s.io/api/core/v1"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	"k8s.io/klog/v2"
    42  
    43  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    44  	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
    45  )
    46  
    47  const etcdTimeout = 2 * time.Second
    48  
    49  // ErrNoMemberIDForPeerURL is returned when it is not possible to obtain a member ID
    50  // from a given peer URL
    51  var ErrNoMemberIDForPeerURL = errors.New("no member id found for peer URL")
    52  
    53  // ClusterInterrogator is an interface to get etcd cluster related information
    54  type ClusterInterrogator interface {
    55  	CheckClusterHealth() error
    56  	WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error)
    57  	Sync() error
    58  	ListMembers() ([]Member, error)
    59  	AddMember(name string, peerAddrs string) ([]Member, error)
    60  	AddMemberAsLearner(name string, peerAddrs string) ([]Member, error)
    61  	MemberPromote(learnerID uint64) error
    62  	GetMemberID(peerURL string) (uint64, error)
    63  	RemoveMember(id uint64) ([]Member, error)
    64  }
    65  
    66  type etcdClient interface {
    67  	// Close shuts down the client's etcd connections.
    68  	Close() error
    69  
    70  	// Endpoints lists the registered endpoints for the client.
    71  	Endpoints() []string
    72  
    73  	// MemberList lists the current cluster membership.
    74  	MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
    75  
    76  	// MemberAdd adds a new member into the cluster.
    77  	MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
    78  
    79  	// MemberAddAsLearner adds a new learner member into the cluster.
    80  	MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
    81  
    82  	// MemberRemove removes an existing member from the cluster.
    83  	MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
    84  
    85  	// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
    86  	MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
    87  
    88  	// Status gets the status of the endpoint.
    89  	Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
    90  
    91  	// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
    92  	Sync(ctx context.Context) error
    93  }
    94  
    95  // Client provides connection parameters for an etcd cluster
    96  type Client struct {
    97  	Endpoints []string
    98  
    99  	newEtcdClient func(endpoints []string) (etcdClient, error)
   100  
   101  	listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
   102  }
   103  
   104  // New creates a new EtcdCluster client
   105  func New(endpoints []string, ca, cert, key string) (*Client, error) {
   106  	client := Client{Endpoints: endpoints}
   107  
   108  	var err error
   109  	var tlsConfig *tls.Config
   110  	if ca != "" || cert != "" || key != "" {
   111  		tlsInfo := transport.TLSInfo{
   112  			CertFile:      cert,
   113  			KeyFile:       key,
   114  			TrustedCAFile: ca,
   115  		}
   116  		tlsConfig, err = tlsInfo.ClientConfig()
   117  		if err != nil {
   118  			return nil, err
   119  		}
   120  	}
   121  
   122  	client.newEtcdClient = func(endpoints []string) (etcdClient, error) {
   123  		return clientv3.New(clientv3.Config{
   124  			Endpoints:   endpoints,
   125  			DialTimeout: etcdTimeout,
   126  			DialOptions: []grpc.DialOption{
   127  				grpc.WithBlock(), // block until the underlying connection is up
   128  			},
   129  			TLS: tlsConfig,
   130  		})
   131  	}
   132  
   133  	client.listMembersFunc = client.listMembers
   134  
   135  	return &client, nil
   136  }
   137  
   138  // NewFromCluster creates an etcd client for the etcd endpoints present in etcd member list. In order to compose this information,
   139  // it will first discover at least one etcd endpoint to connect to. Once created, the client synchronizes client's endpoints with
   140  // the known endpoints from the etcd membership API, since it is the authoritative source of truth for the list of available members.
   141  func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
   142  	// Discover at least one etcd endpoint to connect to by inspecting the existing etcd pods
   143  
   144  	// Get the list of etcd endpoints
   145  	endpoints, err := getEtcdEndpoints(client)
   146  	if err != nil {
   147  		return nil, err
   148  	}
   149  	klog.V(1).Infof("etcd endpoints read from pods: %s", strings.Join(endpoints, ","))
   150  
   151  	// Creates an etcd client
   152  	etcdClient, err := New(
   153  		endpoints,
   154  		filepath.Join(certificatesDir, constants.EtcdCACertName),
   155  		filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
   156  		filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
   157  	)
   158  	if err != nil {
   159  		return nil, errors.Wrapf(err, "error creating etcd client for %v endpoints", endpoints)
   160  	}
   161  
   162  	// synchronizes client's endpoints with the known endpoints from the etcd membership.
   163  	err = etcdClient.Sync()
   164  	if err != nil {
   165  		return nil, errors.Wrap(err, "error syncing endpoints with etcd")
   166  	}
   167  	klog.V(1).Infof("update etcd endpoints: %s", strings.Join(etcdClient.Endpoints, ","))
   168  
   169  	return etcdClient, nil
   170  }
   171  
   172  // getEtcdEndpoints returns the list of etcd endpoints.
   173  func getEtcdEndpoints(client clientset.Interface) ([]string, error) {
   174  	return getEtcdEndpointsWithRetry(client,
   175  		constants.KubernetesAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration)
   176  }
   177  
   178  func getEtcdEndpointsWithRetry(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
   179  	return getRawEtcdEndpointsFromPodAnnotation(client, interval, timeout)
   180  }
   181  
   182  // getRawEtcdEndpointsFromPodAnnotation returns the list of endpoints as reported on etcd's pod annotations using the given backoff
   183  func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
   184  	etcdEndpoints := []string{}
   185  	var lastErr error
   186  	// Let's tolerate some unexpected transient failures from the API server or load balancers. Also, if
   187  	// static pods were not yet mirrored into the API server we want to wait for this propagation.
   188  	err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true,
   189  		func(_ context.Context) (bool, error) {
   190  			var overallEtcdPodCount int
   191  			if etcdEndpoints, overallEtcdPodCount, lastErr = getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client); lastErr != nil {
   192  				return false, nil
   193  			}
   194  			if len(etcdEndpoints) == 0 || overallEtcdPodCount != len(etcdEndpoints) {
   195  				klog.V(4).Infof("found a total of %d etcd pods and the following endpoints: %v; retrying",
   196  					overallEtcdPodCount, etcdEndpoints)
   197  				return false, nil
   198  			}
   199  			return true, nil
   200  		})
   201  	if err != nil {
   202  		const message = "could not retrieve the list of etcd endpoints"
   203  		if lastErr != nil {
   204  			return []string{}, errors.Wrap(lastErr, message)
   205  		}
   206  		return []string{}, errors.Wrap(err, message)
   207  	}
   208  	return etcdEndpoints, nil
   209  }
   210  
   211  // getRawEtcdEndpointsFromPodAnnotationWithoutRetry returns the list of etcd endpoints as reported by etcd Pod annotations,
   212  // along with the number of global etcd pods. This allows for callers to tell the difference between "no endpoints found",
   213  // and "no endpoints found and pods were listed", so they can skip retrying.
   214  func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface) ([]string, int, error) {
   215  	klog.V(3).Infof("retrieving etcd endpoints from %q annotation in etcd Pods", constants.EtcdAdvertiseClientUrlsAnnotationKey)
   216  	podList, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(
   217  		context.TODO(),
   218  		metav1.ListOptions{
   219  			LabelSelector: fmt.Sprintf("component=%s,tier=%s", constants.Etcd, constants.ControlPlaneTier),
   220  		},
   221  	)
   222  	if err != nil {
   223  		return []string{}, 0, err
   224  	}
   225  	etcdEndpoints := []string{}
   226  	for _, pod := range podList.Items {
   227  		podIsReady := false
   228  		for _, c := range pod.Status.Conditions {
   229  			if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
   230  				podIsReady = true
   231  				break
   232  			}
   233  		}
   234  		if !podIsReady {
   235  			klog.V(3).Infof("etcd pod %q is not ready", pod.ObjectMeta.Name)
   236  		}
   237  		etcdEndpoint, ok := pod.ObjectMeta.Annotations[constants.EtcdAdvertiseClientUrlsAnnotationKey]
   238  		if !ok {
   239  			klog.V(3).Infof("etcd Pod %q is missing the %q annotation; cannot infer etcd advertise client URL using the Pod annotation", pod.ObjectMeta.Name, constants.EtcdAdvertiseClientUrlsAnnotationKey)
   240  			continue
   241  		}
   242  		etcdEndpoints = append(etcdEndpoints, etcdEndpoint)
   243  	}
   244  	return etcdEndpoints, len(podList.Items), nil
   245  }
   246  
   247  // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
   248  func (c *Client) Sync() error {
   249  	// Syncs the list of endpoints
   250  	var cli etcdClient
   251  	var lastError error
   252  	err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
   253  		true, func(_ context.Context) (bool, error) {
   254  			var err error
   255  			cli, err = c.newEtcdClient(c.Endpoints)
   256  			if err != nil {
   257  				lastError = err
   258  				return false, nil
   259  			}
   260  			defer func() { _ = cli.Close() }()
   261  			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   262  			err = cli.Sync(ctx)
   263  			cancel()
   264  			if err == nil {
   265  				return true, nil
   266  			}
   267  			klog.V(5).Infof("Failed to sync etcd endpoints: %v", err)
   268  			lastError = err
   269  			return false, nil
   270  		})
   271  	if err != nil {
   272  		return lastError
   273  	}
   274  	klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
   275  
   276  	c.Endpoints = cli.Endpoints()
   277  	return nil
   278  }
   279  
   280  // Member struct defines an etcd member; it is used for avoiding to spread github.com/coreos/etcd dependency
   281  // across kubeadm codebase
   282  type Member struct {
   283  	Name    string
   284  	PeerURL string
   285  }
   286  
   287  func (c *Client) listMembers(timeout time.Duration) (*clientv3.MemberListResponse, error) {
   288  	// Gets the member list
   289  	var lastError error
   290  	var resp *clientv3.MemberListResponse
   291  	if timeout == 0 {
   292  		timeout = constants.EtcdAPICallTimeout
   293  	}
   294  	err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, timeout,
   295  		true, func(_ context.Context) (bool, error) {
   296  			cli, err := c.newEtcdClient(c.Endpoints)
   297  			if err != nil {
   298  				lastError = err
   299  				return false, nil
   300  			}
   301  			defer func() { _ = cli.Close() }()
   302  
   303  			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   304  			resp, err = cli.MemberList(ctx)
   305  			cancel()
   306  			if err == nil {
   307  				return true, nil
   308  			}
   309  			klog.V(5).Infof("Failed to get etcd member list: %v", err)
   310  			lastError = err
   311  			return false, nil
   312  		})
   313  	if err != nil {
   314  		return nil, lastError
   315  	}
   316  	return resp, nil
   317  }
   318  
   319  // GetMemberID returns the member ID of the given peer URL
   320  func (c *Client) GetMemberID(peerURL string) (uint64, error) {
   321  	resp, err := c.listMembersFunc(0)
   322  	if err != nil {
   323  		return 0, err
   324  	}
   325  
   326  	for _, member := range resp.Members {
   327  		if member.GetPeerURLs()[0] == peerURL {
   328  			return member.GetID(), nil
   329  		}
   330  	}
   331  	return 0, ErrNoMemberIDForPeerURL
   332  }
   333  
   334  // ListMembers returns the member list.
   335  func (c *Client) ListMembers() ([]Member, error) {
   336  	resp, err := c.listMembersFunc(0)
   337  	if err != nil {
   338  		return nil, err
   339  	}
   340  
   341  	ret := make([]Member, 0, len(resp.Members))
   342  	for _, m := range resp.Members {
   343  		ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
   344  	}
   345  	return ret, nil
   346  }
   347  
   348  // RemoveMember notifies an etcd cluster to remove an existing member
   349  func (c *Client) RemoveMember(id uint64) ([]Member, error) {
   350  	// Remove an existing member from the cluster
   351  	var lastError error
   352  	var resp *clientv3.MemberRemoveResponse
   353  	err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
   354  		true, func(_ context.Context) (bool, error) {
   355  			cli, err := c.newEtcdClient(c.Endpoints)
   356  			if err != nil {
   357  				lastError = err
   358  				return false, nil
   359  			}
   360  			defer func() { _ = cli.Close() }()
   361  
   362  			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   363  			resp, err = cli.MemberRemove(ctx, id)
   364  			cancel()
   365  			if err == nil {
   366  				return true, nil
   367  			}
   368  			if errors.Is(rpctypes.ErrMemberNotFound, err) {
   369  				klog.V(5).Infof("Member was already removed, because member %s was not found", strconv.FormatUint(id, 16))
   370  				return true, nil
   371  			}
   372  			klog.V(5).Infof("Failed to remove etcd member: %v", err)
   373  			lastError = err
   374  			return false, nil
   375  		})
   376  	if err != nil {
   377  		return nil, lastError
   378  	}
   379  
   380  	// Returns the updated list of etcd members
   381  	ret := []Member{}
   382  	if resp != nil {
   383  		for _, m := range resp.Members {
   384  			ret = append(ret, Member{Name: m.Name, PeerURL: m.PeerURLs[0]})
   385  		}
   386  
   387  	}
   388  
   389  	return ret, nil
   390  }
   391  
   392  // AddMember adds a new member into the etcd cluster
   393  func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
   394  	return c.addMember(name, peerAddrs, false)
   395  }
   396  
   397  // AddMemberAsLearner adds a new learner member into the etcd cluster.
   398  func (c *Client) AddMemberAsLearner(name string, peerAddrs string) ([]Member, error) {
   399  	return c.addMember(name, peerAddrs, true)
   400  }
   401  
   402  // addMember notifies an existing etcd cluster that a new member is joining, and
   403  // return the updated list of members. If the member has already been added to the
   404  // cluster, this will return the existing list of etcd members.
   405  func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Member, error) {
   406  	// Parse the peer address, required to add the client URL later to the list
   407  	// of endpoints for this client. Parsing as a first operation to make sure that
   408  	// if this fails no member addition is performed on the etcd cluster.
   409  	parsedPeerAddrs, err := url.Parse(peerAddrs)
   410  	if err != nil {
   411  		return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
   412  	}
   413  
   414  	cli, err := c.newEtcdClient(c.Endpoints)
   415  	if err != nil {
   416  		return nil, err
   417  	}
   418  	defer func() { _ = cli.Close() }()
   419  
   420  	// Adds a new member to the cluster
   421  	var (
   422  		lastError   error
   423  		respMembers []*etcdserverpb.Member
   424  		learnerID   uint64
   425  		resp        *clientv3.MemberAddResponse
   426  	)
   427  	err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
   428  		true, func(_ context.Context) (bool, error) {
   429  			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   430  			defer cancel()
   431  			if isLearner {
   432  				// if learnerID is set, it means the etcd member is already added successfully.
   433  				if learnerID == 0 {
   434  					klog.V(1).Info("[etcd] Adding etcd member as learner")
   435  					resp, err = cli.MemberAddAsLearner(ctx, []string{peerAddrs})
   436  					if err != nil {
   437  						lastError = err
   438  						return false, nil
   439  					}
   440  					learnerID = resp.Member.ID
   441  				}
   442  				respMembers = resp.Members
   443  				return true, nil
   444  			}
   445  
   446  			resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
   447  			if err == nil {
   448  				respMembers = resp.Members
   449  				return true, nil
   450  			}
   451  
   452  			// If the error indicates that the peer already exists, exit early. In this situation, resp is nil, so
   453  			// call out to MemberList to fetch all the members before returning.
   454  			if errors.Is(err, rpctypes.ErrPeerURLExist) {
   455  				klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members")
   456  				var listResp *clientv3.MemberListResponse
   457  				listResp, err = cli.MemberList(ctx)
   458  				if err == nil {
   459  					respMembers = listResp.Members
   460  					return true, nil
   461  				}
   462  			}
   463  
   464  			klog.V(5).Infof("Failed to add etcd member: %v", err)
   465  			lastError = err
   466  			return false, nil
   467  		})
   468  	if err != nil {
   469  		return nil, lastError
   470  	}
   471  
   472  	// Returns the updated list of etcd members
   473  	ret := []Member{}
   474  	for _, m := range respMembers {
   475  		// If the peer address matches, this is the member we are adding.
   476  		// Use the name we passed to the function.
   477  		if peerAddrs == m.PeerURLs[0] {
   478  			ret = append(ret, Member{Name: name, PeerURL: peerAddrs})
   479  			continue
   480  		}
   481  		// Otherwise, we are processing other existing etcd members returned by AddMembers.
   482  		memberName := m.Name
   483  		// In some cases during concurrent join, some members can end up without a name.
   484  		// Use the member ID as name for those.
   485  		if len(memberName) == 0 {
   486  			memberName = strconv.FormatUint(m.ID, 16)
   487  		}
   488  		ret = append(ret, Member{Name: memberName, PeerURL: m.PeerURLs[0]})
   489  	}
   490  
   491  	// Add the new member client address to the list of endpoints
   492  	c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
   493  
   494  	return ret, nil
   495  }
   496  
   497  // isLearner returns true if the given member ID is a learner.
   498  func (c *Client) isLearner(memberID uint64) (bool, error) {
   499  	resp, err := c.listMembersFunc(0)
   500  	if err != nil {
   501  		return false, err
   502  	}
   503  
   504  	for _, member := range resp.Members {
   505  		if member.ID == memberID && member.IsLearner {
   506  			return true, nil
   507  		}
   508  	}
   509  	return false, nil
   510  }
   511  
   512  // MemberPromote promotes a member as a voting member. If the given member ID is already a voting member this method
   513  // will return early and do nothing.
   514  func (c *Client) MemberPromote(learnerID uint64) error {
   515  	isLearner, err := c.isLearner(learnerID)
   516  	if err != nil {
   517  		return err
   518  	}
   519  	if !isLearner {
   520  		klog.V(1).Infof("[etcd] Member %s already promoted.", strconv.FormatUint(learnerID, 16))
   521  		return nil
   522  	}
   523  
   524  	klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %s", strconv.FormatUint(learnerID, 16))
   525  	cli, err := c.newEtcdClient(c.Endpoints)
   526  	if err != nil {
   527  		return err
   528  	}
   529  	defer func() { _ = cli.Close() }()
   530  
   531  	// TODO: warning logs from etcd client should be removed.
   532  	// The warning logs are printed by etcd client code for several reasons, including
   533  	// 1. can not promote yet(no synced)
   534  	// 2. context deadline exceeded
   535  	// 3. peer URLs already exists
   536  	// Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited.
   537  	var (
   538  		lastError error
   539  	)
   540  	err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
   541  		true, func(_ context.Context) (bool, error) {
   542  			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   543  			defer cancel()
   544  
   545  			_, err = cli.MemberPromote(ctx, learnerID)
   546  			if err == nil {
   547  				klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", strconv.FormatUint(learnerID, 16))
   548  				return true, nil
   549  			}
   550  			klog.V(5).Infof("[etcd] Promoting the learner %s failed: %v", strconv.FormatUint(learnerID, 16), err)
   551  			lastError = err
   552  			return false, nil
   553  		})
   554  	if err != nil {
   555  		return lastError
   556  	}
   557  	return nil
   558  }
   559  
   560  // CheckClusterHealth returns nil for status Up or error for status Down
   561  func (c *Client) CheckClusterHealth() error {
   562  	_, err := c.getClusterStatus()
   563  	return err
   564  }
   565  
   566  // getClusterStatus returns nil for status Up (along with endpoint status response map) or error for status Down
   567  func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error) {
   568  	clusterStatus := make(map[string]*clientv3.StatusResponse)
   569  	for _, ep := range c.Endpoints {
   570  		// Gets the member status
   571  		var lastError error
   572  		var resp *clientv3.StatusResponse
   573  		err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
   574  			true, func(_ context.Context) (bool, error) {
   575  				cli, err := c.newEtcdClient(c.Endpoints)
   576  				if err != nil {
   577  					lastError = err
   578  					return false, nil
   579  				}
   580  				defer func() { _ = cli.Close() }()
   581  
   582  				ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
   583  				resp, err = cli.Status(ctx, ep)
   584  				cancel()
   585  				if err == nil {
   586  					return true, nil
   587  				}
   588  				klog.V(5).Infof("Failed to get etcd status for %s: %v", ep, err)
   589  				lastError = err
   590  				return false, nil
   591  			})
   592  		if err != nil {
   593  			return nil, lastError
   594  		}
   595  
   596  		clusterStatus[ep] = resp
   597  	}
   598  	return clusterStatus, nil
   599  }
   600  
   601  // WaitForClusterAvailable returns true if all endpoints in the cluster are available after retry attempts, an error is returned otherwise
   602  func (c *Client) WaitForClusterAvailable(retries int, retryInterval time.Duration) (bool, error) {
   603  	for i := 0; i < retries; i++ {
   604  		if i > 0 {
   605  			klog.V(1).Infof("[etcd] Waiting %v until next retry\n", retryInterval)
   606  			time.Sleep(retryInterval)
   607  		}
   608  		klog.V(2).Infof("[etcd] attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
   609  		_, err := c.getClusterStatus()
   610  		if err != nil {
   611  			switch err {
   612  			case context.DeadlineExceeded:
   613  				klog.V(1).Infof("[etcd] Attempt timed out")
   614  			default:
   615  				klog.V(1).Infof("[etcd] Attempt failed with error: %v\n", err)
   616  			}
   617  			continue
   618  		}
   619  		return true, nil
   620  	}
   621  	return false, errors.New("timeout waiting for etcd cluster to be available")
   622  }
   623  
   624  // GetClientURL creates an HTTPS URL that uses the configured advertise
   625  // address and client port for the API controller
   626  func GetClientURL(localEndpoint *kubeadmapi.APIEndpoint) string {
   627  	return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenClientPort))
   628  }
   629  
   630  // GetPeerURL creates an HTTPS URL that uses the configured advertise
   631  // address and peer port for the API controller
   632  func GetPeerURL(localEndpoint *kubeadmapi.APIEndpoint) string {
   633  	return "https://" + net.JoinHostPort(localEndpoint.AdvertiseAddress, strconv.Itoa(constants.EtcdListenPeerPort))
   634  }
   635  
   636  // GetClientURLByIP creates an HTTPS URL based on an IP address
   637  // and the client listening port.
   638  func GetClientURLByIP(ip string) string {
   639  	return "https://" + net.JoinHostPort(ip, strconv.Itoa(constants.EtcdListenClientPort))
   640  }
   641  

View as plain text