1
16
17 package envtest
18
19 import (
20 "bufio"
21 "bytes"
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "os"
27 "path/filepath"
28 "time"
29
30 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
31 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
32 apierrors "k8s.io/apimachinery/pkg/api/errors"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 k8syaml "k8s.io/apimachinery/pkg/util/yaml"
38 "k8s.io/client-go/kubernetes/scheme"
39 "k8s.io/client-go/rest"
40 "k8s.io/client-go/util/retry"
41 "k8s.io/utils/ptr"
42 "sigs.k8s.io/yaml"
43
44 "sigs.k8s.io/controller-runtime/pkg/client"
45 "sigs.k8s.io/controller-runtime/pkg/webhook/conversion"
46 )
47
48
// CRDInstallOptions are the options for installing CRDs.
type CRDInstallOptions struct {
	// Scheme is the scheme inspected by modifyConversionWebhooks to find
	// convertible types. defaultCRDOptions sets it to scheme.Scheme when nil.
	Scheme *runtime.Scheme

	// Paths is a list of files or directories from which CRD manifests are read.
	Paths []string

	// CRDs is a list of CRDs to install. CRDs read from Paths are appended to it.
	CRDs []*apiextensionsv1.CustomResourceDefinition

	// ErrorIfPathMissing makes a non-existent entry in Paths an error
	// instead of it being silently skipped.
	ErrorIfPathMissing bool

	// MaxTime is the timeout used while waiting for the CRDs to show up in
	// discovery. defaultCRDOptions sets it to defaultMaxWait (10s) when zero.
	MaxTime time.Duration

	// PollInterval is the interval between discovery checks while waiting.
	// defaultCRDOptions sets it to defaultPollInterval (100ms) when zero.
	PollInterval time.Duration

	// CleanUpAfterUse is not read anywhere in this file.
	// NOTE(review): presumably asks the owning test environment to uninstall
	// the CRDs when it stops — confirm against the caller.
	CleanUpAfterUse bool

	// WebhookOptions is handed to modifyConversionWebhooks, which rewrites the
	// conversion settings of the CRDs only when LocalServingCAData is non-empty.
	WebhookOptions WebhookInstallOptions
}
86
// Fallback values applied by defaultCRDOptions when the matching option is zero.
const (
	// defaultPollInterval is the fallback for CRDInstallOptions.PollInterval.
	defaultPollInterval = 100 * time.Millisecond
	// defaultMaxWait is the fallback for CRDInstallOptions.MaxTime.
	defaultMaxWait = 10 * time.Second
)
91
92
93 func InstallCRDs(config *rest.Config, options CRDInstallOptions) ([]*apiextensionsv1.CustomResourceDefinition, error) {
94 defaultCRDOptions(&options)
95
96
97 if err := readCRDFiles(&options); err != nil {
98 return nil, fmt.Errorf("unable to read CRD files: %w", err)
99 }
100
101 if err := modifyConversionWebhooks(options.CRDs, options.Scheme, options.WebhookOptions); err != nil {
102 return nil, err
103 }
104
105
106 if err := CreateCRDs(config, options.CRDs); err != nil {
107 return options.CRDs, fmt.Errorf("unable to create CRD instances: %w", err)
108 }
109
110
111 if err := WaitForCRDs(config, options.CRDs, options); err != nil {
112 return options.CRDs, fmt.Errorf("something went wrong waiting for CRDs to appear as API resources: %w", err)
113 }
114
115 return options.CRDs, nil
116 }
117
118
119 func readCRDFiles(options *CRDInstallOptions) error {
120 if len(options.Paths) > 0 {
121 crdList, err := renderCRDs(options)
122 if err != nil {
123 return err
124 }
125
126 options.CRDs = append(options.CRDs, crdList...)
127 }
128 return nil
129 }
130
131
132 func defaultCRDOptions(o *CRDInstallOptions) {
133 if o.Scheme == nil {
134 o.Scheme = scheme.Scheme
135 }
136 if o.MaxTime == 0 {
137 o.MaxTime = defaultMaxWait
138 }
139 if o.PollInterval == 0 {
140 o.PollInterval = defaultPollInterval
141 }
142 }
143
144
145 func WaitForCRDs(config *rest.Config, crds []*apiextensionsv1.CustomResourceDefinition, options CRDInstallOptions) error {
146
147 waitingFor := map[schema.GroupVersion]*sets.Set[string]{}
148 for _, crd := range crds {
149 gvs := []schema.GroupVersion{}
150 for _, version := range crd.Spec.Versions {
151 if version.Served {
152 gvs = append(gvs, schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name})
153 }
154 }
155
156 for _, gv := range gvs {
157 log.V(1).Info("adding API in waitlist", "GV", gv)
158 if _, found := waitingFor[gv]; !found {
159
160 waitingFor[gv] = &sets.Set[string]{}
161 }
162
163 waitingFor[gv].Insert(crd.Spec.Names.Plural)
164 }
165 }
166
167
168 p := &poller{config: config, waitingFor: waitingFor}
169 return wait.PollUntilContextTimeout(context.TODO(), options.PollInterval, options.MaxTime, true, p.poll)
170 }
171
172
// poller checks whether a set of API resources is visible through discovery.
type poller struct {
	// config is used to build a new clientset on every poll.
	config *rest.Config

	// waitingFor maps each GroupVersion to the resource names that have not
	// been seen yet; poll removes names as they are found and drops
	// GroupVersions whose set has become empty.
	waitingFor map[schema.GroupVersion]*sets.Set[string]
}
180
181
182 func (p *poller) poll(ctx context.Context) (done bool, err error) {
183
184 cs, err := clientset.NewForConfig(p.config)
185 if err != nil {
186 return false, err
187 }
188
189 allFound := true
190 for gv, resources := range p.waitingFor {
191
192 if resources.Len() == 0 {
193 delete(p.waitingFor, gv)
194 continue
195 }
196
197
198
199 resourceList, err := cs.Discovery().ServerResourcesForGroupVersion(gv.Group + "/" + gv.Version)
200 if err != nil {
201 return false, nil
202 }
203
204
205 for _, resource := range resourceList.APIResources {
206 resources.Delete(resource.Name)
207 }
208
209
210 if resources.Len() != 0 {
211 allFound = false
212 }
213 }
214 return allFound, nil
215 }
216
217
218 func UninstallCRDs(config *rest.Config, options CRDInstallOptions) error {
219
220 if err := readCRDFiles(&options); err != nil {
221 return err
222 }
223
224
225 cs, err := client.New(config, client.Options{})
226 if err != nil {
227 return err
228 }
229
230
231 for _, crd := range options.CRDs {
232 crd := crd
233 log.V(1).Info("uninstalling CRD", "crd", crd.GetName())
234 if err := cs.Delete(context.TODO(), crd); err != nil {
235
236 if !apierrors.IsNotFound(err) {
237 return err
238 }
239 }
240 }
241
242 return nil
243 }
244
245
246 func CreateCRDs(config *rest.Config, crds []*apiextensionsv1.CustomResourceDefinition) error {
247 cs, err := client.New(config, client.Options{})
248 if err != nil {
249 return fmt.Errorf("unable to create client: %w", err)
250 }
251
252
253 for _, crd := range crds {
254 crd := crd
255 log.V(1).Info("installing CRD", "crd", crd.GetName())
256 existingCrd := crd.DeepCopy()
257 err := cs.Get(context.TODO(), client.ObjectKey{Name: crd.GetName()}, existingCrd)
258 switch {
259 case apierrors.IsNotFound(err):
260 if err := cs.Create(context.TODO(), crd); err != nil {
261 return fmt.Errorf("unable to create CRD %q: %w", crd.GetName(), err)
262 }
263 case err != nil:
264 return fmt.Errorf("unable to get CRD %q to check if it exists: %w", crd.GetName(), err)
265 default:
266 log.V(1).Info("CRD already exists, updating", "crd", crd.GetName())
267 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
268 if err := cs.Get(context.TODO(), client.ObjectKey{Name: crd.GetName()}, existingCrd); err != nil {
269 return err
270 }
271 crd.SetResourceVersion(existingCrd.GetResourceVersion())
272 return cs.Update(context.TODO(), crd)
273 }); err != nil {
274 return err
275 }
276 }
277 }
278 return nil
279 }
280
281
282 func renderCRDs(options *CRDInstallOptions) ([]*apiextensionsv1.CustomResourceDefinition, error) {
283 type GVKN struct {
284 GVK schema.GroupVersionKind
285 Name string
286 }
287
288 crds := map[GVKN]*apiextensionsv1.CustomResourceDefinition{}
289
290 for _, path := range options.Paths {
291 var (
292 err error
293 info os.FileInfo
294 files []string
295 filePath = path
296 )
297
298
299 if info, err = os.Stat(path); os.IsNotExist(err) {
300 if options.ErrorIfPathMissing {
301 return nil, err
302 }
303 continue
304 }
305
306 if !info.IsDir() {
307 filePath, files = filepath.Dir(path), []string{info.Name()}
308 } else {
309 entries, err := os.ReadDir(path)
310 if err != nil {
311 return nil, err
312 }
313 for _, e := range entries {
314 files = append(files, e.Name())
315 }
316 }
317
318 log.V(1).Info("reading CRDs from path", "path", path)
319 crdList, err := readCRDs(filePath, files)
320 if err != nil {
321 return nil, err
322 }
323
324 for i, crd := range crdList {
325 gvkn := GVKN{GVK: crd.GroupVersionKind(), Name: crd.GetName()}
326 if _, found := crds[gvkn]; found {
327
328 log.Info("there are more than one CRD definitions with the same <Group, Version, Kind, Name>", "GVKN", gvkn)
329 }
330
331 crds[gvkn] = crdList[i]
332 }
333 }
334
335
336 res := []*apiextensionsv1.CustomResourceDefinition{}
337 for _, obj := range crds {
338 res = append(res, obj)
339 }
340 return res, nil
341 }
342
343
344
345 func modifyConversionWebhooks(crds []*apiextensionsv1.CustomResourceDefinition, scheme *runtime.Scheme, webhookOptions WebhookInstallOptions) error {
346 if len(webhookOptions.LocalServingCAData) == 0 {
347 return nil
348 }
349
350
351 convertibles := map[schema.GroupKind]struct{}{}
352 for gvk := range scheme.AllKnownTypes() {
353 obj, err := scheme.New(gvk)
354 if err != nil {
355 return err
356 }
357 if ok, err := conversion.IsConvertible(scheme, obj); ok && err == nil {
358 convertibles[gvk.GroupKind()] = struct{}{}
359 }
360 }
361
362
363 hostPort, err := webhookOptions.generateHostPort()
364 if err != nil {
365 return err
366 }
367 url := ptr.To(fmt.Sprintf("https://%s/convert", hostPort))
368
369 for i := range crds {
370
371 if crds[i].Spec.PreserveUnknownFields {
372 continue
373 }
374 if !webhookOptions.IgnoreSchemeConvertible {
375
376
377
378
379
380 if _, ok := convertibles[schema.GroupKind{
381 Group: crds[i].Spec.Group,
382 Kind: crds[i].Spec.Names.Kind,
383 }]; !ok {
384 crds[i].Spec.Conversion = nil
385 continue
386 }
387 }
388 if crds[i].Spec.Conversion == nil {
389 crds[i].Spec.Conversion = &apiextensionsv1.CustomResourceConversion{
390 Webhook: &apiextensionsv1.WebhookConversion{},
391 }
392 }
393 crds[i].Spec.Conversion.Strategy = apiextensionsv1.WebhookConverter
394 crds[i].Spec.Conversion.Webhook.ConversionReviewVersions = []string{"v1", "v1beta1"}
395 crds[i].Spec.Conversion.Webhook.ClientConfig = &apiextensionsv1.WebhookClientConfig{
396 Service: nil,
397 URL: url,
398 CABundle: webhookOptions.LocalServingCAData,
399 }
400 }
401
402 return nil
403 }
404
405
406 func readCRDs(basePath string, files []string) ([]*apiextensionsv1.CustomResourceDefinition, error) {
407 var crds []*apiextensionsv1.CustomResourceDefinition
408
409
410 crdExts := sets.NewString(".json", ".yaml", ".yml")
411
412 for _, file := range files {
413
414 if !crdExts.Has(filepath.Ext(file)) {
415 continue
416 }
417
418
419 docs, err := readDocuments(filepath.Join(basePath, file))
420 if err != nil {
421 return nil, err
422 }
423
424 for _, doc := range docs {
425 crd := &apiextensionsv1.CustomResourceDefinition{}
426 if err = yaml.Unmarshal(doc, crd); err != nil {
427 return nil, err
428 }
429
430 if crd.Kind != "CustomResourceDefinition" || crd.Spec.Names.Kind == "" || crd.Spec.Group == "" {
431 continue
432 }
433 crds = append(crds, crd)
434 }
435
436 log.V(1).Info("read CRDs from file", "file", file)
437 }
438 return crds, nil
439 }
440
441
442 func readDocuments(fp string) ([][]byte, error) {
443 b, err := os.ReadFile(fp)
444 if err != nil {
445 return nil, err
446 }
447
448 docs := [][]byte{}
449 reader := k8syaml.NewYAMLReader(bufio.NewReader(bytes.NewReader(b)))
450 for {
451
452 doc, err := reader.Read()
453 if err != nil {
454 if errors.Is(err, io.EOF) {
455 break
456 }
457
458 return nil, err
459 }
460
461 docs = append(docs, doc)
462 }
463
464 return docs, nil
465 }
466
View as plain text