package embed

import (
	"context"
	"errors"
	"strings"

	"go.etcd.io/etcd/server/v3/embed"
	"golang.org/x/sync/errgroup"

	"edge-infra.dev/pkg/lib/fog"
)

// Cluster represents an etcd cluster made up of a set of Members.
type Cluster struct {
	// initialized is set once Start has completed successfully. After that,
	// prepare switches every member's ClusterState to "existing".
	initialized bool
	// members holds every Member registered through NewCluster or AddMember.
	members []*Member
	// errCh is handed to each member when it is started and is exposed to
	// callers through Err.
	errCh chan error
}

// NewCluster should be passed the members that are intended to form the initial
// etcd cluster. This method will take care of adding them to the Cluster
// struct, but will not start the Member's etcd instances or add them to the
// physical cluster. The Start method should be called on the Cluster to
// complete that process.
//
// An error is returned when no members are provided.
func NewCluster(members ...*Member) (*Cluster, error) {
	if len(members) == 0 {
		return nil, errors.New("failed to initialize cluster, no members provided")
	}
	c := &Cluster{
		members: []*Member{},
		errCh:   make(chan error),
	}
	// Copy into the Cluster's own slice so the caller's slice is not aliased.
	c.members = append(c.members, members...)
	return c, nil
}

// ClientURLs will return a list of the current client URLs advertised for the
// Cluster's Members. The list is taken from the first member whose etcd Server
// is set; if no member has a Server, an empty, non-nil slice is returned.
//
// NOTE(review): m.etcd itself is dereferenced without a nil check; confirm
// that Member always has it populated before this can be called.
func (c *Cluster) ClientURLs() []string {
	for _, m := range c.members {
		if m.etcd.Server == nil {
			continue
		}
		return m.etcd.Server.Cluster().ClientURLs()
	}
	return []string{}
}

// CurrentCluster returns a string representation of the current Cluster that
// can be used for a member's InitialCluster field. It is the comma-separated
// concatenation of each member's InitialClusterFromName value, with any
// leading or trailing commas removed.
func (c *Cluster) CurrentCluster() string {
	entries := make([]string, 0, len(c.members))
	for _, m := range c.members {
		entries = append(entries, m.config.embed.InitialClusterFromName(m.config.Name))
	}
	// Trim removes separators left at either end by members that contribute
	// an empty entry (and yields "" when every entry is empty).
	return strings.Trim(strings.Join(entries, ","), ",")
}

// Size returns the member count for the etcd cluster, as reported by the first
// member whose etcd Server is set. It returns 0 if no member has a Server.
//
// NOTE(review): m.etcd itself is dereferenced without a nil check; confirm
// that Member always has it populated before this can be called.
func (c *Cluster) Size() int {
	for _, m := range c.members {
		if m.etcd.Server == nil {
			continue
		}
		return len(m.etcd.Server.Cluster().Members())
	}
	return 0
}

// AddMember will add the provided member to the Cluster. The member's etcd
// instance will not be started, however the returned function can be used to
// start the etcd member if the requirement is for it to join the existing
// running Cluster.
//
// The returned start function alone will not join the member to an existing,
// running cluster. If the cluster has already been started, then the use of an
// etcd client will be required to add the member to the cluster from the
// perspective of the current members.
//
// An error is returned, and the member is not added, if preparing the member
// fails.
func (c *Cluster) AddMember(member *Member) (func() error, error) {
	if err := member.prepare(); err != nil {
		return nil, err
	}
	c.members = append(c.members, member)

	// The new member is already part of c.members, so the current cluster
	// string includes it alongside the existing members.
	member.config.embed.InitialCluster = c.CurrentCluster()

	return func() error {
		// The member is joining a cluster rather than bootstrapping one.
		member.config.embed.ClusterState = embed.ClusterStateFlagExisting
		return member.start(c.errCh)
	}, nil
}

// prepare will call the prepare method for each individual member in the
// Cluster. This method should be called before each start call on the Cluster.
//
// If the Cluster has not been initialized before, then the ClusterState will be
// kept as the default "new", however once it has been initialized, we want to
// use the value "existing".
func (c *Cluster) prepare() error {
	for _, member := range c.members {
		if err := member.prepare(); err != nil {
			return err
		}
		if c.initialized {
			member.config.embed.ClusterState = embed.ClusterStateFlagExisting
		}
	}
	return nil
}

// Start will call the prepare and start methods on each individual member. If
// there is an error while starting any of the member's etcd instances, the
// entire Cluster will be stopped and the start error is returned.
//
// A failure to stop the Cluster after a failed start is logged using the
// logger from ctx and does not replace the returned start error. On success
// the Cluster is marked as initialized.
func (c *Cluster) Start(ctx context.Context) error {
	if err := c.prepare(); err != nil {
		return err
	}
	if err := c.start(ctx); err != nil {
		if stopErr := c.Stop(ctx); stopErr != nil {
			fog.FromContext(ctx).Error(stopErr, "failed to stop server")
		}
		return err
	}
	c.initialized = true
	return nil
}

// start sets the InitialCluster value for Members to include all current
// Members of the Cluster and then starts each Member's etcd instance. The
// members are started concurrently and the error returned is the one reported
// by the errgroup once every start call has returned. Stopping the Cluster on
// failure is left to the caller.
func (c *Cluster) start(ctx context.Context) error {
	errGroup, _ := errgroup.WithContext(ctx)
	for _, member := range c.members {
		errGroup.Go(startFunc(c, member))
	}
	return errGroup.Wait()
}

// startFunc will return a function that updates an etcd member's initial
// cluster values and then start that member.
func startFunc(cluster *Cluster, member *Member) func() error {
	return func() error {
		member.config.embed.InitialCluster = cluster.CurrentCluster()
		return member.start(cluster.errCh)
	}
}

// Stop will call the stop method on all Members in the Cluster. The members
// are stopped concurrently and the error returned is the one reported by the
// errgroup once every stop call has returned.
func (c *Cluster) Stop(ctx context.Context) error {
	errGroup, _ := errgroup.WithContext(ctx)
	for _, member := range c.members {
		errGroup.Go(stopFunc(member))
	}
	return errGroup.Wait()
}

// stopFunc will return a function that stops the etcd member.
func stopFunc(member *Member) func() error {
	return func() error {
		return member.stop()
	}
}

// Close will call the close method on all Members in the Cluster. The members
// are closed concurrently and the error returned is the one reported by the
// errgroup once every close call has returned.
func (c *Cluster) Close(ctx context.Context) error {
	errGroup, _ := errgroup.WithContext(ctx)
	for _, member := range c.members {
		errGroup.Go(closeFunc(member))
	}
	return errGroup.Wait()
}

// closeFunc will return a function that closes the etcd member.
func closeFunc(member *Member) func() error {
	return func() error {
		return member.close()
	}
}

// Err returns an error channel that can be watched for runtime errors from the
// Member's etcd instances.
func (c *Cluster) Err() <-chan error {
	return c.errCh
}