1
2
3
4
5
6 package integration
7
8 import (
9 "bytes"
10 "context"
11 "encoding/json"
12 "fmt"
13 "os"
14 "strings"
15 "testing"
16 "time"
17
18 nodemeta "edge-infra.dev/pkg/sds/ien/node"
19 "edge-infra.dev/test/f2"
20 "edge-infra.dev/test/f2/integration"
21 "edge-infra.dev/test/f2/x/ktest"
22
23 "github.com/stretchr/testify/assert"
24 "github.com/stretchr/testify/require"
25 corev1 "k8s.io/api/core/v1"
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/apimachinery/pkg/watch"
31 "k8s.io/client-go/kubernetes"
32 "k8s.io/client-go/kubernetes/scheme"
33 "k8s.io/client-go/rest"
34 "k8s.io/client-go/tools/remotecommand"
35 )
36
37
38
// Package-level fixtures shared by TestMain and TestEtcdOperator.
var (
	// f is the f2 test framework instance, initialized in TestMain.
	f f2.Framework
	// operatorNamespace is the namespace where the etcd operator stores
	// the per-node certificate secrets (one secret per worker node name).
	operatorNamespace = "etcd-operator"

	// expectedFiles lists the files the operator is expected to (re)create
	// under /etc/kubernetes on each worker node.
	// NOTE(review): the slice is empty here, which makes the file-presence
	// assertions in TestEtcdOperator vacuous — confirm it is populated
	// elsewhere in this package (another file or build-tagged source).
	expectedFiles = []string{}
	// Values of the node role label used to select control-plane vs worker nodes.
	controlplane = "controlplane"
	worker       = "worker"
)

