1 package embed 2 3 import ( 4 "errors" 5 "fmt" 6 "net/url" 7 "strings" 8 "time" 9 10 "github.com/spf13/afero" 11 "go.etcd.io/etcd/server/v3/embed" 12 ) 13 14 const ( 15 preStart = "pre-start" 16 running = "running" 17 stopped = "stopped" 18 terminated = "terminated" 19 ) 20 21 // Member represents an etcd member 22 type Member struct { 23 state memberState 24 25 etcd *embed.Etcd 26 config *Config 27 clientAddress assignedAddress 28 peerAddress assignedAddress 29 } 30 31 // NewMember returns a new member object that can be added to a Cluster. If no 32 // Name is set in the config provided then an error will be returned. 33 func NewMember(config *Config) (*Member, error) { 34 if config.Name == "" { 35 return nil, errors.New("invalid member config: no member name provided") 36 } 37 38 config.embed = &embed.Config{} 39 if config.PeerURL == nil { 40 config.PeerURL = &url.URL{} 41 } 42 if config.ClientURL == nil { 43 config.ClientURL = &url.URL{} 44 } 45 46 return &Member{ 47 state: &preStartState{}, 48 config: config, 49 peerAddress: assignedAddress{}, 50 clientAddress: assignedAddress{}, 51 }, nil 52 } 53 54 // ClientURL returns the URL to be used by clients to establish a connection 55 // with the etcd member. 56 func (m *Member) ClientURL() *url.URL { 57 if len(m.config.embed.ListenClientUrls) == 0 { 58 return nil 59 } 60 return &m.config.embed.ListenClientUrls[0] 61 } 62 63 // PeerURL returns the URL to be used by other etcd members to establish a 64 // connection with the etcd member. 65 func (m *Member) PeerURL() *url.URL { 66 if len(m.config.embed.ListenPeerUrls) == 0 { 67 return nil 68 } 69 return &m.config.embed.ListenPeerUrls[0] 70 } 71 72 // memberState is an interface that represents the methods that behave 73 // differently for the Member depending on the state of the Member. 74 type memberState interface { 75 prepare(*Member) error 76 start(*Member, chan<- error) error 77 stop(*Member) error 78 close(*Member) error 79 String() string 80 } 81 82 // prepare is responsible for preparing the Member to be started. This involves 83 // adding some extra configuration and ensuring the Member has everything it 84 // needs to start it's etcd instance. 85 // 86 // prepare will direct the call to the correct method depending on the current 87 // state of the Member. For more information about the possible prepare method 88 // functionality, see defaultState, preStartState, runningState, stoppedState, 89 // terminatedState 90 func (m *Member) prepare() (err error) { 91 return m.state.prepare(m) 92 } 93 94 // start is responsible for creating an etcd server and starting the Member's 95 // etcd instance. 96 // 97 // start will direct the call to the correct method depending on the current 98 // state of the Member. For more information about the possible start method 99 // functionality, see defaultState, preStartState, runningState, stoppedState, 100 // terminatedState 101 func (m *Member) start(errCh chan<- error) error { 102 return m.state.start(m, errCh) 103 } 104 105 // stop is responsible for stopping the Member's etcd instance. 106 // 107 // stop will direct the call to the correct method depending on the current 108 // state of the Member. For more information about the possible stop method 109 // functionality, see defaultState, preStartState, runningState, stoppedState, 110 // terminatedState 111 func (m *Member) stop() error { 112 return m.state.stop(m) 113 } 114 115 // close is responsible for closing the Member's etcd instance and cleaning up 116 // any lingering servers and listeners. 117 // 118 // close will direct the call to the correct method depending on the current 119 // state of the Member. For more information about the possible close method 120 // functionality, see defaultState, preStartState, runningState, stoppedState, 121 // terminatedState 122 func (m *Member) close() error { 123 return m.state.close(m) 124 } 125 126 // defaultState is the base/default state used for the Member, this is generally 127 // the most common functionality to all states. 128 type defaultState struct{} 129 130 // prepare returns nil as in most cases we want prepare to do nothing as a 131 // default. 132 func (d *defaultState) prepare(_ *Member) error { 133 return nil 134 } 135 136 // start will start the etcd instance for the Member. It will then wait for 20 137 // seconds for the instance to report back that it is ready. If the instance 138 // does not report ready in this time, then the instance will be stopped and the 139 // function will fail. 140 // 141 // A separate go routine is started to monitor errors in the etcd instance 142 // during runtime and send them back to the provided error channel. 143 // 144 // If the method is successful then the Member is transitioned to runningState. 145 func (d *defaultState) start(m *Member, errCh chan<- error) error { 146 e, err := embed.StartEtcd(m.config.embed) 147 if err != nil { 148 return fmt.Errorf("failed to start etcd member %s: %w", m.config.Name, err) 149 } 150 m.etcd = e 151 152 if err := m.waitForServer(); err != nil { 153 m.etcd.Server.Stop() 154 return err 155 } 156 157 go m.watchErr(errCh) 158 159 m.state = &runningState{} 160 return nil 161 } 162 163 // stop will stop the Member's etcd instance and transition the Member into 164 // stoppedState. 165 func (d *defaultState) stop(m *Member) error { //nolint:unparam 166 m.etcd.Server.Stop() 167 m.state = &stoppedState{} 168 return nil 169 } 170 171 // close will close the Member's etcd instance and then transition the Member 172 // into terminatedState. 173 func (d *defaultState) close(m *Member) error { //nolint:unparam 174 m.etcd.Close() 175 m.state = &terminatedState{} 176 return nil 177 } 178 179 // String returns the string representation of the defaultState name. This is an 180 // empty string as this state should never be used directly. 181 func (d *defaultState) String() string { 182 return "" 183 } 184 185 // preStartState is the state a Member will be in when it is first created, 186 // before it has been started for the first time. 187 type preStartState struct{ defaultState } 188 189 // prepare updates the config values for the Member and prepares the config for 190 // a start of the etcd instance. A host:port combination will be reserved for 191 // the member to ensure that it has an available host:port when it is started. 192 // Within preStartState is the only time we want the prepare method to run. 193 func (p *preStartState) prepare(m *Member) (err error) { 194 m.initialize() 195 return m.prepareNew() 196 } 197 198 // stop returns nil without making any changes. This is because a Member in 199 // preStartState has not been started yet and so can not be stopped. An attempt 200 // to do so would make the application hang indefinitely. 201 func (p *preStartState) stop(_ *Member) error { 202 return nil 203 } 204 205 // close returns nil without making any changes because a Member that has not 206 // been started has no reason to be closed. 207 func (p *preStartState) close(_ *Member) error { 208 return nil 209 } 210 211 // String returns the string representation of the preStartState name. 212 func (p *preStartState) String() string { 213 return preStart 214 } 215 216 // runningState is the state a Member will be in when it has been successfully 217 // started. 218 type runningState struct{ defaultState } 219 220 // start returns nil without making any changes because a Member that has 221 // already been started should not try to start again. 222 func (r *runningState) start(_ *Member, _ chan<- error) error { 223 return nil 224 } 225 226 // String returns the string representation of the runningState name. 227 func (r *runningState) String() string { 228 return running 229 } 230 231 // stoppedState is the state a Member will be in when it has been stopped, but 232 // not closed. 233 type stoppedState struct{ defaultState } 234 235 // start will close the Member before calling start on the Member. Due to the 236 // nature of how stop works, it will leave lingering connections from the 237 // previous server instance which stop the new start from successfully binding 238 // to it's port. To allow the start to be successful, we first close those 239 // connections. The call to close will transition the Member into 240 // terminatedState and the start call will be directed to 241 // terminatedState.start(). 242 func (s *stoppedState) start(m *Member, errCh chan<- error) error { 243 if err := s.close(m); err != nil { 244 return err 245 } 246 return m.start(errCh) 247 } 248 249 // stop will return nil without making any changes, this is because a Member 250 // that is already stopped should not have another stop attempted. 251 func (s *stoppedState) stop(_ *Member) error { 252 return nil 253 } 254 255 // String returns the string representation of the stoppedState name. 256 func (s *stoppedState) String() string { 257 return stopped 258 } 259 260 // terminatedState is the state a Member will be in when it has been closed. 261 type terminatedState struct{ defaultState } 262 263 // stop will return nil without making any changes, this is because a Member 264 // that is already closed should not have a stop attempted. 265 func (t *terminatedState) stop(_ *Member) error { 266 return nil 267 } 268 269 // close will return nil without making any changes, this is because a closed 270 // Member should not be closed again. 271 func (t *terminatedState) close(_ *Member) error { 272 return nil 273 } 274 275 // String returns the string representation of the terminatedState name. 276 func (t *terminatedState) String() string { 277 return terminated 278 } 279 280 // intiialize prepares the etcd member to join a new Cluster by loading the 281 // configuration options from the config provided via the Member constructor 282 // call. 283 func (m *Member) initialize() { 284 m.config.embed = embed.NewConfig() 285 m.config.embed.Name = m.config.Name 286 287 m.config.embed.Dir = strings.Join([]string{"default.etcd", m.config.Name}, ".") 288 289 m.config.embed.LogLevel = m.config.LogLevel 290 if m.config.embed.LogLevel == "" { 291 m.config.embed.LogLevel = "error" 292 } 293 } 294 295 // prepareNew will delete any existing etcd state present for the Member from 296 // the filesystem and ensure that the Member has a port on localhost to use if a 297 // host port combinations were not provided via the config for the client or 298 // peer URLs. 299 func (m *Member) prepareNew() error { 300 fs := afero.NewOsFs() 301 if err := fs.RemoveAll(m.config.embed.Dir); err != nil { 302 return err 303 } 304 return m.configureURLs(m.config.PeerURL, m.config.ClientURL) 305 } 306 307 // configureURLs assigns the host:port combination to the member. If the 308 // host/port has been provided for the client/peer URLs then these will be used, 309 // otherwise the default host will be localhost and the port will be selected at 310 // random from the available open ports. 311 func (m *Member) configureURLs(peerURL *url.URL, clientURL *url.URL) error { 312 if err := m.peerAddress.assign(peerURL); err != nil { 313 return err 314 } 315 if err := m.clientAddress.assign(clientURL); err != nil { 316 return err 317 } 318 319 m.config.embed.ListenPeerUrls = []url.URL{m.peerAddress.url()} 320 m.config.embed.ListenClientUrls = []url.URL{m.clientAddress.url()} 321 m.config.embed.AdvertisePeerUrls = []url.URL{m.peerAddress.url()} 322 m.config.embed.AdvertiseClientUrls = []url.URL{m.clientAddress.url()} 323 return nil 324 } 325 326 // waitForServer waits for 30 seconds for the etcd instance to send a message to 327 // the ReadyNotify channel. If the instance does not become healthy within this 328 // timeframe, an error is returned. 329 func (m *Member) waitForServer() error { 330 select { 331 case <-m.etcd.Server.ReadyNotify(): 332 return nil 333 case <-time.After(30 * time.Second): 334 return errors.New("failed to start server") 335 } 336 } 337 338 // watchErr will watch for stop events and errors for the member. If errors are 339 // found, they will be forwarded to the specified error channel (which is 340 // normally a shared Cluster error channel). If a stop event is found, then the 341 // function ends to ensure it does not outlive the current etcd instance. 342 func (m *Member) watchErr(errCh chan<- error) { 343 for { 344 select { 345 case <-m.etcd.Server.StopNotify(): 346 return 347 case err := <-m.etcd.Err(): 348 errCh <- err 349 } 350 } 351 } 352