package embed import ( "errors" "fmt" "net/url" "strings" "time" "github.com/spf13/afero" "go.etcd.io/etcd/server/v3/embed" ) const ( preStart = "pre-start" running = "running" stopped = "stopped" terminated = "terminated" ) // Member represents an etcd member type Member struct { state memberState etcd *embed.Etcd config *Config clientAddress assignedAddress peerAddress assignedAddress } // NewMember returns a new member object that can be added to a Cluster. If no // Name is set in the config provided then an error will be returned. func NewMember(config *Config) (*Member, error) { if config.Name == "" { return nil, errors.New("invalid member config: no member name provided") } config.embed = &embed.Config{} if config.PeerURL == nil { config.PeerURL = &url.URL{} } if config.ClientURL == nil { config.ClientURL = &url.URL{} } return &Member{ state: &preStartState{}, config: config, peerAddress: assignedAddress{}, clientAddress: assignedAddress{}, }, nil } // ClientURL returns the URL to be used by clients to establish a connection // with the etcd member. func (m *Member) ClientURL() *url.URL { if len(m.config.embed.ListenClientUrls) == 0 { return nil } return &m.config.embed.ListenClientUrls[0] } // PeerURL returns the URL to be used by other etcd members to establish a // connection with the etcd member. func (m *Member) PeerURL() *url.URL { if len(m.config.embed.ListenPeerUrls) == 0 { return nil } return &m.config.embed.ListenPeerUrls[0] } // memberState is an interface that represents the methods that behave // differently for the Member depending on the state of the Member. type memberState interface { prepare(*Member) error start(*Member, chan<- error) error stop(*Member) error close(*Member) error String() string } // prepare is responsible for preparing the Member to be started. This involves // adding some extra configuration and ensuring the Member has everything it // needs to start it's etcd instance. // // prepare will direct the call to the correct method depending on the current // state of the Member. For more information about the possible prepare method // functionality, see defaultState, preStartState, runningState, stoppedState, // terminatedState func (m *Member) prepare() (err error) { return m.state.prepare(m) } // start is responsible for creating an etcd server and starting the Member's // etcd instance. // // start will direct the call to the correct method depending on the current // state of the Member. For more information about the possible start method // functionality, see defaultState, preStartState, runningState, stoppedState, // terminatedState func (m *Member) start(errCh chan<- error) error { return m.state.start(m, errCh) } // stop is responsible for stopping the Member's etcd instance. // // stop will direct the call to the correct method depending on the current // state of the Member. For more information about the possible stop method // functionality, see defaultState, preStartState, runningState, stoppedState, // terminatedState func (m *Member) stop() error { return m.state.stop(m) } // close is responsible for closing the Member's etcd instance and cleaning up // any lingering servers and listeners. // // close will direct the call to the correct method depending on the current // state of the Member. For more information about the possible close method // functionality, see defaultState, preStartState, runningState, stoppedState, // terminatedState func (m *Member) close() error { return m.state.close(m) } // defaultState is the base/default state used for the Member, this is generally // the most common functionality to all states. type defaultState struct{} // prepare returns nil as in most cases we want prepare to do nothing as a // default. func (d *defaultState) prepare(_ *Member) error { return nil } // start will start the etcd instance for the Member. It will then wait for 20 // seconds for the instance to report back that it is ready. If the instance // does not report ready in this time, then the instance will be stopped and the // function will fail. // // A separate go routine is started to monitor errors in the etcd instance // during runtime and send them back to the provided error channel. // // If the method is successful then the Member is transitioned to runningState. func (d *defaultState) start(m *Member, errCh chan<- error) error { e, err := embed.StartEtcd(m.config.embed) if err != nil { return fmt.Errorf("failed to start etcd member %s: %w", m.config.Name, err) } m.etcd = e if err := m.waitForServer(); err != nil { m.etcd.Server.Stop() return err } go m.watchErr(errCh) m.state = &runningState{} return nil } // stop will stop the Member's etcd instance and transition the Member into // stoppedState. func (d *defaultState) stop(m *Member) error { //nolint:unparam m.etcd.Server.Stop() m.state = &stoppedState{} return nil } // close will close the Member's etcd instance and then transition the Member // into terminatedState. func (d *defaultState) close(m *Member) error { //nolint:unparam m.etcd.Close() m.state = &terminatedState{} return nil } // String returns the string representation of the defaultState name. This is an // empty string as this state should never be used directly. func (d *defaultState) String() string { return "" } // preStartState is the state a Member will be in when it is first created, // before it has been started for the first time. type preStartState struct{ defaultState } // prepare updates the config values for the Member and prepares the config for // a start of the etcd instance. A host:port combination will be reserved for // the member to ensure that it has an available host:port when it is started. // Within preStartState is the only time we want the prepare method to run. func (p *preStartState) prepare(m *Member) (err error) { m.initialize() return m.prepareNew() } // stop returns nil without making any changes. This is because a Member in // preStartState has not been started yet and so can not be stopped. An attempt // to do so would make the application hang indefinitely. func (p *preStartState) stop(_ *Member) error { return nil } // close returns nil without making any changes because a Member that has not // been started has no reason to be closed. func (p *preStartState) close(_ *Member) error { return nil } // String returns the string representation of the preStartState name. func (p *preStartState) String() string { return preStart } // runningState is the state a Member will be in when it has been successfully // started. type runningState struct{ defaultState } // start returns nil without making any changes because a Member that has // already been started should not try to start again. func (r *runningState) start(_ *Member, _ chan<- error) error { return nil } // String returns the string representation of the runningState name. func (r *runningState) String() string { return running } // stoppedState is the state a Member will be in when it has been stopped, but // not closed. type stoppedState struct{ defaultState } // start will close the Member before calling start on the Member. Due to the // nature of how stop works, it will leave lingering connections from the // previous server instance which stop the new start from successfully binding // to it's port. To allow the start to be successful, we first close those // connections. The call to close will transition the Member into // terminatedState and the start call will be directed to // terminatedState.start(). func (s *stoppedState) start(m *Member, errCh chan<- error) error { if err := s.close(m); err != nil { return err } return m.start(errCh) } // stop will return nil without making any changes, this is because a Member // that is already stopped should not have another stop attempted. func (s *stoppedState) stop(_ *Member) error { return nil } // String returns the string representation of the stoppedState name. func (s *stoppedState) String() string { return stopped } // terminatedState is the state a Member will be in when it has been closed. type terminatedState struct{ defaultState } // stop will return nil without making any changes, this is because a Member // that is already closed should not have a stop attempted. func (t *terminatedState) stop(_ *Member) error { return nil } // close will return nil without making any changes, this is because a closed // Member should not be closed again. func (t *terminatedState) close(_ *Member) error { return nil } // String returns the string representation of the terminatedState name. func (t *terminatedState) String() string { return terminated } // intiialize prepares the etcd member to join a new Cluster by loading the // configuration options from the config provided via the Member constructor // call. func (m *Member) initialize() { m.config.embed = embed.NewConfig() m.config.embed.Name = m.config.Name m.config.embed.Dir = strings.Join([]string{"default.etcd", m.config.Name}, ".") m.config.embed.LogLevel = m.config.LogLevel if m.config.embed.LogLevel == "" { m.config.embed.LogLevel = "error" } } // prepareNew will delete any existing etcd state present for the Member from // the filesystem and ensure that the Member has a port on localhost to use if a // host port combinations were not provided via the config for the client or // peer URLs. func (m *Member) prepareNew() error { fs := afero.NewOsFs() if err := fs.RemoveAll(m.config.embed.Dir); err != nil { return err } return m.configureURLs(m.config.PeerURL, m.config.ClientURL) } // configureURLs assigns the host:port combination to the member. If the // host/port has been provided for the client/peer URLs then these will be used, // otherwise the default host will be localhost and the port will be selected at // random from the available open ports. func (m *Member) configureURLs(peerURL *url.URL, clientURL *url.URL) error { if err := m.peerAddress.assign(peerURL); err != nil { return err } if err := m.clientAddress.assign(clientURL); err != nil { return err } m.config.embed.ListenPeerUrls = []url.URL{m.peerAddress.url()} m.config.embed.ListenClientUrls = []url.URL{m.clientAddress.url()} m.config.embed.AdvertisePeerUrls = []url.URL{m.peerAddress.url()} m.config.embed.AdvertiseClientUrls = []url.URL{m.clientAddress.url()} return nil } // waitForServer waits for 30 seconds for the etcd instance to send a message to // the ReadyNotify channel. If the instance does not become healthy within this // timeframe, an error is returned. func (m *Member) waitForServer() error { select { case <-m.etcd.Server.ReadyNotify(): return nil case <-time.After(30 * time.Second): return errors.New("failed to start server") } } // watchErr will watch for stop events and errors for the member. If errors are // found, they will be forwarded to the specified error channel (which is // normally a shared Cluster error channel). If a stop event is found, then the // function ends to ensure it does not outlive the current etcd instance. func (m *Member) watchErr(errCh chan<- error) { for { select { case <-m.etcd.Server.StopNotify(): return case err := <-m.etcd.Err(): errCh <- err } } }