// etcdctlCmd is the base etcdctl invocation executed inside the static etcd
// pods, authenticating against the local member with the cluster's serving
// certificates; subcommands are appended to it at call sites.
var etcdctlCmd = "etcdctl --endpoints=https://127.0.0.1:2379/ --cacert=/etc/kubernetes/pki/etcd/ca.crt --cert=/etc/kubernetes/pki/etcd/server.crt --key=/etc/kubernetes/pki/etcd/server.key"
49
50 func TestMain(m *testing.M) {
51 if runIntTest, exists := os.LookupEnv("RUN_ETCD_OPERATOR_INT_TEST"); !exists || strings.ToLower(runIntTest) != "true" {
52 fmt.Println("Skipping integration test. Set RUN_ETCD_OPERATOR_INT_TEST=True to run")
53 return
54 }
55
56 f = f2.New(
57 context.Background(),
58 f2.WithExtensions(
59 ktest.New(),
60 ),
61 ).
62 Setup(func(ctx f2.Context) (f2.Context, error) {
63
64
65 return ctx, nil
66 }).Slow()
67
68 os.Exit(f.Run(m))
69 }
70
71 func TestEtcdOperator(t *testing.T) {
72 var nodes *corev1.NodeList
73 var testPod *corev1.Pod
74 var k *ktest.K8s
75 var clientset *kubernetes.Clientset
76
77
78 integration.SkipIfNot(t, integration.L2)
79
80 etcdOperatorSanity := f2.NewFeature("Etcd Operator sanity checks").
81 Setup("Setup environment", func(ctx f2.Context, t *testing.T) f2.Context {
82 k = ktest.FromContextT(ctx, t)
83 clientset, _ = kubernetes.NewForConfig(k.Env.Config)
84
85 return ctx
86 }).
87 Setup("Verifying etcd cluster is initially healthy", func(ctx f2.Context, t *testing.T) f2.Context {
88 var (
89 v1 = clientset.CoreV1()
90 )
91 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
92 for _, node := range nodes.Items {
93 if node.GetLabels()[nodemeta.RoleLabel] == controlplane {
94 podName := "etcd-" + node.Name
95 out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json")
96 require.NoError(t, err)
97
98 var etcdMemberStates []map[string]interface{}
99 err = json.Unmarshal([]byte(out), &etcdMemberStates)
100 require.NoError(t, err)
101
102 for _, etcdMemberState := range etcdMemberStates {
103 health, ok := etcdMemberState["health"].(bool)
104 assert.True(t, ok)
105 endpoint, ok := etcdMemberState["endpoint"].(string)
106 assert.True(t, ok)
107 fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health)
108 }
109
110 fmt.Println("Verified initial etcd cluster health")
111 }
112 }
113 return ctx
114 }).
115 Setup("Setup test pods on worker nodes", func(ctx f2.Context, t *testing.T) f2.Context {
116 var (
117 v1 = clientset.CoreV1()
118 )
119 decode := scheme.Codecs.UniversalDeserializer().Decode
120 stream, _ := os.ReadFile("testdata/etcd-operator-test-pod.yaml")
121 obj, gVK, _ := decode(stream, nil, nil)
122 if gVK.Kind == "Pod" {
123 testPod = obj.(*corev1.Pod)
124 }
125
126 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
127 for _, node := range nodes.Items {
128 if node.GetLabels()[nodemeta.RoleLabel] == worker {
129 copyTestPod := testPod.DeepCopy()
130 copyTestPod.Spec.NodeName = node.Name
131 copyTestPod.ObjectMeta.Name = fmt.Sprintf("%s-%s", copyTestPod.ObjectMeta.Name, node.Name)
132 _, err := v1.Pods(k.Namespace).Create(ctx, copyTestPod, metav1.CreateOptions{})
133 if err != nil {
134 require.NoError(t, err)
135 }
136 fmt.Printf("Created test pod on node %s\n", node.Name)
137 }
138 }
139 return ctx
140 }).
141 Setup("Deleting all workers cert secrets", func(ctx f2.Context, t *testing.T) f2.Context {
142 var (
143 v1 = clientset.CoreV1()
144 )
145 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
146 for _, node := range nodes.Items {
147 if node.GetLabels()[nodemeta.RoleLabel] == worker {
148 _, err := v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{})
149 if errors.IsNotFound(err) {
150 fmt.Printf("Secret not found for node %s\n", node.Name)
151 continue
152 } else if err != nil {
153 require.NoError(t, err)
154 }
155
156 err = v1.Secrets(operatorNamespace).Delete(ctx, node.Name, metav1.DeleteOptions{})
157 require.NoError(t, err)
158 fmt.Printf("Deleted secret for node %s\n", node.Name)
159 }
160 }
161 return ctx
162 }).
163 Setup("Prepare worker nodes for testing", func(ctx f2.Context, t *testing.T) f2.Context {
164 var (
165 v1 = clientset.CoreV1()
166 )
167 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
168 for _, node := range nodes.Items {
169 if node.GetLabels()[nodemeta.RoleLabel] == worker {
170 podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name)
171 containerName := testPod.Spec.Containers[0].Name
172 _, err := WaitForPodReady(ctx, clientset, podName, k.Namespace)
173 require.NoError(t, err)
174
175 _, err = ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, deleteAllExpectedFiles())
176 require.NoError(t, err)
177
178 out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes")
179 require.NoError(t, err)
180 foundFiles := strings.Fields(out)
181 for _, foundFile := range foundFiles {
182 assert.NotContains(t, expectedFiles, foundFile)
183 }
184 fmt.Printf("Deleted expected files from node %s\n", node.Name)
185 }
186 }
187 return ctx
188 }).
189 Setup("Start test", func(ctx f2.Context, t *testing.T) f2.Context {
190 var (
191 v1 = clientset.CoreV1()
192 )
193 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
194 for _, node := range nodes.Items {
195 if node.GetLabels()[nodemeta.RoleLabel] == worker {
196
197 if node.Annotations == nil {
198 node.Annotations = make(map[string]string)
199 }
200 node.Annotations["etcd-phase"] = "clean"
201 node := node
202 _, err := clientset.CoreV1().Nodes().Update(ctx, &node, metav1.UpdateOptions{})
203 require.NoError(t, err)
204 fmt.Printf("Annotated node %s\n", node.Name)
205 }
206 }
207 return ctx
208 }).
209 Test("Verify secrets were created", func(ctx f2.Context, t *testing.T) f2.Context {
210 var (
211 v1 = clientset.CoreV1()
212 )
213 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
214 for _, node := range nodes.Items {
215 if node.GetLabels()[nodemeta.RoleLabel] == worker {
216 err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Provisioned")
217 require.NoError(t, err)
218
219 _, err = v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{})
220 require.NoError(t, err)
221 fmt.Printf("Verified secret was created for node %s\n", node.Name)
222 }
223 }
224 return ctx
225 }).
226 Test("Verify expected files were copied", func(ctx f2.Context, t *testing.T) f2.Context {
227 var (
228 v1 = clientset.CoreV1()
229 )
230 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
231 for _, node := range nodes.Items {
232 if node.GetLabels()[nodemeta.RoleLabel] == worker {
233 err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Installed")
234 require.NoError(t, err)
235
236 podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name)
237 containerName := testPod.Spec.Containers[0].Name
238
239 out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes")
240 require.NoError(t, err)
241 foundFiles := strings.Fields(out)
242 for _, expectedFile := range expectedFiles {
243 assert.Contains(t, foundFiles, expectedFile)
244 }
245 fmt.Printf("Verified expected files were created for node %s\n", node.Name)
246 }
247 }
248 return ctx
249 }).
250 Test("Verifying etcd cluster is healthy after configuration", func(ctx f2.Context, t *testing.T) f2.Context {
251 var (
252 v1 = clientset.CoreV1()
253 )
254 nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{})
255 for _, node := range nodes.Items {
256 if node.GetLabels()[nodemeta.RoleLabel] == controlplane {
257 podName := "etcd-" + node.Name
258 out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json")
259 require.NoError(t, err)
260
261 var etcdMemberStates []map[string]interface{}
262 err = json.Unmarshal([]byte(out), &etcdMemberStates)
263 require.NoError(t, err)
264
265 for _, etcdMemberState := range etcdMemberStates {
266 health, ok := etcdMemberState["health"].(bool)
267 assert.True(t, ok)
268 endpoint, ok := etcdMemberState["endpoint"].(string)
269 assert.True(t, ok)
270 fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health)
271 assert.True(t, health)
272 }
273
274 fmt.Println("Verified etcd cluster is healthy after configuration")
275 }
276 }
277 return ctx
278 }).
279 Teardown("Clean up cluster", func(ctx f2.Context, t *testing.T) f2.Context {
280 err := clientset.CoreV1().Namespaces().Delete(ctx, k.Namespace, metav1.DeleteOptions{})
281 assert.NoError(t, err)
282 fmt.Println("Deleted test namespace")
283 return ctx
284 }).
285 Feature()
286
287
288 f.Test(t, etcdOperatorSanity)
289 }
290
291 func deleteAllExpectedFiles() (cmd string) {
292 for _, file := range expectedFiles {
293 cmd += fmt.Sprintf("rm -rf %s; ", file)
294 }
295 return
296 }
297
298
299 func ExecCmd(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace, command string) (string, error) {
300
301 req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
302 Namespace(namespace).SubResource("exec").Param("container", containerName)
303
304
305 option := &v1.PodExecOptions{
306 Command: []string{"sh", "-c", command},
307 Stdout: true,
308 Stderr: true,
309 }
310 req.VersionedParams(option, scheme.ParameterCodec)
311
312
313 exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
314 if err != nil {
315 return "", err
316 }
317
318
319 out := &bytes.Buffer{}
320 errOut := &bytes.Buffer{}
321
322 streamOptions := remotecommand.StreamOptions{
323 Stdout: out,
324 Stderr: errOut,
325 }
326
327
328 err = exec.StreamWithContext(ctx, streamOptions)
329 if err != nil {
330 return "", err
331 }
332
333 if errOut.Len() > 0 {
334 return "", fmt.Errorf("command error: %s", errOut.String())
335 }
336
337 return out.String(), nil
338 }
339
340
341 func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace string) (*v1.Pod, error) {
342
343 pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
344 if err == nil {
345
346 if pod.Status.Phase == v1.PodRunning {
347 return pod, nil
348 }
349 }
350
351 watchOptions := metav1.ListOptions{
352 FieldSelector: fmt.Sprintf("metadata.name=%s", podName),
353 }
354 watcher, err := clientset.CoreV1().Pods(namespace).Watch(ctx, watchOptions)
355 if err != nil {
356 return nil, err
357 }
358
359
360 timeout := time.After(1 * time.Minute)
361
362 ch := watcher.ResultChan()
363
364
365 for {
366 select {
367 case event, ok := <-ch:
368 if !ok {
369 return nil, fmt.Errorf("closed channel")
370 }
371 pod, ok := event.Object.(*v1.Pod)
372 if !ok {
373 return nil, fmt.Errorf("unexpected type")
374 }
375 switch event.Type {
376 case watch.Added, watch.Modified:
377 if pod.Status.Phase == v1.PodRunning {
378
379 return pod, nil
380 }
381 case watch.Deleted:
382 return nil, fmt.Errorf("test pod was deleted")
383 case watch.Error:
384 return nil, fmt.Errorf("error with test pod")
385 default:
386 return nil, fmt.Errorf("unexpected event type: %v", event.Type)
387 }
388 case <-timeout:
389
390 return nil, fmt.Errorf("timeout waiting for test pods to be ready")
391 }
392 }
393 }
394
395 func WaitForNodeAnnotationWithRetry(ctx context.Context, clientset *kubernetes.Clientset, timeout time.Duration, nodeName, annotationKey, annotationValue string) (err error) {
396 ctx, cancel := context.WithTimeout(ctx, timeout)
397 defer cancel()
398
399 err = wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
400 node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
401 if err != nil {
402 return false, err
403 }
404 if node.GetAnnotations()[annotationKey] == annotationValue {
405 return true, nil
406 }
407 return false, nil
408 })
409 return
410 }
411