1
16
17 package testresource
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math/rand"
24 goruntime "runtime"
25 "testing"
26 "time"
27
28 cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
29 "github.com/stretchr/testify/require"
30 certificatesv1 "k8s.io/api/certificates/v1"
31 corev1 "k8s.io/api/core/v1"
32 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
33 apierrors "k8s.io/apimachinery/pkg/api/errors"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/runtime"
36 "k8s.io/apimachinery/pkg/watch"
37 "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/rest"
39 "sigs.k8s.io/controller-runtime/pkg/client"
40 "sigs.k8s.io/controller-runtime/pkg/client/config"
41 "sigs.k8s.io/controller-runtime/pkg/envtest"
42
43 "github.com/cert-manager/issuer-lib/internal/kubeutil"
44 "github.com/cert-manager/issuer-lib/internal/testapi/api"
45 )
46
47 type OwnedKubeClients struct {
48 EnvTest *envtest.Environment
49 Rest *rest.Config
50 Scheme *runtime.Scheme
51 KubeClient *kubernetes.Clientset
52 Client client.WithWatch
53 }
54
55 func KubeClients(tb testing.TB, kubeconfig *string) *OwnedKubeClients {
56 tb.Helper()
57
58 scheme := runtime.NewScheme()
59 require.NoError(tb, corev1.AddToScheme(scheme))
60 require.NoError(tb, certificatesv1.AddToScheme(scheme))
61 require.NoError(tb, cmapi.AddToScheme(scheme))
62 require.NoError(tb, api.AddToScheme(scheme))
63
64 testKubernetes := &OwnedKubeClients{
65 Scheme: scheme,
66 }
67
68 if kubeconfig == nil {
69 testKubernetes.initTestEnv(tb, testKubernetes.Scheme)
70 } else {
71 testKubernetes.initExistingKubernetes(tb, *kubeconfig)
72 }
73
74 kubeClientset, err := kubernetes.NewForConfig(testKubernetes.Rest)
75 require.NoError(tb, err)
76
77 testKubernetes.KubeClient = kubeClientset
78
79 controllerClient, err := client.NewWithWatch(testKubernetes.Rest, client.Options{Scheme: scheme})
80 require.NoError(tb, err)
81
82 testKubernetes.Client = controllerClient
83
84 return testKubernetes
85 }
86
87 func (k *OwnedKubeClients) initTestEnv(tb testing.TB, scheme *runtime.Scheme) {
88 tb.Helper()
89
90 k.EnvTest = &envtest.Environment{
91 Scheme: scheme,
92 }
93
94 tb.Log("Creating a Kubernetes API server")
95 cfg, err := k.EnvTest.Start()
96 require.NoError(tb, err)
97
98 tb.Cleanup(func() {
99 tb.Log("Waiting for testEnv to exit")
100 require.NoError(tb, k.EnvTest.Stop())
101 })
102
103 k.Rest = cfg
104 }
105
106 func (k *OwnedKubeClients) initExistingKubernetes(tb testing.TB, kubeconfig string) {
107 tb.Helper()
108
109 tb.Setenv("KUBECONFIG", kubeconfig)
110 kubeConfig, err := config.GetConfigWithContext("")
111 require.NoError(tb, err)
112
113 k.Rest = kubeConfig
114 }
115
116 func (k *OwnedKubeClients) InstallCRDs(options envtest.CRDInstallOptions) ([]*apiextensionsv1.CustomResourceDefinition, error) {
117 return envtest.InstallCRDs(k.Rest, options)
118 }
119
120 type CompleteFunc func(fn func(runtime.Object) error, eventTypes ...watch.EventType) error
121
122
123
124
125
126
127
128 func (k *OwnedKubeClients) StartObjectWatch(
129 tb testing.TB,
130 ctx context.Context,
131 object client.Object,
132 ) CompleteFunc {
133 tb.Helper()
134
135 fields := map[string]string{}
136 if name := object.GetName(); name != "" {
137 fields["metadata.name"] = name
138 }
139 if namespace := object.GetNamespace(); namespace != "" {
140 fields["metadata.namespace"] = namespace
141 }
142
143 err := kubeutil.SetGroupVersionKind(k.Scheme, object)
144 require.NoError(tb, err)
145
146 listObj, err := kubeutil.NewListObject(k.Scheme, object.GetObjectKind().GroupVersionKind())
147 require.NoError(tb, err)
148
149 watcher, startWatchError := k.Client.Watch(ctx, listObj, client.MatchingFields(fields), client.Limit(1))
150 stopped := (startWatchError != nil)
151 checkFunctionCalledBeforeCleanup(tb, "StartObjectWatch", "CompleteFunc", &stopped)
152
153 stop := func() {
154 if !stopped {
155 watcher.Stop()
156 stopped = true
157 }
158 }
159
160 return func(fn func(runtime.Object) error, eventTypes ...watch.EventType) error {
161 if startWatchError != nil {
162 return startWatchError
163 }
164
165 defer stop()
166
167 if fn == nil {
168 return nil
169 }
170
171 var lastError error
172 for {
173 var event watch.Event
174 select {
175 case <-ctx.Done():
176 if lastError == nil {
177 lastError = ctx.Err()
178 }
179 return lastError
180 case event = <-watcher.ResultChan():
181 }
182
183 found := false
184 CheckLoop:
185 for _, eventType := range eventTypes {
186 if eventType == event.Type {
187 found = true
188 break CheckLoop
189 }
190 }
191
192 if !found {
193 continue
194 }
195
196 fnErr := fn(event.Object)
197 if fnErr == nil {
198 return nil
199 }
200
201 if lastError == nil || !errors.Is(fnErr, context.DeadlineExceeded) {
202 lastError = fnErr
203 }
204 }
205 }
206 }
207
// letterBytes is the alphabet randStringBytes picks characters from.
const letterBytes = "abcdefghijklmnopqrstuvwxyz"

