...

Source file src/github.com/cert-manager/issuer-lib/internal/tests/testresource/kube.go

Documentation: github.com/cert-manager/issuer-lib/internal/tests/testresource

     1  /*
     2  Copyright 2023 The cert-manager Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testresource
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math/rand"
    24  	goruntime "runtime"
    25  	"testing"
    26  	"time"
    27  
    28  	cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
    29  	"github.com/stretchr/testify/require"
    30  	certificatesv1 "k8s.io/api/certificates/v1"
    31  	corev1 "k8s.io/api/core/v1"
    32  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    33  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/runtime"
    36  	"k8s.io/apimachinery/pkg/watch"
    37  	"k8s.io/client-go/kubernetes"
    38  	"k8s.io/client-go/rest"
    39  	"sigs.k8s.io/controller-runtime/pkg/client"
    40  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    41  	"sigs.k8s.io/controller-runtime/pkg/envtest"
    42  
    43  	"github.com/cert-manager/issuer-lib/internal/kubeutil"
    44  	"github.com/cert-manager/issuer-lib/internal/testapi/api"
    45  )
    46  
// OwnedKubeClients bundles the Kubernetes clients and configuration that
// KubeClients creates for a test.
type OwnedKubeClients struct {
	// EnvTest is the envtest environment started by initTestEnv. It is only
	// set when KubeClients is called without a kubeconfig.
	EnvTest *envtest.Environment
	// Rest is the REST configuration from which both clients below are built.
	Rest *rest.Config
	// Scheme is the runtime scheme that KubeClients populates with the core,
	// certificates, cert-manager and test API types.
	Scheme *runtime.Scheme
	// KubeClient is the typed client-go clientset built from Rest.
	KubeClient *kubernetes.Clientset
	// Client is the controller-runtime client (with watch support) built from
	// Rest and Scheme.
	Client client.WithWatch
}
    54  
    55  func KubeClients(tb testing.TB, kubeconfig *string) *OwnedKubeClients {
    56  	tb.Helper()
    57  
    58  	scheme := runtime.NewScheme()
    59  	require.NoError(tb, corev1.AddToScheme(scheme))
    60  	require.NoError(tb, certificatesv1.AddToScheme(scheme))
    61  	require.NoError(tb, cmapi.AddToScheme(scheme))
    62  	require.NoError(tb, api.AddToScheme(scheme))
    63  
    64  	testKubernetes := &OwnedKubeClients{
    65  		Scheme: scheme,
    66  	}
    67  
    68  	if kubeconfig == nil {
    69  		testKubernetes.initTestEnv(tb, testKubernetes.Scheme)
    70  	} else {
    71  		testKubernetes.initExistingKubernetes(tb, *kubeconfig)
    72  	}
    73  
    74  	kubeClientset, err := kubernetes.NewForConfig(testKubernetes.Rest)
    75  	require.NoError(tb, err)
    76  
    77  	testKubernetes.KubeClient = kubeClientset
    78  
    79  	controllerClient, err := client.NewWithWatch(testKubernetes.Rest, client.Options{Scheme: scheme})
    80  	require.NoError(tb, err)
    81  
    82  	testKubernetes.Client = controllerClient
    83  
    84  	return testKubernetes
    85  }
    86  
    87  func (k *OwnedKubeClients) initTestEnv(tb testing.TB, scheme *runtime.Scheme) {
    88  	tb.Helper()
    89  
    90  	k.EnvTest = &envtest.Environment{
    91  		Scheme: scheme,
    92  	}
    93  
    94  	tb.Log("Creating a Kubernetes API server")
    95  	cfg, err := k.EnvTest.Start()
    96  	require.NoError(tb, err)
    97  
    98  	tb.Cleanup(func() {
    99  		tb.Log("Waiting for testEnv to exit")
   100  		require.NoError(tb, k.EnvTest.Stop())
   101  	})
   102  
   103  	k.Rest = cfg
   104  }
   105  
   106  func (k *OwnedKubeClients) initExistingKubernetes(tb testing.TB, kubeconfig string) {
   107  	tb.Helper()
   108  
   109  	tb.Setenv("KUBECONFIG", kubeconfig)
   110  	kubeConfig, err := config.GetConfigWithContext("")
   111  	require.NoError(tb, err)
   112  
   113  	k.Rest = kubeConfig
   114  }
   115  
   116  func (k *OwnedKubeClients) InstallCRDs(options envtest.CRDInstallOptions) ([]*apiextensionsv1.CustomResourceDefinition, error) {
   117  	return envtest.InstallCRDs(k.Rest, options)
   118  }
   119  
// CompleteFunc is the function returned by StartObjectWatch. It stops the
// watch when it returns. If fn is nil it only stops the watch; otherwise fn is
// called with the object of every received event whose type is listed in
// eventTypes, until fn returns nil or the watch's context is done.
type CompleteFunc func(fn func(runtime.Object) error, eventTypes ...watch.EventType) error
   121  
   122  // StartObjectWatch starts a watch for the provided object,
   123  // the returned function should be used to wait for a condition
   124  // to succeed. The watch will start after calling this function.
   125  // This means that the completion function can respond to events
   126  // received before calling the complete function but after calling
   127  // StartObjectWatch.
   128  func (k *OwnedKubeClients) StartObjectWatch(
   129  	tb testing.TB,
   130  	ctx context.Context,
   131  	object client.Object,
   132  ) CompleteFunc {
   133  	tb.Helper()
   134  
   135  	fields := map[string]string{}
   136  	if name := object.GetName(); name != "" {
   137  		fields["metadata.name"] = name
   138  	}
   139  	if namespace := object.GetNamespace(); namespace != "" {
   140  		fields["metadata.namespace"] = namespace
   141  	}
   142  
   143  	err := kubeutil.SetGroupVersionKind(k.Scheme, object)
   144  	require.NoError(tb, err)
   145  
   146  	listObj, err := kubeutil.NewListObject(k.Scheme, object.GetObjectKind().GroupVersionKind())
   147  	require.NoError(tb, err)
   148  
   149  	watcher, startWatchError := k.Client.Watch(ctx, listObj, client.MatchingFields(fields), client.Limit(1))
   150  	stopped := (startWatchError != nil)
   151  	checkFunctionCalledBeforeCleanup(tb, "StartObjectWatch", "CompleteFunc", &stopped)
   152  
   153  	stop := func() {
   154  		if !stopped {
   155  			watcher.Stop()
   156  			stopped = true
   157  		}
   158  	}
   159  
   160  	return func(fn func(runtime.Object) error, eventTypes ...watch.EventType) error {
   161  		if startWatchError != nil {
   162  			return startWatchError
   163  		}
   164  
   165  		defer stop()
   166  
   167  		if fn == nil {
   168  			return nil
   169  		}
   170  
   171  		var lastError error
   172  		for {
   173  			var event watch.Event
   174  			select {
   175  			case <-ctx.Done():
   176  				if lastError == nil {
   177  					lastError = ctx.Err()
   178  				}
   179  				return lastError
   180  			case event = <-watcher.ResultChan():
   181  			}
   182  
   183  			found := false
   184  		CheckLoop:
   185  			for _, eventType := range eventTypes {
   186  				if eventType == event.Type {
   187  					found = true
   188  					break CheckLoop
   189  				}
   190  			}
   191  
   192  			if !found {
   193  				continue
   194  			}
   195  
   196  			fnErr := fn(event.Object)
   197  			if fnErr == nil {
   198  				return nil
   199  			}
   200  			// we only want to overwrite the error if it is not a DeadlineExceeded error
   201  			if lastError == nil || !errors.Is(fnErr, context.DeadlineExceeded) {
   202  				lastError = fnErr
   203  			}
   204  		}
   205  	}
   206  }
   207  
   208  const letterBytes = "abcdefghijklmnopqrstuvwxyz"
   209  
   210  func randStringBytes(n int) string {
   211  	b := make([]byte, n)
   212  	for i := range b {
   213  		b[i] = letterBytes[rand.Intn(len(letterBytes))]
   214  	}
   215  	return string(b)
   216  }
   217  
   218  func (k *OwnedKubeClients) SetupNamespace(tb testing.TB, ctx context.Context) (string, context.CancelFunc) {
   219  	tb.Helper()
   220  
   221  	namespace := randStringBytes(15)
   222  
   223  	removeNamespace := func(cleanupCtx context.Context) (bool, error) {
   224  		err := k.KubeClient.CoreV1().Namespaces().Delete(cleanupCtx, namespace, metav1.DeleteOptions{})
   225  		if err != nil {
   226  			if apierrors.IsNotFound(err) {
   227  				return true, nil
   228  			}
   229  			return false, err
   230  		}
   231  		return false, nil
   232  	}
   233  
   234  	cleanupExisting := func(cleanupCtx context.Context) error {
   235  		complete := k.StartObjectWatch(tb, cleanupCtx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})
   236  		defer require.NoError(tb, complete(nil))
   237  
   238  		if notFound, err := removeNamespace(cleanupCtx); err != nil {
   239  			return err
   240  		} else if notFound {
   241  			return nil
   242  		}
   243  
   244  		return complete(func(o runtime.Object) error {
   245  			return nil
   246  		}, watch.Deleted)
   247  	}
   248  	require.NoError(tb, cleanupExisting(ctx))
   249  
   250  	namespaceObj := &corev1.Namespace{
   251  		ObjectMeta: metav1.ObjectMeta{Name: namespace},
   252  	}
   253  	_, err := k.KubeClient.CoreV1().Namespaces().Create(ctx, namespaceObj, metav1.CreateOptions{})
   254  	require.NoError(tb, err)
   255  
   256  	stopped := false
   257  	checkFunctionCalledBeforeCleanup(tb, "SetupNamespace", "CancelFunc", &stopped)
   258  
   259  	return namespace, func() {
   260  		defer func() { stopped = true }()
   261  
   262  		cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   263  		defer cancel()
   264  
   265  		_, err := removeNamespace(cleanupCtx)
   266  		require.NoError(tb, err)
   267  	}
   268  }
   269  
   270  func checkFunctionCalledBeforeCleanup(tb testing.TB, name string, funcname string, stopped *bool) {
   271  	tb.Helper()
   272  
   273  	_, file, no, ok := goruntime.Caller(2)
   274  	message := fmt.Sprintf("%s's %s was not called", name, funcname)
   275  	if ok {
   276  		message += fmt.Sprintf(", %s called at %s#%d", name, file, no)
   277  	}
   278  	tb.Cleanup(func() {
   279  		if !*stopped {
   280  			panic(message)
   281  		}
   282  	})
   283  }
   284  

View as plain text