...

Source file src/edge-infra.dev/pkg/sds/lib/etcd/server/embed/member.go

Documentation: edge-infra.dev/pkg/sds/lib/etcd/server/embed

     1  package embed
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net/url"
     7  	"strings"
     8  	"time"
     9  
    10  	"github.com/spf13/afero"
    11  	"go.etcd.io/etcd/server/v3/embed"
    12  )
    13  
// Names of the Member lifecycle states. Each one is returned by the String
// method of the memberState implementation of the same name (preStartState,
// runningState, stoppedState and terminatedState respectively).
const (
	preStart   = "pre-start"
	running    = "running"
	stopped    = "stopped"
	terminated = "terminated"
)
    20  
// Member represents an etcd member. It bundles the embedded etcd instance with
// the configuration used to start it and dispatches its lifecycle operations
// (prepare, start, stop, close) through its current memberState.
type Member struct {
	// state is the current lifecycle state of the Member.
	state memberState

	// etcd is the embedded etcd instance; it is set when the Member is started.
	etcd          *embed.Etcd
	// config is the configuration supplied to NewMember. It is retained and
	// modified in place by the Member.
	config        *Config
	// clientAddress and peerAddress hold the addresses assigned to the Member
	// for its client and peer URLs.
	clientAddress assignedAddress
	peerAddress   assignedAddress
}
    30  
    31  // NewMember returns a new member object that can be added to a Cluster. If no
    32  // Name is set in the config provided then an error will be returned.
    33  func NewMember(config *Config) (*Member, error) {
    34  	if config.Name == "" {
    35  		return nil, errors.New("invalid member config: no member name provided")
    36  	}
    37  
    38  	config.embed = &embed.Config{}
    39  	if config.PeerURL == nil {
    40  		config.PeerURL = &url.URL{}
    41  	}
    42  	if config.ClientURL == nil {
    43  		config.ClientURL = &url.URL{}
    44  	}
    45  
    46  	return &Member{
    47  		state:         &preStartState{},
    48  		config:        config,
    49  		peerAddress:   assignedAddress{},
    50  		clientAddress: assignedAddress{},
    51  	}, nil
    52  }
    53  
    54  // ClientURL returns the URL to be used by clients to establish a connection
    55  // with the etcd member.
    56  func (m *Member) ClientURL() *url.URL {
    57  	if len(m.config.embed.ListenClientUrls) == 0 {
    58  		return nil
    59  	}
    60  	return &m.config.embed.ListenClientUrls[0]
    61  }
    62  
    63  // PeerURL returns the URL to be used by other etcd members to establish a
    64  // connection with the etcd member.
    65  func (m *Member) PeerURL() *url.URL {
    66  	if len(m.config.embed.ListenPeerUrls) == 0 {
    67  		return nil
    68  	}
    69  	return &m.config.embed.ListenPeerUrls[0]
    70  }
    71  
// memberState is an interface that represents the methods that behave
// differently for the Member depending on the state of the Member. Member
// forwards its own prepare, start, stop and close calls to the state it
// currently holds, passing itself as the first argument.
type memberState interface {
	// prepare gets the Member ready to be started.
	prepare(*Member) error
	// start starts the Member's etcd instance. The channel is handed on so that
	// runtime errors of the instance can be reported to it.
	start(*Member, chan<- error) error
	// stop stops the Member's etcd instance.
	stop(*Member) error
	// close closes the Member's etcd instance.
	close(*Member) error
	// String returns the name of the state.
	String() string
}
    81  
// prepare is responsible for preparing the Member to be started. This involves
// adding some extra configuration and ensuring the Member has everything it
// needs to start its etcd instance.
//
// prepare directs the call to the implementation belonging to the current
// state of the Member. Only defaultState and preStartState define a prepare
// method; runningState, stoppedState and terminatedState use the defaultState
// one through embedding.
func (m *Member) prepare() (err error) {
	return m.state.prepare(m)
}
    93  
// start is responsible for creating an etcd server and starting the Member's
// etcd instance.
//
// start directs the call to the implementation belonging to the current state
// of the Member. defaultState, runningState and stoppedState define a start
// method; preStartState and terminatedState use the defaultState one through
// embedding.
//
// errCh is passed through to the state unchanged.
func (m *Member) start(errCh chan<- error) error {
	return m.state.start(m, errCh)
}
   104  
// stop is responsible for stopping the Member's etcd instance.
//
// stop directs the call to the implementation belonging to the current state
// of the Member. defaultState, preStartState, stoppedState and terminatedState
// define a stop method; runningState uses the defaultState one through
// embedding.
func (m *Member) stop() error {
	return m.state.stop(m)
}
   114  
