...

Source file src/edge-infra.dev/pkg/sds/lib/etcd/server/embed/cluster.go

Documentation: edge-infra.dev/pkg/sds/lib/etcd/server/embed

     1  package embed
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"strings"
     7  
     8  	"go.etcd.io/etcd/server/v3/embed"
     9  	"golang.org/x/sync/errgroup"
    10  
    11  	"edge-infra.dev/pkg/lib/fog"
    12  )
    13  
// Cluster struct representing the etcd cluster
//
// Fields:
//   - initialized: set to true by Start once all members have started
//     successfully; prepare reads it to decide whether members should use the
//     "existing" cluster state.
//   - members: the Members tracked by this Cluster, populated by NewCluster
//     and extended by AddMember.
//   - errCh: unbuffered channel created by NewCluster, handed to every
//     member's start call and exposed read-only through Err.
type Cluster struct {
	initialized bool
	members     []*Member
	errCh       chan error
}
    20  
    21  // NewCluster should be passed the members that are intended to form the initial
    22  // etcd cluster. This method will take care of adding them to the Cluster
    23  // struct, but will not start the Member's etcd instances or add them to the
    24  // physical cluster. The Start method should be called on the Cluster to
    25  // complete that process.
    26  func NewCluster(members ...*Member) (*Cluster, error) {
    27  	if len(members) == 0 {
    28  		return nil, errors.New("failed to initialize cluster, no members provided")
    29  	}
    30  
    31  	c := &Cluster{
    32  		members: []*Member{},
    33  		errCh:   make(chan error),
    34  	}
    35  	c.members = append(c.members, members...)
    36  
    37  	return c, nil
    38  }
    39  
    40  // ClientURLs will return a list of the current client URLs advertised for the
    41  // Cluster's Members.
    42  func (c *Cluster) ClientURLs() []string {
    43  	for _, m := range c.members {
    44  		if m.etcd.Server == nil {
    45  			continue
    46  		}
    47  		return m.etcd.Server.Cluster().ClientURLs()
    48  	}
    49  	return []string{}
    50  }
    51  
    52  // CurrentCluster returns a string representation of the current Cluster that
    53  // can be used for a member's InitialCluster field.
    54  func (c *Cluster) CurrentCluster() string {
    55  	var initialCluster string
    56  	for _, m := range c.members {
    57  		initialCluster = strings.Join([]string{initialCluster, m.config.embed.InitialClusterFromName(m.config.Name)}, ",")
    58  	}
    59  	return strings.Trim(initialCluster, ",")
    60  }
    61  
    62  // Size returns the member count for the etcd cluster.
    63  func (c *Cluster) Size() int {
    64  	for _, m := range c.members {
    65  		if m.etcd.Server == nil {
    66  			continue
    67  		}
    68  		return len(m.etcd.Server.Cluster().Members())
    69  	}
    70  	return 0
    71  }
    72  
    73  // AddMember will add the provided member to the Cluster. The member's etcd
    74  // instance will not be started, however the returned function can be used to
    75  // start the etcd member if the requirement is for it to join the existing
    76  // running Cluster.
    77  //
    78  // The returned start function alone will not join the member to an existing,
    79  // running cluster. If the cluster has already been started, then the use of an
    80  // etcd client will be required to add the member to the cluster from the
    81  // perspective of the current members.
    82  func (c *Cluster) AddMember(member *Member) (func() error, error) {
    83  	if err := member.prepare(); err != nil {
    84  		return nil, err
    85  	}
    86  	c.members = append(c.members, member)
    87  
    88  	var initialCluster string
    89  	for _, m := range c.members {
    90  		initialCluster = strings.Join([]string{initialCluster, m.config.embed.InitialClusterFromName(m.config.Name)}, ",")
    91  	}
    92  	initialCluster = strings.Trim(initialCluster, ",")
    93  	member.config.embed.InitialCluster = initialCluster
    94  
    95  	return func() error {
    96  		member.config.embed.ClusterState = embed.ClusterStateFlagExisting
    97  		return member.start(c.errCh)
    98  	}, nil
    99  }
   100  
   101  // prepare will call the prepare method for each individual member in the
   102  // Cluster. This method should be called before each start call on the Cluster.
   103  //
   104  // If the Cluster has not been initialized before, then the ClusterState will be
   105  // kept as the default "new", however once it has been initialized, we want to
   106  // use the value "existing".
   107  func (c *Cluster) prepare() error {
   108  	for _, member := range c.members {
   109  		if err := member.prepare(); err != nil {
   110  			return err
   111  		}
   112  		if c.initialized {
   113  			member.config.embed.ClusterState = embed.ClusterStateFlagExisting
   114  		}
   115  	}
   116  	return nil
   117  }
   118  
   119  // Start will call the prepare and start methods on each individual member. If
   120  // there is an error while starting any of the member's etcd instances, the
   121  // entire Cluster will be stopped and appropriate errors returned for each
   122  // failing member.
   123  //
   124  // The errors for a particular member can be found by using the member name as
   125  // the key for the error map. If there is no entry for that member name, it
   126  // means that no error was found when starting this member.
   127  func (c *Cluster) Start(ctx context.Context) error {
   128  	if err := c.prepare(); err != nil {
   129  		return err
   130  	}
   131  
   132  	if err := c.start(ctx); err != nil {
   133  		if err := c.Stop(ctx); err != nil {
   134  			fog.FromContext(ctx).Error(err, "failed to stop server")
   135  		}
   136  		return err
   137  	}
   138  
   139  	c.initialized = true
   140  	return nil
   141  }
   142  
   143  // start sets the InitialCluster value for Members to include all current
   144  // Members of the Cluster and then starts each Member's etcd instance. If any
   145  // Member's etcd instance fails to start, the Cluster will be stopped.
   146  func (c *Cluster) start(ctx context.Context) error {
   147  	errGroup, _ := errgroup.WithContext(ctx)
   148  
   149  	for _, member := range c.members {
   150  		errGroup.Go(startFunc(c, member))
   151  	}
   152  	return errGroup.Wait()
   153  }
   154  
   155  // startFunc will return a function that updates an etcd member's initial
   156  // cluster values and then start that member.
   157  func startFunc(cluster *Cluster, member *Member) func() error {
   158  	return func() error {
   159  		member.config.embed.InitialCluster = cluster.CurrentCluster()
   160  		return member.start(cluster.errCh)
   161  	}
   162  }
   163  
   164  // Stop will call the stop method on all Members in the Cluster.
   165  func (c *Cluster) Stop(ctx context.Context) error {
   166  	errGroup, _ := errgroup.WithContext(ctx)
   167  
   168  	for _, member := range c.members {
   169  		errGroup.Go(stopFunc(member))
   170  	}
   171  
   172  	return errGroup.Wait()
   173  }
   174  
   175  // stopFunc will return a function that stops the etcd member
   176  func stopFunc(member *Member) func() error {
   177  	return func() error {
   178  		return member.stop()
   179  	}
   180  }
   181  
   182  // Close will call the close method on all Members in the Cluster.
   183  func (c *Cluster) Close(ctx context.Context) error {
   184  	errGroup, _ := errgroup.WithContext(ctx)
   185  
   186  	for _, member := range c.members {
   187  		errGroup.Go(closeFunc(member))
   188  	}
   189  
   190  	return errGroup.Wait()
   191  }
   192  
   193  // closeFunc will return a function that closes the etcd member
   194  func closeFunc(member *Member) func() error {
   195  	return func() error {
   196  		return member.close()
   197  	}
   198  }
   199  
   200  // Err returns an error channel that can be watched for runtime errors from the
   201  // Member's etcd instances.
   202  func (c *Cluster) Err() <-chan error {
   203  	return c.errCh
   204  }
   205  

View as plain text