// randStringBytes returns a string of n lowercase ASCII letters, each chosen
// with one call to math/rand's package-level generator.
func randStringBytes(n int) string {
	out := make([]byte, n)
	for pos := 0; pos < len(out); pos++ {
		pick := rand.Intn(len(letterBytes))
		out[pos] = letterBytes[pick]
	}
	return string(out)
}
217
218 func (k *OwnedKubeClients) SetupNamespace(tb testing.TB, ctx context.Context) (string, context.CancelFunc) {
219 tb.Helper()
220
221 namespace := randStringBytes(15)
222
223 removeNamespace := func(cleanupCtx context.Context) (bool, error) {
224 err := k.KubeClient.CoreV1().Namespaces().Delete(cleanupCtx, namespace, metav1.DeleteOptions{})
225 if err != nil {
226 if apierrors.IsNotFound(err) {
227 return true, nil
228 }
229 return false, err
230 }
231 return false, nil
232 }
233
234 cleanupExisting := func(cleanupCtx context.Context) error {
235 complete := k.StartObjectWatch(tb, cleanupCtx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})
236 defer require.NoError(tb, complete(nil))
237
238 if notFound, err := removeNamespace(cleanupCtx); err != nil {
239 return err
240 } else if notFound {
241 return nil
242 }
243
244 return complete(func(o runtime.Object) error {
245 return nil
246 }, watch.Deleted)
247 }
248 require.NoError(tb, cleanupExisting(ctx))
249
250 namespaceObj := &corev1.Namespace{
251 ObjectMeta: metav1.ObjectMeta{Name: namespace},
252 }
253 _, err := k.KubeClient.CoreV1().Namespaces().Create(ctx, namespaceObj, metav1.CreateOptions{})
254 require.NoError(tb, err)
255
256 stopped := false
257 checkFunctionCalledBeforeCleanup(tb, "SetupNamespace", "CancelFunc", &stopped)
258
259 return namespace, func() {
260 defer func() { stopped = true }()
261
262 cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
263 defer cancel()
264
265 _, err := removeNamespace(cleanupCtx)
266 require.NoError(tb, err)
267 }
268 }
269
// checkFunctionCalledBeforeCleanup registers a cleanup on tb that panics when
// *stopped is still false at cleanup time, i.e. when the function funcname
// returned by name was never called.
//
// The panic message includes, when available, the file and line two stack
// frames above this function — the caller of the function that called this
// helper — so this helper has to be invoked directly from the function named
// by name.
func checkFunctionCalledBeforeCleanup(tb testing.TB, name string, funcname string, stopped *bool) {
	tb.Helper()

	// Build the message now; the call stack is gone by the time cleanup runs.
	message := fmt.Sprintf("%s's %s was not called", name, funcname)
	if _, file, line, ok := goruntime.Caller(2); ok {
		message += fmt.Sprintf(", %s called at %s#%d", name, file, line)
	}

	tb.Cleanup(func() {
		if *stopped {
			return
		}
		panic(message)
	})
}
284
View as plain text