// close is responsible for closing the Member's etcd instance and cleaning up
// any lingering servers and listeners.
//
// close directs the call to the implementation belonging to the current state
// of the Member. defaultState, preStartState and terminatedState define a
// close method; runningState and stoppedState use the defaultState one through
// embedding.
func (m *Member) close() error {
	return m.state.close(m)
}
   125  
// defaultState is the base/default state used for the Member. It holds the
// behavior that is most common to all states and is embedded by the concrete
// states, which override only the methods that must behave differently.
type defaultState struct{}

// prepare returns nil without touching the Member, as in most states we want
// prepare to do nothing.
func (d *defaultState) prepare(_ *Member) error {
	return nil
}
   135  
   136  // start will start the etcd instance for the Member. It will then wait for 20
   137  // seconds for the instance to report back that it is ready. If the instance
   138  // does not report ready in this time, then the instance will be stopped and the
   139  // function will fail.
   140  //
   141  // A separate go routine is started to monitor errors in the etcd instance
   142  // during runtime and send them back to the provided error channel.
   143  //
   144  // If the method is successful then the Member is transitioned to runningState.
   145  func (d *defaultState) start(m *Member, errCh chan<- error) error {
   146  	e, err := embed.StartEtcd(m.config.embed)
   147  	if err != nil {
   148  		return fmt.Errorf("failed to start etcd member %s: %w", m.config.Name, err)
   149  	}
   150  	m.etcd = e
   151  
   152  	if err := m.waitForServer(); err != nil {
   153  		m.etcd.Server.Stop()
   154  		return err
   155  	}
   156  
   157  	go m.watchErr(errCh)
   158  
   159  	m.state = &runningState{}
   160  	return nil
   161  }
   162  
// stop will stop the server of the Member's etcd instance and transition the
// Member into stoppedState. It always returns nil; the error result exists to
// satisfy memberState.
func (d *defaultState) stop(m *Member) error { //nolint:unparam
	m.etcd.Server.Stop()
	m.state = &stoppedState{}
	return nil
}
   170  
// close will close the Member's etcd instance and then transition the Member
// into terminatedState. It always returns nil; the error result exists to
// satisfy memberState.
func (d *defaultState) close(m *Member) error { //nolint:unparam
	m.etcd.Close()
	m.state = &terminatedState{}
	return nil
}
   178  
// String returns the string representation of the defaultState name. This is an
// empty string as this state should never be used directly; the concrete states
// that embed defaultState provide their own String method.
func (d *defaultState) String() string {
	return ""
}
   184  
// preStartState is the state a Member will be in when it is first created,
// before it has been started for the first time. It embeds defaultState and
// overrides prepare, stop, close and String; start is the defaultState one.
type preStartState struct{ defaultState }

// prepare updates the config values for the Member and prepares the config for
// a start of the etcd instance: the embedded etcd configuration is initialized
// from the Member's config, then any existing etcd data directory for the
// Member is removed and the peer/client URLs are configured (see initialize
// and prepareNew).
// Within preStartState is the only time we want the prepare method to run.
func (p *preStartState) prepare(m *Member) (err error) {
	m.initialize()
	return m.prepareNew()
}

// stop returns nil without making any changes. This is because a Member in
// preStartState has not been started yet and so can not be stopped. An attempt
// to do so would make the application hang indefinitely.
func (p *preStartState) stop(_ *Member) error {
	return nil
}

// close returns nil without making any changes because a Member that has not
// been started has no reason to be closed.
func (p *preStartState) close(_ *Member) error {
	return nil
}

// String returns the string representation of the preStartState name.
func (p *preStartState) String() string {
	return preStart
}
   215  
// runningState is the state a Member will be in when it has been successfully
// started. It embeds defaultState and overrides start and String; prepare,
// stop and close are the defaultState ones.
type runningState struct{ defaultState }

// start returns nil without making any changes because a Member that has
// already been started should not try to start again. The error channel is
// ignored.
func (r *runningState) start(_ *Member, _ chan<- error) error {
	return nil
}

// String returns the string representation of the runningState name.
func (r *runningState) String() string {
	return running
}
   230  
// stoppedState is the state a Member will be in when it has been stopped, but
// not closed. It embeds defaultState and overrides start, stop and String;
// prepare and close are the defaultState ones.
type stoppedState struct{ defaultState }

