package ktest import ( "context" "errors" "flag" "fmt" "io" "log" "net" "strings" "testing" "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/k8s/runtime/sap" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/fctx" "edge-infra.dev/test/f2/x/ktest/envtest" "edge-infra.dev/test/f2/x/ktest/kpoll" ) const ( MaxNamespaceLen = 63 Timeout = time.Second * 30 Tick = time.Second * 1 startErrTimeout = time.Minute * 5 ) var startErrChan chan error // TODO: install/uninstall KCC. flag for disabling cleanup // TODO: envtest excludes use case where tests are scheduled onto a real cluster // and use the client information injected into k8s pods? if so need to rework // to make both cases equally easy. envtest uses config.GetConfig() if an explicit // config is not passed to them, which will fall back to InClusterConfig if no // --kubeconfig or KUBECONFIG is present. Good enough? // K8s is a framework extension for Kubernetes that registers consistent // lifeycle hooks for K8s tests, flag binding, config parsing, and other test // utilities. type K8s struct { Env *envtest.Environment Namespace string Client client.Client Manager manager.Manager *kpoll.KPoll Timeout time.Duration Tick time.Duration options *options mgrCancel context.CancelFunc } // FromContext attempts to fetch an instance of K8s from the test context and // returns an error if it is not discovered. func FromContext(ctx fctx.Context) (*K8s, error) { v := fctx.ValueFrom[K8s](ctx) if v == nil { return nil, fmt.Errorf("%w: ktest.K8s extension", fctx.ErrNotFound) } return v, nil } // FromContextT is a testing variant of FromContext that immediately fails the // test if K8s isnt presnt in the testing context. func FromContextT(ctx fctx.Context, t *testing.T) *K8s { return fctx.ValueFromT[K8s](ctx, t) } // New creates a K8s framework func New(opts ...Option) *K8s { o := makeOptions(opts...) k8s := &K8s{ options: o, } return k8s } // RegisterFns is called by the framework after binding and parsing test flags. func (k *K8s) RegisterFns(f f2.Framework) { // Always set up envtest and set K8s client using the envtest rest.Config f.Setup(k.setupEnvtest) // If we are going to create manager, register that fn first so that setClient // can access the scheme. if k.options.mgrCreator != nil { f.Setup(k.startMgr) f.Teardown(k.stopMgr) } // Make sure client is set before rest of k8s lifecycle funcs f.Setup(func(ctx fctx.Context) (fctx.Context, error) { return ctx, k.setClient() }) // Ensure client is re-set and a new Namespace is created for each test. // This is done to provide better isolation between individual test cases // that may have inadvertently mutated state. f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) { if err := k.setClient(); err != nil { return ctx, err } k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick) if !k.options.skipNamespaceCreation { name := strings.ToLower(strings.ReplaceAll(t.Name(), "_", "-")) k.Namespace = name + "-" + ctx.RunID // if the proposed namespace is above the max shorten the name if len(k.Namespace) > MaxNamespaceLen { t.Log("proposed namespace was too long", k.Namespace) k.Namespace = name[:len(name)-(len(k.Namespace)-MaxNamespaceLen)] + "-" + ctx.RunID } t.Log("creating namespace", k.Namespace) return ctx, k.Client.Create(ctx, &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: k.Namespace, Labels: map[string]string{ "ktest": t.Name(), }, }, }) } return ctx, nil }) f.AfterEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) { if err := k.setClient(); err != nil { return ctx, err } k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick) // skip deletion if the user requested or if a namespace was never created if k.options.skipNamespaceDeletion || k.options.skipNamespaceCreation { t.Log("skipping namespace deletion") return ctx, nil } t.Log("deleting namespace", k.Namespace) return ctx, k.Client.Delete(ctx, &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: k.Namespace, Labels: map[string]string{ "ktest": t.Name(), }, }, }) }) // Always teardown envtest last f.Teardown(k.teardownEnvtest) } // BindFlags registers test flags for the framework extension. func (k *K8s) BindFlags(fs *flag.FlagSet) { envtest.BindFlags(fs) fs.DurationVar(&k.Timeout, "ktest-default-timeout", Timeout, "default timeout for K8s operations", ) fs.DurationVar(&k.Tick, "ktest-tick", Tick, "interval checks are evaluated at during tests", ) // TODO: bind KCC flags? } func (k *K8s) Labels() map[string]string { l := map[string]string{ "k8s": "true", } return l } // IntoContext stores the framework extension in the test context. func (k *K8s) IntoContext(ctx fctx.Context) fctx.Context { return fctx.ValueInto(ctx, k) } // Return a stream of the logs for a given container within the defined pod & namespace func (k *K8s) GetContainerLogs(ctx context.Context, podname string, namespace string, container string) (io.ReadCloser, error) { clientset, err := kubernetes.NewForConfig(k.Env.Config) if err != nil { return nil, err } podLogOptions := corev1.PodLogOptions{Container: container} req := clientset.CoreV1().Pods(namespace).GetLogs(podname, &podLogOptions) return req.Stream(ctx) } func (k *K8s) setClient() error { if k.Env == nil { return fmt.Errorf("setClient called before envtest setup") } opts := client.Options{} if k.Manager != nil { opts.Scheme = k.Manager.GetScheme() } if k.options.clientScheme != nil { opts.Scheme = k.options.clientScheme } c, err := client.New(k.Env.Config, opts) if err != nil { return err } k.Client = c return nil } // startMgr creates and starts the controller manager as defined by framework // options // Any errors from starting the controller are sent to an error channel and received during teardown func (k *K8s) startMgr(ctx fctx.Context) (fctx.Context, error) { // backstop against situations that should not occur if k.Env == nil { return ctx, fmt.Errorf("startMgr called before envtest setup") } if k.options.mgrCreator == nil { return ctx, fmt.Errorf("startMgr called without mgrCreator being set") } // if metricsAddress is provided, use desired port if available and a random port if not available // else, metrics server is disabled port := "0" if k.options.metricsAddress != "" { var err error port, err = getMetricsAddress(k.options.metricsAddress) if err != nil { return ctx, err } } mgrOpts := []controller.Option{ controller.WithCfg(k.Env.Config), controller.WithMetricsAddress(port), } // Parse optional graceful shutdown timeout if k.options.gracefulTimeout != "" { timeout, err := time.ParseDuration(k.options.gracefulTimeout) if err != nil { return ctx, fmt.Errorf("failed to parse graceful shutdown timeout: %w", err) } mgrOpts = append(mgrOpts, controller.WithGracefulTimeout(timeout)) } mgr, err := k.options.mgrCreator(mgrOpts...) if err != nil { return ctx, fmt.Errorf("failed to create controller manager: %w", err) } k.Manager = mgr managerCtx, cancel := context.WithCancel(ctx) k.mgrCancel = cancel startErrChan = make(chan error, 1) // Start the controller and send errors to an error channel // Errors are received from this channel during teardown because the Start // function blocks until the context is cancelled or an error occurs during setup. // // Attempting to receive the error value from the channel here can lead to an infinite block // since setup would never complete and the context would never be cancelled. go func() { startErrChan <- k.Manager.Start(managerCtx) close(startErrChan) }() return ctx, nil } // stopMgr stops the controller manager by invoking the cancellation func // and checks for any errors returned from startMgr's goroutine func (k *K8s) stopMgr(ctx fctx.Context) (fctx.Context, error) { k.mgrCancel() if err := receiveStartErr(startErrChan); err != nil { return ctx, err } return ctx, nil } func (k *K8s) setupEnvtest(ctx fctx.Context) (fctx.Context, error) { var err error k.Env, err = envtest.Setup(k.options.envtestOpts...) return ctx, err } func (k *K8s) teardownEnvtest(ctx fctx.Context) (fctx.Context, error) { return ctx, k.Env.Stop() } // FieldManagerOwner creates a consistent ownership key for resource fields that // are created/updated by test framework code. func FieldManagerOwner() sap.Owner { return sap.Owner{ Field: "f2-ktest", Group: "f2-ktest", } } // getMetricsAddress determines if the passed port is open. // If it is not, a random open port is returned. func getMetricsAddress(port string) (string, error) { l, err := net.Listen("tcp", port) defer func() { _ = l.Close() }() if err != nil { l, err = net.Listen("tcp", "") if err != nil { return "", fmt.Errorf("failed to find open port: %w", err) } } tcpAddr, ok := l.Addr().(*net.TCPAddr) if !ok { return "", fmt.Errorf("tcpAddr should be of type TCPAddr") } port = fmt.Sprintf(":%d", tcpAddr.Port) return port, nil } // receiveStartErr attempts to receive and handle any errors returned from startMgr. // Error if nothing is received from the startErrChan channel after 5 minutes // to prevent infinite block when attempting to receive from an empty channel func receiveStartErr(startErrChan <-chan error) error { timeout := time.After(startErrTimeout) select { case err := <-startErrChan: if errors.Is(err, context.Canceled) { log.Printf("manager context canceled") break } if err != nil { log.Printf("manager error: %v", err) return err } case <-timeout: return fmt.Errorf("no error received from startMgr before 5m timeout") } return nil }