// Package k8s provides test framework utilities for K8s-based unit and integration
// tests, supporting the ability to do both with the same test suite.
package k8s

import (
	"context"
	"fmt"
	"io"
	"os"
	"strings"

	corev1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	konfigkonnector "edge-infra.dev/pkg/k8s/konfigkonnector"
	configconnector "edge-infra.dev/pkg/k8s/konfigkonnector/apis/configconnector/v1beta1"
	"edge-infra.dev/pkg/k8s/unstructured"
	"edge-infra.dev/pkg/k8s/runtime/sap"
	"edge-infra.dev/pkg/k8s/runtime/sap/install"
	"edge-infra.dev/pkg/lib/gcp/iam"
	"edge-infra.dev/test/framework"
	"edge-infra.dev/test/framework/gcp"
	"edge-infra.dev/test/framework/integration"
	"edge-infra.dev/third_party/k8s/certmanager"
)

// K8s implements framework.SubFramework and provides lifecycle utilities for
// integration tests that run on K8s clusters.
//
// Unexported fields are configured through New and the Option functions;
// Namespace and Client are exported so test code can use them directly.
type K8s struct {
	// when true, setup does not create a namespace for the suite. Set by the
	// SkipNamespaceCreation option.
	skipNamespaceCreation bool
	// REST config passed to New. Used to build clients and run installs, and
	// returned as-is by RESTConfig.
	cfg *rest.Config
	// the K8s controller manager being tested by this suite. the K8s struct
	// controls when it starts so that it can ensure the required dependencies
	// (e.g., K8s config connector) are up and running before the manager is
	// started
	mgr manager.Manager
	// cancel function for the context.Context used to start manager
	// called at the end of the test suite to stop the manager goroutine
	mgrCancel context.CancelFunc

	// set by WithKonfigKonnector; consulted (through integration.Only) in
	// SetupWithFramework to decide whether setupKonfigConnector is registered.
	kfgkonnector bool
	// set by WithCertManager; consulted (through integration.Only) in
	// SetupWithFramework to decide whether setupCertManager is registered.
	certmgr bool

	// Namespace is assigned f.UniqueName by setup, which also creates the
	// namespace in the cluster. setup leaves it untouched when namespace
	// creation is skipped.
	Namespace string
	// Client is created by setClient when it is nil, using the manager's
	// scheme if a manager was provided via WithCtrlManager.
	Client client.Client
}

// Option is used to expose public optional K8s configurations.
// It should generally only be used to change non-public members of the K8s
// struct.
type Option func(*K8s)

