// To run this test you can:
// 1) Ensure you have a tunnel set up to port 6443 of the control plane.
// 2) Point your local KUBECONFIG to the correct kubeconfig for the cluster.
// 3) Run the following command from the root of the repo:
//    bazel test --nocache_test_results //pkg/sds/k8s/controllers/etcdoperator/internal/integration:integration_test --config=integration --test_arg=-integration-level=2 --test_env=KUBECONFIG --test_env=RUN_ETCD_OPERATOR_INT_TEST=True --test_timeout=900
package integration

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	nodemeta "edge-infra.dev/pkg/sds/ien/node"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/integration"
	"edge-infra.dev/test/f2/x/ktest"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

// TODO: Use EtcdMember CR instead of node annotations
var (
	f                 f2.Framework
	operatorNamespace = "etcd-operator"
	// TODO: Get this from EtcdMember CR
	expectedFiles = []string{}
	controlplane  = "controlplane"
	worker        = "worker"
)

var etcdctlCmd = "etcdctl --endpoints=https://127.0.0.1:2379/ --cacert=/etc/kubernetes/pki/etcd/ca.crt --cert=/etc/kubernetes/pki/etcd/server.crt --key=/etc/kubernetes/pki/etcd/server.key"

func TestMain(m *testing.M) {
	if runIntTest, exists := os.LookupEnv("RUN_ETCD_OPERATOR_INT_TEST"); !exists || strings.ToLower(runIntTest) != "true" {
		fmt.Println("Skipping integration test. Set RUN_ETCD_OPERATOR_INT_TEST=True to run")
		return
	}

	// Set up the test framework in TestMain.
	f = f2.New(
		context.Background(),
		f2.WithExtensions(
			ktest.New(),
		),
	).
		Setup(func(ctx f2.Context) (f2.Context, error) {
			// Dummy setup function. Each Setup function runs once before all
			// tests in the framework are run.
			return ctx, nil
		}).
		Slow()

	os.Exit(f.Run(m))
}

func TestEtcdOperator(t *testing.T) {
	var testPod *corev1.Pod
	var k *ktest.K8s
	var clientset *kubernetes.Clientset

	// Test execution ends here unless -integration-level=2 is passed to the test.
	integration.SkipIfNot(t, integration.L2)

	etcdOperatorSanity := f2.NewFeature("Etcd Operator sanity checks").
		Setup("Setup environment", func(ctx f2.Context, t *testing.T) f2.Context {
			k = ktest.FromContextT(ctx, t)
			var err error
			clientset, err = kubernetes.NewForConfig(k.Env.Config)
			require.NoError(t, err)
			return ctx
		}).
		Setup("Verifying etcd cluster is initially healthy", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == controlplane {
					podName := "etcd-" + node.Name
					// `etcdctl endpoint health --cluster -w json` prints a JSON array with one
					// object per endpoint, including "endpoint" and "health" fields.
					out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json")
					require.NoError(t, err)
					var etcdMemberStates []map[string]interface{}
					err = json.Unmarshal([]byte(out), &etcdMemberStates)
					require.NoError(t, err)
					for _, etcdMemberState := range etcdMemberStates {
						health, ok := etcdMemberState["health"].(bool)
						assert.True(t, ok)
						endpoint, ok := etcdMemberState["endpoint"].(string)
						assert.True(t, ok)
						fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health)
						assert.True(t, health)
					}
					fmt.Println("Verified initial etcd cluster health")
				}
			}
			return ctx
		}).
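		// The remaining Setup steps stage each worker node: schedule a test pod on it,
		// delete its cert secret, remove any previously installed files, and finally
		// annotate the node with etcd-phase=clean to trigger the operator.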
Setup("Setup test pods on worker nodes", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) decode := scheme.Codecs.UniversalDeserializer().Decode stream, _ := os.ReadFile("testdata/etcd-operator-test-pod.yaml") obj, gVK, _ := decode(stream, nil, nil) if gVK.Kind == "Pod" { testPod = obj.(*corev1.Pod) } nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { copyTestPod := testPod.DeepCopy() copyTestPod.Spec.NodeName = node.Name copyTestPod.ObjectMeta.Name = fmt.Sprintf("%s-%s", copyTestPod.ObjectMeta.Name, node.Name) _, err := v1.Pods(k.Namespace).Create(ctx, copyTestPod, metav1.CreateOptions{}) if err != nil { require.NoError(t, err) } fmt.Printf("Created test pod on node %s\n", node.Name) } } return ctx }). Setup("Deleting all workers cert secrets", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { _, err := v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{}) if errors.IsNotFound(err) { fmt.Printf("Secret not found for node %s\n", node.Name) continue } else if err != nil { require.NoError(t, err) } err = v1.Secrets(operatorNamespace).Delete(ctx, node.Name, metav1.DeleteOptions{}) require.NoError(t, err) fmt.Printf("Deleted secret for node %s\n", node.Name) } } return ctx }). Setup("Prepare worker nodes for testing", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name) containerName := testPod.Spec.Containers[0].Name _, err := WaitForPodReady(ctx, clientset, podName, k.Namespace) require.NoError(t, err) _, err = ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, deleteAllExpectedFiles()) require.NoError(t, err) out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes") require.NoError(t, err) foundFiles := strings.Fields(out) for _, foundFile := range foundFiles { assert.NotContains(t, expectedFiles, foundFile) } fmt.Printf("Deleted expected files from node %s\n", node.Name) } } return ctx }). Setup("Start test", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { // modify the annotations map if node.Annotations == nil { node.Annotations = make(map[string]string) } node.Annotations["etcd-phase"] = "clean" node := node _, err := clientset.CoreV1().Nodes().Update(ctx, &node, metav1.UpdateOptions{}) require.NoError(t, err) fmt.Printf("Annotated node %s\n", node.Name) } } return ctx }). 
Test("Verify secrets were created", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Provisioned") require.NoError(t, err) _, err = v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{}) require.NoError(t, err) fmt.Printf("Verified secret was created for node %s\n", node.Name) } } return ctx }). Test("Verify expected files were copied", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == worker { err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Installed") require.NoError(t, err) podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name) containerName := testPod.Spec.Containers[0].Name out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes") require.NoError(t, err) foundFiles := strings.Fields(out) for _, expectedFile := range expectedFiles { assert.Contains(t, foundFiles, expectedFile) } fmt.Printf("Verified expected files were created for node %s\n", node.Name) } } return ctx }). Test("Verifying etcd cluster is healthy after configuration", func(ctx f2.Context, t *testing.T) f2.Context { var ( v1 = clientset.CoreV1() ) nodes, _ = v1.Nodes().List(ctx, metav1.ListOptions{}) for _, node := range nodes.Items { if node.GetLabels()[nodemeta.RoleLabel] == controlplane { podName := "etcd-" + node.Name out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json") require.NoError(t, err) var etcdMemberStates []map[string]interface{} err = json.Unmarshal([]byte(out), &etcdMemberStates) require.NoError(t, err) for _, etcdMemberState := range etcdMemberStates { health, ok := etcdMemberState["health"].(bool) assert.True(t, ok) endpoint, ok := etcdMemberState["endpoint"].(string) assert.True(t, ok) fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health) assert.True(t, health) } fmt.Println("Verified etcd cluster is healthy after configuration") } } return ctx }). Teardown("Clean up cluster", func(ctx f2.Context, t *testing.T) f2.Context { err := clientset.CoreV1().Namespaces().Delete(ctx, k.Namespace, metav1.DeleteOptions{}) assert.NoError(t, err) fmt.Println("Deleted test namespace") return ctx }). Feature() // Run the tests f.Test(t, etcdOperatorSanity) } func deleteAllExpectedFiles() (cmd string) { for _, file := range expectedFiles { cmd += fmt.Sprintf("rm -rf %s; ", file) } return } // ExecCmd executes a command in a pod and returns the output as a string func ExecCmd(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace, command string) (string, error) { // Create a POST request for the pods/exec resource req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName). 

// ExecCmd executes a command in a pod and returns the output as a string.
func ExecCmd(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace, command string) (string, error) {
	// Create a POST request for the pods/exec subresource.
	req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
		Namespace(namespace).SubResource("exec").Param("container", containerName)

	// Set the command and the standard streams.
	option := &corev1.PodExecOptions{
		Command: []string{"sh", "-c", command},
		Stdout:  true,
		Stderr:  true,
	}
	req.VersionedParams(option, scheme.ParameterCodec)

	// Create an executor for the request.
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return "", err
	}

	// Create buffers to capture stdout and stderr.
	out := &bytes.Buffer{}
	errOut := &bytes.Buffer{}
	streamOptions := remotecommand.StreamOptions{
		Stdout: out,
		Stderr: errOut,
	}

	// Stream the command execution and wait for it to finish.
	err = exec.StreamWithContext(ctx, streamOptions)
	if err != nil {
		return "", err
	}
	if errOut.Len() > 0 {
		return "", fmt.Errorf("command error: %s", errOut.String())
	}
	return out.String(), nil
}

// WaitForPodReady waits for a pod to reach the Running phase and returns it.
func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace string) (*corev1.Pod, error) {
	// Return early if the pod is already running.
	pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err == nil && pod.Status.Phase == corev1.PodRunning {
		return pod, nil
	}

	// Create a watch for the pod.
	watchOptions := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", podName),
	}
	watcher, err := clientset.CoreV1().Pods(namespace).Watch(ctx, watchOptions)
	if err != nil {
		return nil, err
	}
	defer watcher.Stop()

	// Create a timeout channel.
	timeout := time.After(1 * time.Minute)
	ch := watcher.ResultChan()

	// Loop over the watch events.
	for {
		select {
		case event, ok := <-ch:
			if !ok {
				return nil, fmt.Errorf("closed channel")
			}
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				return nil, fmt.Errorf("unexpected type")
			}
			switch event.Type {
			case watch.Added, watch.Modified:
				if pod.Status.Phase == corev1.PodRunning {
					return pod, nil
				}
			case watch.Deleted:
				return nil, fmt.Errorf("test pod was deleted")
			case watch.Error:
				return nil, fmt.Errorf("error with test pod")
			default:
				return nil, fmt.Errorf("unexpected event type: %v", event.Type)
			}
		case <-timeout:
			return nil, fmt.Errorf("timeout waiting for test pods to be ready")
		}
	}
}

// WaitForNodeAnnotationWithRetry polls the node every 10 seconds until the given
// annotation has the expected value, or the timeout expires.
func WaitForNodeAnnotationWithRetry(ctx context.Context, clientset *kubernetes.Clientset, timeout time.Duration, nodeName, annotationKey, annotationValue string) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	return wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
		node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return node.GetAnnotations()[annotationKey] == annotationValue, nil
	})
}