...

Source file src/edge-infra.dev/test/f2/x/ktest/ktest.go

Documentation: edge-infra.dev/test/f2/x/ktest

     1  package ktest
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"flag"
     7  	"fmt"
     8  	"io"
     9  	"log"
    10  	"net"
    11  	"strings"
    12  	"testing"
    13  	"time"
    14  
    15  	corev1 "k8s.io/api/core/v1"
    16  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    17  	"k8s.io/client-go/kubernetes"
    18  	"sigs.k8s.io/controller-runtime/pkg/client"
    19  	"sigs.k8s.io/controller-runtime/pkg/manager"
    20  
    21  	"edge-infra.dev/pkg/k8s/runtime/controller"
    22  	"edge-infra.dev/pkg/k8s/runtime/sap"
    23  	"edge-infra.dev/test/f2"
    24  	"edge-infra.dev/test/f2/fctx"
    25  	"edge-infra.dev/test/f2/x/ktest/envtest"
    26  	"edge-infra.dev/test/f2/x/ktest/kpoll"
    27  )
    28  
    29  const (
    30  	MaxNamespaceLen = 63
    31  	Timeout         = time.Second * 30
    32  	Tick            = time.Second * 1
    33  	startErrTimeout = time.Minute * 5
    34  )
    35  
    36  var startErrChan chan error
    37  
    38  // TODO: install/uninstall KCC. flag for disabling cleanup
    39  // TODO: envtest excludes use case where tests are scheduled onto a real cluster
    40  // and use the client information injected into k8s pods? if so need to rework
    41  // to make both cases equally easy. envtest uses config.GetConfig() if an explicit
    42  // config is not passed to them, which will fall back to InClusterConfig if no
    43  // --kubeconfig or KUBECONFIG is present. Good enough?
    44  
    45  // K8s is a framework extension for Kubernetes that registers consistent
    46  // lifeycle hooks for K8s tests, flag binding, config parsing, and other test
    47  // utilities.
    48  type K8s struct {
    49  	Env       *envtest.Environment
    50  	Namespace string
    51  	Client    client.Client
    52  	Manager   manager.Manager
    53  	*kpoll.KPoll
    54  
    55  	Timeout time.Duration
    56  	Tick    time.Duration
    57  
    58  	options   *options
    59  	mgrCancel context.CancelFunc
    60  }
    61  
    62  // FromContext attempts to fetch an instance of K8s from the test context and
    63  // returns an error if it is not discovered.
    64  func FromContext(ctx fctx.Context) (*K8s, error) {
    65  	v := fctx.ValueFrom[K8s](ctx)
    66  	if v == nil {
    67  		return nil, fmt.Errorf("%w: ktest.K8s extension", fctx.ErrNotFound)
    68  	}
    69  	return v, nil
    70  }
    71  
    72  // FromContextT is a testing variant of FromContext that immediately fails the
    73  // test if K8s isnt presnt in the testing context.
    74  func FromContextT(ctx fctx.Context, t *testing.T) *K8s {
    75  	return fctx.ValueFromT[K8s](ctx, t)
    76  }
    77  
    78  // New creates a K8s framework
    79  func New(opts ...Option) *K8s {
    80  	o := makeOptions(opts...)
    81  	k8s := &K8s{
    82  		options: o,
    83  	}
    84  	return k8s
    85  }
    86  
    87  // RegisterFns is called by the framework after binding and parsing test flags.
    88  func (k *K8s) RegisterFns(f f2.Framework) {
    89  	// Always set up envtest and set K8s client using the envtest rest.Config
    90  	f.Setup(k.setupEnvtest)
    91  	// If we are going to create manager, register that fn first so that setClient
    92  	// can access the scheme.
    93  	if k.options.mgrCreator != nil {
    94  		f.Setup(k.startMgr)
    95  		f.Teardown(k.stopMgr)
    96  	}
    97  	// Make sure client is set before rest of k8s lifecycle funcs
    98  	f.Setup(func(ctx fctx.Context) (fctx.Context, error) {
    99  		return ctx, k.setClient()
   100  	})
   101  
   102  	// Ensure client is re-set and a new Namespace is created for each test.
   103  	// This is done to provide better isolation between individual test cases
   104  	// that may have inadvertently mutated state.
   105  	f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
   106  		if err := k.setClient(); err != nil {
   107  			return ctx, err
   108  		}
   109  
   110  		k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick)
   111  
   112  		if !k.options.skipNamespaceCreation {
   113  			name := strings.ToLower(strings.ReplaceAll(t.Name(), "_", "-"))
   114  			k.Namespace = name + "-" + ctx.RunID
   115  
   116  			// if the proposed namespace is above the max shorten the name
   117  			if len(k.Namespace) > MaxNamespaceLen {
   118  				t.Log("proposed namespace was too long", k.Namespace)
   119  				k.Namespace = name[:len(name)-(len(k.Namespace)-MaxNamespaceLen)] + "-" + ctx.RunID
   120  			}
   121  
   122  			t.Log("creating namespace", k.Namespace)
   123  
   124  			return ctx, k.Client.Create(ctx, &corev1.Namespace{
   125  				ObjectMeta: metav1.ObjectMeta{
   126  					Name: k.Namespace,
   127  					Labels: map[string]string{
   128  						"ktest": t.Name(),
   129  					},
   130  				},
   131  			})
   132  		}
   133  
   134  		return ctx, nil
   135  	})
   136  
   137  	f.AfterEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
   138  		if err := k.setClient(); err != nil {
   139  			return ctx, err
   140  		}
   141  
   142  		k.KPoll = kpoll.New(ctx, k.Client, k.Timeout, k.Tick)
   143  
   144  		// skip deletion if the user requested or if a namespace was never created
   145  		if k.options.skipNamespaceDeletion || k.options.skipNamespaceCreation {
   146  			t.Log("skipping namespace deletion")
   147  			return ctx, nil
   148  		}
   149  
   150  		t.Log("deleting namespace", k.Namespace)
   151  
   152  		return ctx, k.Client.Delete(ctx, &corev1.Namespace{
   153  			ObjectMeta: metav1.ObjectMeta{
   154  				Name: k.Namespace,
   155  				Labels: map[string]string{
   156  					"ktest": t.Name(),
   157  				},
   158  			},
   159  		})
   160  	})
   161  
   162  	// Always teardown envtest last
   163  	f.Teardown(k.teardownEnvtest)
   164  }
   165  
   166  // BindFlags registers test flags for the framework extension.
   167  func (k *K8s) BindFlags(fs *flag.FlagSet) {
   168  	envtest.BindFlags(fs)
   169  	fs.DurationVar(&k.Timeout,
   170  		"ktest-default-timeout",
   171  		Timeout,
   172  		"default timeout for K8s operations",
   173  	)
   174  	fs.DurationVar(&k.Tick,
   175  		"ktest-tick",
   176  		Tick,
   177  		"interval checks are evaluated at during tests",
   178  	)
   179  	// TODO: bind KCC flags?
   180  }
   181  
   182  func (k *K8s) Labels() map[string]string {
   183  	l := map[string]string{
   184  		"k8s": "true",
   185  	}
   186  
   187  	return l
   188  }
   189  
   190  // IntoContext stores the framework extension in the test context.
   191  func (k *K8s) IntoContext(ctx fctx.Context) fctx.Context {
   192  	return fctx.ValueInto(ctx, k)
   193  }
   194  
   195  // Return a stream of the logs for a given container within the defined pod & namespace
   196  func (k *K8s) GetContainerLogs(ctx context.Context, podname string, namespace string, container string) (io.ReadCloser, error) {
   197  	clientset, err := kubernetes.NewForConfig(k.Env.Config)
   198  	if err != nil {
   199  		return nil, err
   200  	}
   201  
   202  	podLogOptions := corev1.PodLogOptions{Container: container}
   203  
   204  	req := clientset.CoreV1().Pods(namespace).GetLogs(podname, &podLogOptions)
   205  	return req.Stream(ctx)
   206  }
   207  
   208  func (k *K8s) setClient() error {
   209  	if k.Env == nil {
   210  		return fmt.Errorf("setClient called before envtest setup")
   211  	}
   212  
   213  	opts := client.Options{}
   214  	if k.Manager != nil {
   215  		opts.Scheme = k.Manager.GetScheme()
   216  	}
   217  	if k.options.clientScheme != nil {
   218  		opts.Scheme = k.options.clientScheme
   219  	}
   220  	c, err := client.New(k.Env.Config, opts)
   221  	if err != nil {
   222  		return err
   223  	}
   224  	k.Client = c
   225  
   226  	return nil
   227  }
   228  
   229  // startMgr creates and starts the controller manager as defined by framework
   230  // options
   231  // Any errors from starting the controller are sent to an error channel and received during teardown
   232  func (k *K8s) startMgr(ctx fctx.Context) (fctx.Context, error) {
   233  	// backstop against situations that should not occur
   234  	if k.Env == nil {
   235  		return ctx, fmt.Errorf("startMgr called before envtest setup")
   236  	}
   237  	if k.options.mgrCreator == nil {
   238  		return ctx, fmt.Errorf("startMgr called without mgrCreator being set")
   239  	}
   240  
   241  	// if metricsAddress is provided, use desired port if available and a random port if not available
   242  	// else, metrics server is disabled
   243  	port := "0"
   244  	if k.options.metricsAddress != "" {
   245  		var err error
   246  		port, err = getMetricsAddress(k.options.metricsAddress)
   247  		if err != nil {
   248  			return ctx, err
   249  		}
   250  	}
   251  
   252  	mgrOpts := []controller.Option{
   253  		controller.WithCfg(k.Env.Config),
   254  		controller.WithMetricsAddress(port),
   255  	}
   256  
   257  	// Parse optional graceful shutdown timeout
   258  	if k.options.gracefulTimeout != "" {
   259  		timeout, err := time.ParseDuration(k.options.gracefulTimeout)
   260  		if err != nil {
   261  			return ctx, fmt.Errorf("failed to parse graceful shutdown timeout: %w", err)
   262  		}
   263  		mgrOpts = append(mgrOpts, controller.WithGracefulTimeout(timeout))
   264  	}
   265  
   266  	mgr, err := k.options.mgrCreator(mgrOpts...)
   267  	if err != nil {
   268  		return ctx, fmt.Errorf("failed to create controller manager: %w", err)
   269  	}
   270  	k.Manager = mgr
   271  
   272  	managerCtx, cancel := context.WithCancel(ctx)
   273  	k.mgrCancel = cancel
   274  
   275  	startErrChan = make(chan error, 1)
   276  
   277  	// Start the controller and send errors to an error channel
   278  	// Errors are received from this channel during teardown because the Start
   279  	// function blocks until the context is cancelled or an error occurs during setup.
   280  	//
   281  	// Attempting to receive the error value from the channel here can lead to an infinite block
   282  	// since setup would never complete and the context would never be cancelled.
   283  	go func() {
   284  		startErrChan <- k.Manager.Start(managerCtx)
   285  		close(startErrChan)
   286  	}()
   287  
   288  	return ctx, nil
   289  }
   290  
   291  // stopMgr stops the controller manager by invoking the cancellation func
   292  // and checks for any errors returned from startMgr's goroutine
   293  func (k *K8s) stopMgr(ctx fctx.Context) (fctx.Context, error) {
   294  	k.mgrCancel()
   295  
   296  	if err := receiveStartErr(startErrChan); err != nil {
   297  		return ctx, err
   298  	}
   299  
   300  	return ctx, nil
   301  }
   302  
   303  func (k *K8s) setupEnvtest(ctx fctx.Context) (fctx.Context, error) {
   304  	var err error
   305  	k.Env, err = envtest.Setup(k.options.envtestOpts...)
   306  	return ctx, err
   307  }
   308  
   309  func (k *K8s) teardownEnvtest(ctx fctx.Context) (fctx.Context, error) {
   310  	return ctx, k.Env.Stop()
   311  }
   312  
   313  // FieldManagerOwner creates a consistent ownership key for resource fields that
   314  // are created/updated by test framework code.
   315  func FieldManagerOwner() sap.Owner {
   316  	return sap.Owner{
   317  		Field: "f2-ktest",
   318  		Group: "f2-ktest",
   319  	}
   320  }
   321  
   322  // getMetricsAddress determines if the passed port is open.
   323  // If it is not, a random open port is returned.
   324  func getMetricsAddress(port string) (string, error) {
   325  	l, err := net.Listen("tcp", port)
   326  	defer func() { _ = l.Close() }()
   327  	if err != nil {
   328  		l, err = net.Listen("tcp", "")
   329  		if err != nil {
   330  			return "", fmt.Errorf("failed to find open port: %w", err)
   331  		}
   332  	}
   333  	tcpAddr, ok := l.Addr().(*net.TCPAddr)
   334  	if !ok {
   335  		return "", fmt.Errorf("tcpAddr should be of type TCPAddr")
   336  	}
   337  	port = fmt.Sprintf(":%d", tcpAddr.Port)
   338  	return port, nil
   339  }
   340  
   341  // receiveStartErr attempts to receive and handle any errors returned from startMgr.
   342  // Error if nothing is received from the startErrChan channel after 5 minutes
   343  // to prevent infinite block when attempting to receive from an empty channel
   344  func receiveStartErr(startErrChan <-chan error) error {
   345  	timeout := time.After(startErrTimeout)
   346  	select {
   347  	case err := <-startErrChan:
   348  		if errors.Is(err, context.Canceled) {
   349  			log.Printf("manager context canceled")
   350  			break
   351  		}
   352  		if err != nil {
   353  			log.Printf("manager error: %v", err)
   354  			return err
   355  		}
   356  	case <-timeout:
   357  		return fmt.Errorf("no error received from startMgr before 5m timeout")
   358  	}
   359  	return nil
   360  }
   361  

View as plain text