// start will close the Member before calling start on the Member. Due to the
// nature of how stop works, it will leave lingering connections from the
// previous server instance which stop the new start from successfully binding
// to its port. To allow the start to be successful, we first close those
// connections. The call to close will transition the Member into
// terminatedState and the start call will be directed to
// terminatedState.start().
func (s *stoppedState) start(m *Member, errCh chan<- error) error {
	// s.close is the embedded defaultState.close, which moves m to
	// terminatedState, so m.start below dispatches through that new state.
	if err := s.close(m); err != nil {
		return err
	}
	return m.start(errCh)
}

// stop will return nil without making any changes, this is because a Member
// that is already stopped should not have another stop attempted.
func (s *stoppedState) stop(_ *Member) error {
	return nil
}

// String returns the string representation of the stoppedState name.
func (s *stoppedState) String() string {
	return stopped
}
   259  
// terminatedState is the state a Member will be in when it has been closed. It
// embeds defaultState and overrides stop, close and String; prepare and start
// are the defaultState ones, so a terminated Member can be started again.
type terminatedState struct{ defaultState }

// stop will return nil without making any changes, this is because a Member
// that is already closed should not have a stop attempted.
func (t *terminatedState) stop(_ *Member) error {
	return nil
}

// close will return nil without making any changes, this is because a closed
// Member should not be closed again.
func (t *terminatedState) close(_ *Member) error {
	return nil
}

// String returns the string representation of the terminatedState name.
func (t *terminatedState) String() string {
	return terminated
}
   279  
   280  // intiialize prepares the etcd member to join a new Cluster by loading the
   281  // configuration options from the config provided via the Member constructor
   282  // call.
   283  func (m *Member) initialize() {
   284  	m.config.embed = embed.NewConfig()
   285  	m.config.embed.Name = m.config.Name
   286  
   287  	m.config.embed.Dir = strings.Join([]string{"default.etcd", m.config.Name}, ".")
   288  
   289  	m.config.embed.LogLevel = m.config.LogLevel
   290  	if m.config.embed.LogLevel == "" {
   291  		m.config.embed.LogLevel = "error"
   292  	}
   293  }
   294  
   295  // prepareNew will delete any existing etcd state present for the Member from
   296  // the filesystem and ensure that the Member has a port on localhost to use if a
   297  // host port combinations were not provided via the config for the client or
   298  // peer URLs.
   299  func (m *Member) prepareNew() error {
   300  	fs := afero.NewOsFs()
   301  	if err := fs.RemoveAll(m.config.embed.Dir); err != nil {
   302  		return err
   303  	}
   304  	return m.configureURLs(m.config.PeerURL, m.config.ClientURL)
   305  }
   306  
   307  // configureURLs assigns the host:port combination to the member. If the
   308  // host/port has been provided for the client/peer URLs then these will be used,
   309  // otherwise the default host will be localhost and the port will be selected at
   310  // random from the available open ports.
   311  func (m *Member) configureURLs(peerURL *url.URL, clientURL *url.URL) error {
   312  	if err := m.peerAddress.assign(peerURL); err != nil {
   313  		return err
   314  	}
   315  	if err := m.clientAddress.assign(clientURL); err != nil {
   316  		return err
   317  	}
   318  
   319  	m.config.embed.ListenPeerUrls = []url.URL{m.peerAddress.url()}
   320  	m.config.embed.ListenClientUrls = []url.URL{m.clientAddress.url()}
   321  	m.config.embed.AdvertisePeerUrls = []url.URL{m.peerAddress.url()}
   322  	m.config.embed.AdvertiseClientUrls = []url.URL{m.clientAddress.url()}
   323  	return nil
   324  }
   325  
   326  // waitForServer waits for 30 seconds for the etcd instance to send a message to
   327  // the ReadyNotify channel. If the instance does not become healthy within this
   328  // timeframe, an error is returned.
   329  func (m *Member) waitForServer() error {
   330  	select {
   331  	case <-m.etcd.Server.ReadyNotify():
   332  		return nil
   333  	case <-time.After(30 * time.Second):
   334  		return errors.New("failed to start server")
   335  	}
   336  }
   337  
   338  // watchErr will watch for stop events and errors for the member. If errors are
   339  // found, they will be forwarded to the specified error channel (which is
   340  // normally a shared Cluster error channel). If a stop event is found, then the
   341  // function ends to ensure it does not outlive the current etcd instance.
   342  func (m *Member) watchErr(errCh chan<- error) {
   343  	for {
   344  		select {
   345  		case <-m.etcd.Server.StopNotify():
   346  			return
   347  		case err := <-m.etcd.Err():
   348  			errCh <- err
   349  		}
   350  	}
   351  }
   352  

View as plain text