...
1
16
17 package main
18
19 import (
20 "fmt"
21 "os"
22 "os/exec"
23 "strings"
24 "time"
25
26 "k8s.io/klog/v2"
27 )
28
29
30 type EtcdMigrateServer struct {
31 cfg *EtcdMigrateCfg
32 client EtcdMigrateClient
33 cmd *exec.Cmd
34 }
35
36
37 func NewEtcdMigrateServer(cfg *EtcdMigrateCfg, client EtcdMigrateClient) *EtcdMigrateServer {
38 return &EtcdMigrateServer{cfg: cfg, client: client}
39 }
40
41
42
43 func (r *EtcdMigrateServer) Start(version *EtcdVersion) error {
44 etcdCmd := exec.Command(
45 fmt.Sprintf("%s/etcd-%s", r.cfg.binPath, version),
46 "--name", r.cfg.name,
47 "--initial-cluster", r.cfg.initialCluster,
48 "--debug",
49 "--data-dir", r.cfg.dataDirectory,
50 "--listen-client-urls", r.cfg.clientListenUrls,
51 "--advertise-client-urls", fmt.Sprintf("http://127.0.0.1:%d", r.cfg.port),
52 "--listen-peer-urls", r.cfg.peerListenUrls,
53 "--initial-advertise-peer-urls", r.cfg.peerAdvertiseUrls,
54 )
55 if r.cfg.etcdServerArgs != "" {
56 extraArgs := strings.Fields(r.cfg.etcdServerArgs)
57 etcdCmd.Args = append(etcdCmd.Args, extraArgs...)
58 }
59 fmt.Printf("Starting server %s: %+v\n", r.cfg.name, etcdCmd.Args)
60
61 etcdCmd.Stdout = os.Stdout
62 etcdCmd.Stderr = os.Stderr
63 err := etcdCmd.Start()
64 if err != nil {
65 return err
66 }
67 interval := time.NewTicker(time.Millisecond * 500)
68 defer interval.Stop()
69 done := make(chan bool)
70 go func() {
71 time.Sleep(time.Minute * 2)
72 close(done)
73 }()
74 for {
75 select {
76 case <-interval.C:
77 err := r.client.SetEtcdVersionKeyValue(version)
78 if err != nil {
79 klog.Infof("Still waiting for etcd to start, current error: %v", err)
80
81 } else {
82 klog.Infof("Etcd on port %d is up.", r.cfg.port)
83 r.cmd = etcdCmd
84 return nil
85 }
86 case <-done:
87 err = etcdCmd.Process.Kill()
88 if err != nil {
89 return fmt.Errorf("error killing etcd: %v", err)
90 }
91 return fmt.Errorf("timed out waiting for etcd on port %d", r.cfg.port)
92 }
93 }
94 }
95
96
97
98 func (r *EtcdMigrateServer) Stop() error {
99 if r.cmd == nil {
100 return fmt.Errorf("cannot stop EtcdMigrateServer that has not been started")
101 }
102 err := r.cmd.Process.Signal(os.Interrupt)
103 if err != nil {
104 return fmt.Errorf("error sending SIGINT to etcd for graceful shutdown: %v", err)
105 }
106 gracefulWait := time.Minute * 2
107 stopped := make(chan bool)
108 timedout := make(chan bool)
109 go func() {
110 time.Sleep(gracefulWait)
111 close(timedout)
112 }()
113 go func() {
114 select {
115 case <-stopped:
116 return
117 case <-timedout:
118 klog.Infof("etcd server has not terminated gracefully after %s, killing it.", gracefulWait)
119 r.cmd.Process.Kill()
120 return
121 }
122 }()
123 err = r.cmd.Wait()
124 close(stopped)
125 if exiterr, ok := err.(*exec.ExitError); ok {
126 klog.Infof("etcd server stopped (signal: %s)", exiterr.Error())
127
128 } else if err != nil {
129 return fmt.Errorf("error waiting for etcd to stop: %v", err)
130 }
131 klog.Infof("Stopped etcd server %s", r.cfg.name)
132 return nil
133 }
134
View as plain text