...

Source file src/edge-infra.dev/pkg/sds/etcd/operator/internal/integration/e2e_test.go

Documentation: edge-infra.dev/pkg/sds/etcd/operator/internal/integration

// To run this test:
// 1) Ensure you have a tunnel set up to port 6443 of the control plane (see the example below).
// 2) Point your local KUBECONFIG at the correct kubeconfig for the cluster.
// 3) Run the following command from the root of the repo:
// bazel test --nocache_test_results //pkg/sds/k8s/controllers/etcdoperator/internal/integration:integration_test --config=integration --test_arg=-integration-level=2 --test_env=KUBECONFIG --test_env=RUN_ETCD_OPERATOR_INT_TEST=True --test_timeout=900
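//
// A tunnel can be opened, for example, with SSH port forwarding (the host and user below are
// placeholders for your environment, not values taken from this repo):
//
//	ssh -N -L 6443:127.0.0.1:6443 <user>@<control-plane-host>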
package integration

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	nodemeta "edge-infra.dev/pkg/sds/ien/node"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/integration"
	"edge-infra.dev/test/f2/x/ktest"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

// TODO: Use EtcdMember CR instead of node annotations

var (
	f                 f2.Framework
	operatorNamespace = "etcd-operator"
	// TODO: Get this from EtcdMember CR
	expectedFiles = []string{}
	controlplane  = "controlplane"
	worker        = "worker"
)

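// etcdctlCmd is the base etcdctl invocation used inside the etcd pods: it targets the local
// member endpoint and authenticates with the server certificates under /etc/kubernetes/pki/etcd.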
var etcdctlCmd = "etcdctl --endpoints=https://127.0.0.1:2379/ --cacert=/etc/kubernetes/pki/etcd/ca.crt --cert=/etc/kubernetes/pki/etcd/server.crt --key=/etc/kubernetes/pki/etcd/server.key"

func TestMain(m *testing.M) {
	if runIntTest, exists := os.LookupEnv("RUN_ETCD_OPERATOR_INT_TEST"); !exists || strings.ToLower(runIntTest) != "true" {
		fmt.Println("Skipping integration test. Set RUN_ETCD_OPERATOR_INT_TEST=True to run")
		return
	}
	// Set up the test framework in TestMain
	f = f2.New(
		context.Background(),
		f2.WithExtensions(
			ktest.New(),
		),
	).
		Setup(func(ctx f2.Context) (f2.Context, error) {
			// Dummy setup function. Each Setup function runs once before all
			// tests in the framework are run.
			return ctx, nil
		}).Slow()

	os.Exit(f.Run(m))
}

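// TestEtcdOperator exercises the etcd operator end to end against a running cluster: it
// checks that the etcd cluster is initially healthy, removes the per-node cert secrets and
// expected files from the worker nodes, annotates the workers with etcd-phase=clean, and
// then verifies that the operator recreates the secrets and files and that the etcd cluster
// is healthy afterwards.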
func TestEtcdOperator(t *testing.T) {
	var testPod *corev1.Pod
	var k *ktest.K8s
	var clientset *kubernetes.Clientset

	// Test execution ends here unless -integration-level=2 is passed to the test
	integration.SkipIfNot(t, integration.L2)

	etcdOperatorSanity := f2.NewFeature("Etcd Operator sanity checks").
		Setup("Setup environment", func(ctx f2.Context, t *testing.T) f2.Context {
			k = ktest.FromContextT(ctx, t)
			var err error
			clientset, err = kubernetes.NewForConfig(k.Env.Config)
			require.NoError(t, err)

			return ctx
		}).
		Setup("Verifying etcd cluster is initially healthy", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == controlplane {
					podName := "etcd-" + node.Name
					out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json")
					require.NoError(t, err)

					var etcdMemberStates []map[string]interface{}
					err = json.Unmarshal([]byte(out), &etcdMemberStates)
					require.NoError(t, err)

					for _, etcdMemberState := range etcdMemberStates {
						health, ok := etcdMemberState["health"].(bool)
						assert.True(t, ok)
						endpoint, ok := etcdMemberState["endpoint"].(string)
						assert.True(t, ok)
						fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health)
						assert.True(t, health)
					}

					fmt.Println("Verified initial etcd cluster health")
				}
			}
			return ctx
		}).
		Setup("Setup test pods on worker nodes", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			decode := scheme.Codecs.UniversalDeserializer().Decode
			stream, err := os.ReadFile("testdata/etcd-operator-test-pod.yaml")
			require.NoError(t, err)
			obj, gVK, err := decode(stream, nil, nil)
			require.NoError(t, err)
			require.Equal(t, "Pod", gVK.Kind)
			testPod = obj.(*corev1.Pod)

			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					copyTestPod := testPod.DeepCopy()
					copyTestPod.Spec.NodeName = node.Name
					copyTestPod.ObjectMeta.Name = fmt.Sprintf("%s-%s", copyTestPod.ObjectMeta.Name, node.Name)
					_, err := v1.Pods(k.Namespace).Create(ctx, copyTestPod, metav1.CreateOptions{})
					require.NoError(t, err)
					fmt.Printf("Created test pod on node %s\n", node.Name)
				}
			}
			return ctx
		}).
		Setup("Deleting all worker cert secrets", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					_, err := v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{})
					if errors.IsNotFound(err) {
						fmt.Printf("Secret not found for node %s\n", node.Name)
						continue
					}
					require.NoError(t, err)

					err = v1.Secrets(operatorNamespace).Delete(ctx, node.Name, metav1.DeleteOptions{})
					require.NoError(t, err)
					fmt.Printf("Deleted secret for node %s\n", node.Name)
				}
			}
			return ctx
		}).
		Setup("Prepare worker nodes for testing", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name)
					containerName := testPod.Spec.Containers[0].Name
					_, err := WaitForPodReady(ctx, clientset, podName, k.Namespace)
					require.NoError(t, err)

					_, err = ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, deleteAllExpectedFiles())
					require.NoError(t, err)

					out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes")
					require.NoError(t, err)
					foundFiles := strings.Fields(out)
					for _, foundFile := range foundFiles {
						assert.NotContains(t, expectedFiles, foundFile)
					}
					fmt.Printf("Deleted expected files from node %s\n", node.Name)
				}
			}
			return ctx
		}).
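		// Setting the etcd-phase annotation to "clean" on each worker signals the operator to
		// start reprovisioning that node; the remaining steps wait for the operator to advance
		// the phase and recreate the secrets and files that were removed above.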
		Setup("Start test", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					// Modify the annotations map
					if node.Annotations == nil {
						node.Annotations = make(map[string]string)
					}
					node.Annotations["etcd-phase"] = "clean"
					node := node
					_, err := v1.Nodes().Update(ctx, &node, metav1.UpdateOptions{})
					require.NoError(t, err)
					fmt.Printf("Annotated node %s\n", node.Name)
				}
			}
			return ctx
		}).
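		// The operator reports progress through the etcd-phase annotation; once a node reaches
		// "Provisioned" it should also have a cert secret named after the node in the operator
		// namespace.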
		Test("Verify secrets were created", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Provisioned")
					require.NoError(t, err)

					_, err = v1.Secrets(operatorNamespace).Get(ctx, node.Name, metav1.GetOptions{})
					require.NoError(t, err)
					fmt.Printf("Verified secret was created for node %s\n", node.Name)
				}
			}
			return ctx
		}).
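		// Once the phase reaches "Installed" the expected files should be present under
		// /etc/kubernetes on the worker. Note that expectedFiles is currently empty (see the
		// TODO above), so the per-file assertions are a no-op until it is populated.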
		Test("Verify expected files were copied", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == worker {
					err := WaitForNodeAnnotationWithRetry(ctx, clientset, 5*time.Minute, node.Name, "etcd-phase", "Installed")
					require.NoError(t, err)

					podName := fmt.Sprintf("%s-%s", testPod.ObjectMeta.Name, node.Name)
					containerName := testPod.Spec.Containers[0].Name

					out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, containerName, k.Namespace, "find /etc/kubernetes")
					require.NoError(t, err)
					foundFiles := strings.Fields(out)
					for _, expectedFile := range expectedFiles {
						assert.Contains(t, foundFiles, expectedFile)
					}
					fmt.Printf("Verified expected files were created for node %s\n", node.Name)
				}
			}
			return ctx
		}).
		Test("Verifying etcd cluster is healthy after configuration", func(ctx f2.Context, t *testing.T) f2.Context {
			v1 := clientset.CoreV1()
			nodes, err := v1.Nodes().List(ctx, metav1.ListOptions{})
			require.NoError(t, err)
			for _, node := range nodes.Items {
				if node.GetLabels()[nodemeta.RoleLabel] == controlplane {
					podName := "etcd-" + node.Name
					out, err := ExecCmd(ctx, clientset, k.Env.Config, podName, "etcd", "kube-system", etcdctlCmd+" endpoint health --cluster -w json")
					require.NoError(t, err)

					var etcdMemberStates []map[string]interface{}
					err = json.Unmarshal([]byte(out), &etcdMemberStates)
					require.NoError(t, err)

					for _, etcdMemberState := range etcdMemberStates {
						health, ok := etcdMemberState["health"].(bool)
						assert.True(t, ok)
						endpoint, ok := etcdMemberState["endpoint"].(string)
						assert.True(t, ok)
						fmt.Printf("Endpoint %s is healthy: %t\n", endpoint, health)
						assert.True(t, health)
					}

					fmt.Println("Verified etcd cluster is healthy after configuration")
				}
			}
			return ctx
		}).
		Teardown("Clean up cluster", func(ctx f2.Context, t *testing.T) f2.Context {
			err := clientset.CoreV1().Namespaces().Delete(ctx, k.Namespace, metav1.DeleteOptions{})
			assert.NoError(t, err)
			fmt.Println("Deleted test namespace")
			return ctx
		}).
		Feature()

	// Run the tests
	f.Test(t, etcdOperatorSanity)
}

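// deleteAllExpectedFiles builds a single shell command that removes every entry in
// expectedFiles (e.g. "rm -rf /path/a; rm -rf /path/b; "). With expectedFiles empty it
// returns an empty string, which runs as a no-op.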
func deleteAllExpectedFiles() (cmd string) {
	for _, file := range expectedFiles {
		cmd += fmt.Sprintf("rm -rf %s; ", file)
	}
	return
}

// ExecCmd executes a command in a pod and returns the output as a string.
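// The command is wrapped in "sh -c", so shell syntax is allowed; any output on stderr is
// treated as a failure.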
func ExecCmd(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, namespace, command string) (string, error) {
	// Create a POST request for the pods/exec subresource
	req := clientset.CoreV1().RESTClient().Post().Resource("pods").Name(podName).
		Namespace(namespace).SubResource("exec").Param("container", containerName)

	// Set the command and the standard streams
	option := &corev1.PodExecOptions{
		Command: []string{"sh", "-c", command},
		Stdout:  true,
		Stderr:  true,
	}
	req.VersionedParams(option, scheme.ParameterCodec)

	// Create an executor for the request
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return "", err
	}

	// Create buffers to capture the output
	out := &bytes.Buffer{}
	errOut := &bytes.Buffer{}

	streamOptions := remotecommand.StreamOptions{
		Stdout: out,
		Stderr: errOut,
	}

	// Stream the command execution and wait for it to finish
	err = exec.StreamWithContext(ctx, streamOptions)
	if err != nil {
		return "", err
	}

	if errOut.Len() > 0 {
		return "", fmt.Errorf("command error: %s", errOut.String())
	}

	return out.String(), nil
}

// WaitForPodReady waits for a pod to be ready and returns it.
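// It returns immediately if the pod is already running; otherwise it watches the pod and
// returns once it reports PodRunning, failing after a one-minute timeout.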
func WaitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace string) (*corev1.Pod, error) {
	// Get the pod object
	pod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
	if err == nil {
		// Check if the pod is already running
		if pod.Status.Phase == corev1.PodRunning {
			return pod, nil
		}
	}
	// Create a watch for the pod
	watchOptions := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", podName),
	}
	watcher, err := clientset.CoreV1().Pods(namespace).Watch(ctx, watchOptions)
	if err != nil {
		return nil, err
	}
	defer watcher.Stop()

	// Create a timeout channel
	timeout := time.After(1 * time.Minute)

	ch := watcher.ResultChan()

	// Loop over the watch events
	for {
		select {
		case event, ok := <-ch:
			if !ok {
				return nil, fmt.Errorf("closed channel")
			}
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				return nil, fmt.Errorf("unexpected type")
			}
			switch event.Type {
			case watch.Added, watch.Modified:
				if pod.Status.Phase == corev1.PodRunning {
					return pod, nil
				}
			case watch.Deleted:
				return nil, fmt.Errorf("test pod was deleted")
			case watch.Error:
				return nil, fmt.Errorf("error with test pod")
			default:
				return nil, fmt.Errorf("unexpected event type: %v", event.Type)
			}
		case <-timeout:
			return nil, fmt.Errorf("timeout waiting for test pod %s to be ready", podName)
		}
	}
}

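// WaitForNodeAnnotationWithRetry polls the node every 10 seconds until the given annotation
// key has the expected value, or the timeout elapses.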
func WaitForNodeAnnotationWithRetry(ctx context.Context, clientset *kubernetes.Clientset, timeout time.Duration, nodeName, annotationKey, annotationValue string) (err error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	err = wait.PollUntilContextCancel(ctx, 10*time.Second, true, func(ctx context.Context) (bool, error) {
		node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if node.GetAnnotations()[annotationKey] == annotationValue {
			return true, nil
		}
		return false, nil
	})
	return
}
