1
16
17 package framework
18
19 import (
20 "context"
21 "flag"
22 "fmt"
23 "io"
24 "net"
25 "os"
26 "os/exec"
27 "strconv"
28 "strings"
29 "syscall"
30 "testing"
31 "time"
32
33 "go.uber.org/goleak"
34 "google.golang.org/grpc/grpclog"
35
36 "k8s.io/klog/v2"
37 "k8s.io/kubernetes/pkg/util/env"
38 )
39
// installEtcd is the help text that RunCustomEtcd prints to stderr when the
// etcd binary cannot be located. It points at the installation instructions
// and at the helper script that installs a local copy.
const installEtcd = `
Cannot find etcd, cannot run integration tests
Please see https://git.k8s.io/community/contributors/devel/sig-testing/integration-tests.md#install-etcd-dependency for instructions.

You can use 'hack/install-etcd.sh' to install a copy in third_party/.

`
47
48
// getEtcdPath looks up the "etcd" executable through exec.LookPath and returns
// the path it resolves to, or the lookup error.
func getEtcdPath() (string, error) {
	etcdBinary, lookupErr := exec.LookPath("etcd")
	return etcdBinary, lookupErr
}
52
53
// getAvailablePort asks the kernel for a free TCP port by listening on ":0"
// and returns the port number that was assigned.
//
// The listener is closed before the function returns, so the port is only
// known to have been free at the time of the call; another process may grab it
// before the caller binds to it.
func getAvailablePort() (int, error) {
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		return 0, fmt.Errorf("could not bind to a port: %v", err)
	}
	// Release the port again once its number has been read.
	defer listener.Close()

	tcpAddr := listener.Addr().(*net.TCPAddr)
	return tcpAddr.Port, nil
}
64
65
66
67 func startEtcd(output io.Writer) (func(), error) {
68 etcdURL := env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
69 conn, err := net.Dial("tcp", strings.TrimPrefix(etcdURL, "http://"))
70 if err == nil {
71 klog.Infof("etcd already running at %s", etcdURL)
72 conn.Close()
73 return func() {}, nil
74 }
75 klog.V(1).Infof("could not connect to etcd: %v", err)
76
77 currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil, output)
78 if err != nil {
79 return nil, err
80 }
81
82 os.Setenv("KUBE_INTEGRATION_ETCD_URL", currentURL)
83
84 return stop, nil
85 }
86
87 func init() {
88
89
90
91
92
93
94 grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, os.Stderr))
95 }
96
97
98 func RunCustomEtcd(dataDir string, customFlags []string, output io.Writer) (url string, stopFn func(), err error) {
99
100 etcdPath, err := getEtcdPath()
101 if err != nil {
102 fmt.Fprint(os.Stderr, installEtcd)
103 return "", nil, fmt.Errorf("could not find etcd in PATH: %v", err)
104 }
105 etcdPort, err := getAvailablePort()
106 if err != nil {
107 return "", nil, fmt.Errorf("could not get a port: %v", err)
108 }
109 customURL := fmt.Sprintf("http://127.0.0.1:%d", etcdPort)
110
111 klog.Infof("starting etcd on %s", customURL)
112
113 etcdDataDir, err := os.MkdirTemp(os.TempDir(), dataDir)
114 if err != nil {
115 return "", nil, fmt.Errorf("unable to make temp etcd data dir %s: %v", dataDir, err)
116 }
117 klog.Infof("storing etcd data in: %v", etcdDataDir)
118
119 ctx, cancel := context.WithCancel(context.Background())
120 args := []string{
121 "--data-dir",
122 etcdDataDir,
123 "--listen-client-urls",
124 customURL,
125 "--advertise-client-urls",
126 customURL,
127 "--listen-peer-urls",
128 "http://127.0.0.1:0",
129 "-log-level",
130 "warn",
131 "--quota-backend-bytes",
132 strconv.FormatInt(8*1024*1024*1024, 10),
133 }
134 args = append(args, customFlags...)
135 cmd := exec.CommandContext(ctx, etcdPath, args...)
136 if output == nil {
137 cmd.Stdout = os.Stdout
138 cmd.Stderr = os.Stderr
139 } else {
140 cmd.Stdout = output
141 cmd.Stderr = output
142 }
143 stop := func() {
144
145 defer cancel()
146 cmd.Process.Signal(syscall.SIGTERM)
147 go func() {
148 select {
149 case <-ctx.Done():
150 klog.Infof("etcd exited gracefully, context cancelled")
151 case <-time.After(5 * time.Second):
152 klog.Infof("etcd didn't exit in 5 seconds, killing it")
153 cancel()
154 }
155 }()
156 err := cmd.Wait()
157 klog.Infof("etcd exit status: %v", err)
158 err = os.RemoveAll(etcdDataDir)
159 if err != nil {
160 klog.Warningf("error during etcd cleanup: %v", err)
161 }
162 }
163
164 if err := cmd.Start(); err != nil {
165 return "", nil, fmt.Errorf("failed to run etcd: %v", err)
166 }
167
168 var i int32 = 1
169 const pollCount = int32(300)
170
171 for i <= pollCount {
172 conn, err := net.DialTimeout("tcp", strings.TrimPrefix(customURL, "http://"), 1*time.Second)
173 if err == nil {
174 conn.Close()
175 break
176 }
177
178 if i == pollCount {
179 stop()
180 return "", nil, fmt.Errorf("could not start etcd")
181 }
182
183 time.Sleep(100 * time.Millisecond)
184 i = i + 1
185 }
186
187 return customURL, stop, nil
188 }
189
190
191 func EtcdMain(tests func() int) {
192
193 flag.Parse()
194
195
196 goleakOpts := IgnoreBackgroundGoroutines()
197
198 goleakOpts = append(goleakOpts,
199
200
201
202
203
204
205
206
207
208 goleak.IgnoreTopFunction("k8s.io/kubernetes/vendor/gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
209 goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
210 )
211
212 stop, err := startEtcd(nil)
213 if err != nil {
214 klog.Fatalf("cannot run integration tests: unable to start etcd: %v", err)
215 }
216 result := tests()
217 stop()
218 klog.StopFlushDaemon()
219
220 if err := goleakFindRetry(goleakOpts...); err != nil {
221 klog.ErrorS(err, "EtcdMain goroutine check")
222 result = 1
223 }
224
225 os.Exit(result)
226 }
227
228
229 func GetEtcdURL() string {
230 return env.GetEnvAsStringOrFallback("KUBE_INTEGRATION_ETCD_URL", "http://127.0.0.1:2379")
231 }
232
233
234
235
236
237
238
239
240
241
242 func StartEtcd(tb testing.TB, etcdOutput io.Writer) {
243 stop, err := startEtcd(etcdOutput)
244 if err != nil {
245 tb.Fatalf("unable to start etcd: %v", err)
246 }
247 tb.Cleanup(stop)
248 }
249
View as plain text