// SkipNamespaceCreation will make the framework skip creation of a namespace
// for each test case. Useful for tests that don't operate against any
// Namespaced resources.
func SkipNamespaceCreation() Option { return func(k8s *K8s) { k8s.skipNamespaceCreation = true } } // WithKonfigKonnector controls whether or not K8s config connector is installed // and configured for the test run. func WithKonfigKonnector() Option { return func(k *K8s) { k.kfgkonnector = true } } // WithCertManager ensures that cert-manager is installed for the test run. func WithCertManager() Option { return func(k *K8s) { k.certmgr = true } } // WithCtrlManager sets up the framework for testing a controller-runtime Manager func WithCtrlManager(mgr manager.Manager) Option { return func(k *K8s) { k.mgr = mgr } } // New creates a K8s framework func New(cfg *rest.Config, opts ...Option) *K8s { k8s := &K8s{ skipNamespaceCreation: false, cfg: cfg, } for _, opt := range opts { opt(k8s) } return k8s } // SetupWithFramework registers the called instance of K8s with the provided // Framework, setting up lifecycle hooks and framework metadata. func (k *K8s) SetupWithFramework(f *framework.Framework) { k.setClient(f) // register KCC setup first because K8s managers which depend on it will // error at start up if the CRDs aren't present if integration.Only(k.kfgkonnector) { f.Setup(k.setupKonfigConnector) } if integration.Only(k.certmgr) { f.Setup(k.setupCertManager) } if k.mgr != nil { f.Setup(k.startManager) } // label tests using this framework so they can be skipped via command line, // e.g., -skip-labels=k8s f.Label("k8s", "true"). Setup(k.setup). Teardown(k.teardown). 
BeforeEachTest(k.beforeEach) } func (k *K8s) RESTConfig() *rest.Config { return k.cfg } // setupKonfigKonnector is ran once per suite, installing K8s config connector // and configuring it so that integration tests can use it // TODO(aw185176): we should sync across all suites to save time/cycles func (k *K8s) setupKonfigConnector(f *framework.Framework) { // skip if required k8s cfg connector configuration isnt present, or // this isnt an integration test run integration.Skip(NeedsKonfigKonnector)(f) //nolint gosec not actually a secret or sensitive value secret := "gcp-creds" ctx := context.Background() c, err := client.New(k.cfg, client.Options{Scheme: konfigkonnector.CreateScheme()}) if err != nil { f.FailNow("failed to create k8s client", err) } manifests, err := konfigkonnector.LoadManifests() if err != nil { f.FailNow("failed to load k8s cfg connector manifests", err) } // TODO(aw185176): integrate framework with sap properly instead of one-shot install if _, err := install.Install(ctx, k.cfg, manifests, FieldManagerOwner(f), InstallOpts()...); err != nil { f.FailNow("failed to install k8s cfg connector", err) } cfgConn := &configconnector.ConfigConnector{} if KonfigKonnector.ServiceAccount != "" { // TODO(aw185176): probably need separate KCC project ID here to support things like foreman cnrm SA cfgConn = configconnector.New( configconnector.WithSvcAccount( iam.SvcAccountEmail(KonfigKonnector.ServiceAccount, gcp.GCloud.ProjectID), ), ) } if KonfigKonnector.APIKey != "" { key, err := os.ReadFile(framework.ResolvePath(KonfigKonnector.APIKey)) if err != nil { f.FailNow("failed to read k8s cfg connector api key", err) } if err := konfigkonnector.SetupCNRMSystem(ctx, c, secret, key); err != nil { f.FailNow("failed to setup k8s cfg connector namespace", err) } cfgConn = configconnector.New(configconnector.WithCredentialsSecret(secret)) } if err := c.Create(ctx, cfgConn); err != nil && !errors.IsAlreadyExists(err) { f.FailNow("failed to create ConfigConnector 
object", err) } f.Eventually(func() bool { _ = c.Get(ctx, client.ObjectKeyFromObject(cfgConn), cfgConn) return cfgConn.IsHealthy() }, Timeouts.DefaultTimeout, Timeouts.Tick, "ConfigConnector object didnt become ready in time") } // setupCertManager is ran once per suite, installing cert manager // and configuring it so that integration tests can use it // TODO(aw185176): we should sync across all suites to save time/cycles func (k *K8s) setupCertManager(f *framework.Framework) { ctx := context.Background() manifests, err := certmanager.LoadManifests() if err != nil { f.FailNow("failed to load embedded certmanager manifests", err) } // TODO(aw185176): integrate framework with sap properly instead of one-shot install if _, err := install.Install(ctx, k.cfg, manifests, FieldManagerOwner(f), InstallOpts()...); err != nil { f.FailNow("failed to install cert manager", err) } } // startManager creates a separate goroutine for the K8s manager being tested // and starts it with a cancellable context, storing the cancel in k8s.mgrCancel // to be called on test teardown func (k *K8s) startManager(f *framework.Framework) { ctx, cancel := context.WithCancel(context.TODO()) k.mgrCancel = cancel go func() { if err := k.mgr.Start(ctx); err != nil { f.FailNow("failed to start manager", err) } }() } // setClient creates and sets the client on the K8s struct if it is not present func (k *K8s) setClient(f *framework.Framework) { if k.Client == nil { opts := client.Options{} // TODO: should be able to provide scheme without manager if k.mgr != nil { opts.Scheme = k.mgr.GetScheme() } c, err := client.New(k.cfg, opts) if err != nil { f.FailNow("failed to create client", err) } k.Client = c } } // update manifest to set unique namespace and update associated role bindings func ProcessManifest(manifest *unstructured.Unstructured, namespace string) (err error) { manifest.SetNamespace(namespace) if manifest.GetKind() == "RoleBinding" { err = processRoleBindings(manifest, namespace) } return 
err } func processRoleBindings(manifest *unstructured.Unstructured, namespace string) error { var roleBinding rbacv1.RoleBinding err := unstructured.FromUnstructured(manifest, &roleBinding) if err != nil { return err } for idx := range roleBinding.Subjects { if roleBinding.Subjects[idx].Kind == "ServiceAccount" { roleBinding.Subjects[idx].Namespace = namespace } } updatedManifest, err := unstructured.ToUnstructured(&roleBinding) if err != nil { return err } *manifest = *updatedManifest return nil } // teardown stops the manager started for the test suite func (k *K8s) teardown(_ *framework.Framework) { if k.mgr != nil { k.mgrCancel() } } // setup runs once before the K8s suite. it creates the test suites namespace func (k *K8s) setup(f *framework.Framework) { if !k.skipNamespaceCreation { // TODO: ensure f.UniqueName is valid K8s resource name? k.Namespace = f.UniqueName f.NoError(k.Client.Create(context.Background(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: k.Namespace, Labels: map[string]string{ "edge-framework": f.BaseName, }, }, })) } } // beforeEach creates the K8s client if it doesn't exist. // it creates a client separately from the K8s manager to avoid client cache // pollution in the test suites: https://book.kubebuilder.io/cronjob-tutorial/writing-tests.html#test-environment-setup func (k *K8s) beforeEach(f *framework.Framework) { k.setClient(f) } // FieldManagerOwner creates a consistent ownership key for resource fields that // are created/updated by test framework code. The unique name for the test run // is used in conjunction with a framework constant to provide scope. func FieldManagerOwner(f *framework.Framework) sap.Owner { return sap.Owner{ Field: fmt.Sprintf("edge-framework-%s", f.BaseName), Group: "edge-framework", } } // InstallOpts returns consistent server-side apply options based on the configured // framework timeouts. 
These options should be used for all of the apply operations func InstallOpts() []install.Option { return []install.Option{ install.WithForce(), install.WithTimeout(Timeouts.DefaultTimeout), } } // Create pod object that runs the specified args in the test cluster. // Registry parameter defaults to "localhost:21700" if nothing is input, otherwise it uses the first input value. func CreateTestPod(name string, namespace string, args []string, image string, registry ...string) *corev1.Pod { var reg string if registry == nil { reg = "localhost:21700" } else { reg = strings.Trim(registry[0], "/") } image = strings.Trim(image, "/") return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: name, Image: reg + "/" + image, Args: args, ImagePullPolicy: corev1.PullAlways, }, }, RestartPolicy: corev1.RestartPolicyNever, }, } } // Return a stream of the Pod logs for a given pod in a namespace func (k *K8s) FindPodLogs(ctx context.Context, podname string, namespace string) (io.ReadCloser, error) { clientset, err := kubernetes.NewForConfig(k.cfg) if err != nil { return nil, err } podLogOptions := corev1.PodLogOptions{} req := clientset.CoreV1().Pods(namespace).GetLogs(podname, &podLogOptions) return req.Stream(ctx) }