1
16
17 package resource
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "net/http"
25 "net/url"
26 "os"
27 "path/filepath"
28 "strings"
29 "time"
30
31 "golang.org/x/sync/errgroup"
32 "golang.org/x/text/encoding/unicode"
33 "golang.org/x/text/transform"
34 "k8s.io/apimachinery/pkg/api/errors"
35 "k8s.io/apimachinery/pkg/api/meta"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/labels"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/runtime/schema"
40 utilerrors "k8s.io/apimachinery/pkg/util/errors"
41 "k8s.io/apimachinery/pkg/util/yaml"
42 "k8s.io/apimachinery/pkg/watch"
43 )
44
45 const (
46 constSTDINstr = "STDIN"
47 stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"
48 )
49
50
51
52 type Watchable interface {
53 Watch(resourceVersion string) (watch.Interface, error)
54 }
55
56
57
58 type ResourceMapping interface {
59 ResourceMapping() *meta.RESTMapping
60 }
61
62
63
64 type Info struct {
65
66 Client RESTClient
67
68 Mapping *meta.RESTMapping
69
70
71 Namespace string
72 Name string
73
74
75
76 Source string
77
78
79
80
81
82
83 Object runtime.Object
84
85
86
87
88 ResourceVersion string
89
90
91 Subresource string
92 }
93
94
95 func (i *Info) Visit(fn VisitorFunc) error {
96 return fn(i, nil)
97 }
98
99
100 func (i *Info) Get() (err error) {
101 obj, err := NewHelper(i.Client, i.Mapping).WithSubresource(i.Subresource).Get(i.Namespace, i.Name)
102 if err != nil {
103 if errors.IsNotFound(err) && len(i.Namespace) > 0 && i.Namespace != metav1.NamespaceDefault && i.Namespace != metav1.NamespaceAll {
104 err2 := i.Client.Get().AbsPath("api", "v1", "namespaces", i.Namespace).Do(context.TODO()).Error()
105 if err2 != nil && errors.IsNotFound(err2) {
106 return err2
107 }
108 }
109 return err
110 }
111 i.Object = obj
112 i.ResourceVersion, _ = metadataAccessor.ResourceVersion(obj)
113 return nil
114 }
115
116
117
118
119 func (i *Info) Refresh(obj runtime.Object, ignoreError bool) error {
120 name, err := metadataAccessor.Name(obj)
121 if err != nil {
122 if !ignoreError {
123 return err
124 }
125 } else {
126 i.Name = name
127 }
128 namespace, err := metadataAccessor.Namespace(obj)
129 if err != nil {
130 if !ignoreError {
131 return err
132 }
133 } else {
134 i.Namespace = namespace
135 }
136 version, err := metadataAccessor.ResourceVersion(obj)
137 if err != nil {
138 if !ignoreError {
139 return err
140 }
141 } else {
142 i.ResourceVersion = version
143 }
144 i.Object = obj
145 return nil
146 }
147
148
149 func (i *Info) ObjectName() string {
150 if i.Mapping != nil {
151 return fmt.Sprintf("%s/%s", i.Mapping.Resource.Resource, i.Name)
152 }
153 gvk := i.Object.GetObjectKind().GroupVersionKind()
154 if len(gvk.Group) == 0 {
155 return fmt.Sprintf("%s/%s", strings.ToLower(gvk.Kind), i.Name)
156 }
157 return fmt.Sprintf("%s.%s/%s\n", strings.ToLower(gvk.Kind), gvk.Group, i.Name)
158 }
159
160
161 func (i *Info) String() string {
162 basicInfo := fmt.Sprintf("Name: %q, Namespace: %q", i.Name, i.Namespace)
163 if i.Mapping != nil {
164 mappingInfo := fmt.Sprintf("Resource: %q, GroupVersionKind: %q", i.Mapping.Resource.String(),
165 i.Mapping.GroupVersionKind.String())
166 return fmt.Sprint(mappingInfo, "\n", basicInfo)
167 }
168 return basicInfo
169 }
170
171
172 func (i *Info) Namespaced() bool {
173 if i.Mapping != nil {
174
175 return i.Mapping.Scope.Name() == meta.RESTScopeNameNamespace
176 }
177
178 return len(i.Namespace) > 0
179 }
180
181
182 func (i *Info) Watch(resourceVersion string) (watch.Interface, error) {
183 return NewHelper(i.Client, i.Mapping).WatchSingle(i.Namespace, i.Name, resourceVersion)
184 }
185
186
187 func (i *Info) ResourceMapping() *meta.RESTMapping {
188 return i.Mapping
189 }
190
191
192
193 type VisitorList []Visitor
194
195
196 func (l VisitorList) Visit(fn VisitorFunc) error {
197 for i := range l {
198 if err := l[i].Visit(fn); err != nil {
199 return err
200 }
201 }
202 return nil
203 }
204
205 type ConcurrentVisitorList struct {
206 visitors []Visitor
207 concurrency int
208 }
209
210 func (l ConcurrentVisitorList) Visit(fn VisitorFunc) error {
211 g := errgroup.Group{}
212
213
214
215
216 concurrency := 1
217 if l.concurrency > concurrency {
218 concurrency = l.concurrency
219 }
220 g.SetLimit(concurrency)
221
222 for i := range l.visitors {
223 i := i
224 g.Go(func() error {
225 return l.visitors[i].Visit(fn)
226 })
227 }
228
229 return g.Wait()
230 }
231
232
233
234 type EagerVisitorList []Visitor
235
236
237
238 func (l EagerVisitorList) Visit(fn VisitorFunc) error {
239 var errs []error
240 for i := range l {
241 err := l[i].Visit(func(info *Info, err error) error {
242 if err != nil {
243 errs = append(errs, err)
244 return nil
245 }
246 if err := fn(info, nil); err != nil {
247 errs = append(errs, err)
248 }
249 return nil
250 })
251 if err != nil {
252 errs = append(errs, err)
253 }
254 }
255 return utilerrors.NewAggregate(errs)
256 }
257
258 func ValidateSchema(data []byte, schema ContentValidator) error {
259 if schema == nil {
260 return nil
261 }
262 if err := schema.ValidateBytes(data); err != nil {
263 return fmt.Errorf("error validating data: %v; %s", err, stopValidateMessage)
264 }
265 return nil
266 }
267
268
269
270 type URLVisitor struct {
271 URL *url.URL
272 *StreamVisitor
273 HttpAttemptCount int
274 }
275
276 func (v *URLVisitor) Visit(fn VisitorFunc) error {
277 body, err := readHttpWithRetries(httpgetImpl, time.Second, v.URL.String(), v.HttpAttemptCount)
278 if err != nil {
279 return err
280 }
281 defer body.Close()
282 v.StreamVisitor.Reader = body
283 return v.StreamVisitor.Visit(fn)
284 }
285
286
287 func readHttpWithRetries(get httpget, duration time.Duration, u string, attempts int) (io.ReadCloser, error) {
288 var err error
289 if attempts <= 0 {
290 return nil, fmt.Errorf("http attempts must be greater than 0, was %d", attempts)
291 }
292 for i := 0; i < attempts; i++ {
293 var (
294 statusCode int
295 status string
296 body io.ReadCloser
297 )
298 if i > 0 {
299 time.Sleep(duration)
300 }
301
302
303 statusCode, status, body, err = get(u)
304
305
306 if err != nil {
307 continue
308 }
309
310 if statusCode == http.StatusOK {
311 return body, nil
312 }
313 body.Close()
314
315 err = fmt.Errorf("unable to read URL %q, server reported %s, status code=%d", u, status, statusCode)
316
317 if statusCode >= 500 && statusCode < 600 {
318
319 continue
320 } else {
321
322 break
323 }
324 }
325 return nil, err
326 }
327
328
329 type httpget func(url string) (int, string, io.ReadCloser, error)
330
331
332 func httpgetImpl(url string) (int, string, io.ReadCloser, error) {
333 resp, err := http.Get(url)
334 if err != nil {
335 return 0, "", nil, err
336 }
337 return resp.StatusCode, resp.Status, resp.Body, nil
338 }
339
340
341
342 type DecoratedVisitor struct {
343 visitor Visitor
344 decorators []VisitorFunc
345 }
346
347
348
349
350 func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
351 if len(fn) == 0 {
352 return v
353 }
354 return DecoratedVisitor{v, fn}
355 }
356
357
358 func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
359 return v.visitor.Visit(func(info *Info, err error) error {
360 if err != nil {
361 return err
362 }
363 for i := range v.decorators {
364 if err := v.decorators[i](info, nil); err != nil {
365 return err
366 }
367 }
368 return fn(info, nil)
369 })
370 }
371
372
373
374
375 type ContinueOnErrorVisitor struct {
376 Visitor
377 }
378
379
380
381
382
383
384
385 func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error {
386 var errs []error
387 err := v.Visitor.Visit(func(info *Info, err error) error {
388 if err != nil {
389 errs = append(errs, err)
390 return nil
391 }
392 if err := fn(info, nil); err != nil {
393 errs = append(errs, err)
394 }
395 return nil
396 })
397 if err != nil {
398 errs = append(errs, err)
399 }
400 if len(errs) == 1 {
401 return errs[0]
402 }
403 return utilerrors.NewAggregate(errs)
404 }
405
406
407
408
409
410
411 type FlattenListVisitor struct {
412 visitor Visitor
413 typer runtime.ObjectTyper
414 mapper *mapper
415 }
416
417
418
419 func NewFlattenListVisitor(v Visitor, typer runtime.ObjectTyper, mapper *mapper) Visitor {
420 return FlattenListVisitor{v, typer, mapper}
421 }
422
423 func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
424 return v.visitor.Visit(func(info *Info, err error) error {
425 if err != nil {
426 return err
427 }
428 if info.Object == nil {
429 return fn(info, nil)
430 }
431 if !meta.IsListType(info.Object) {
432 return fn(info, nil)
433 }
434
435 items := []runtime.Object{}
436 itemsToProcess := []runtime.Object{info.Object}
437
438 for i := 0; i < len(itemsToProcess); i++ {
439 currObj := itemsToProcess[i]
440 if !meta.IsListType(currObj) {
441 items = append(items, currObj)
442 continue
443 }
444
445 currItems, err := meta.ExtractList(currObj)
446 if err != nil {
447 return err
448 }
449 if errs := runtime.DecodeList(currItems, v.mapper.decoder); len(errs) > 0 {
450 return utilerrors.NewAggregate(errs)
451 }
452 itemsToProcess = append(itemsToProcess, currItems...)
453 }
454
455
456 var preferredGVKs []schema.GroupVersionKind
457 if info.Mapping != nil && !info.Mapping.GroupVersionKind.Empty() {
458 preferredGVKs = append(preferredGVKs, info.Mapping.GroupVersionKind)
459 }
460 var errs []error
461 for i := range items {
462 item, err := v.mapper.infoForObject(items[i], v.typer, preferredGVKs)
463 if err != nil {
464 errs = append(errs, err)
465 continue
466 }
467 if len(info.ResourceVersion) != 0 {
468 item.ResourceVersion = info.ResourceVersion
469 }
470
471 if len(info.Source) != 0 {
472 item.Source = info.Source
473 }
474 if err := fn(item, nil); err != nil {
475 errs = append(errs, err)
476 }
477 }
478 return utilerrors.NewAggregate(errs)
479 })
480 }
481
482 func ignoreFile(path string, extensions []string) bool {
483 if len(extensions) == 0 {
484 return false
485 }
486 ext := filepath.Ext(path)
487 for _, s := range extensions {
488 if s == ext {
489 return false
490 }
491 }
492 return true
493 }
494
495
496 func FileVisitorForSTDIN(mapper *mapper, schema ContentValidator) Visitor {
497 return &FileVisitor{
498 Path: constSTDINstr,
499 StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema),
500 }
501 }
502
503
504
505
506 func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema ContentValidator) ([]Visitor, error) {
507 var visitors []Visitor
508 err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
509 if err != nil {
510 return err
511 }
512
513 if fi.IsDir() {
514 if path != paths && !recursive {
515 return filepath.SkipDir
516 }
517 return nil
518 }
519
520 if path != paths && ignoreFile(path, extensions) {
521 return nil
522 }
523
524 visitor := &FileVisitor{
525 Path: path,
526 StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
527 }
528
529 visitors = append(visitors, visitor)
530 return nil
531 })
532
533 if err != nil {
534 return nil, err
535 }
536 return visitors, nil
537 }
538
539
540 type FileVisitor struct {
541 Path string
542 *StreamVisitor
543 }
544
545
546 func (v *FileVisitor) Visit(fn VisitorFunc) error {
547 var f *os.File
548 if v.Path == constSTDINstr {
549 f = os.Stdin
550 } else {
551 var err error
552 f, err = os.Open(v.Path)
553 if err != nil {
554 return err
555 }
556 defer f.Close()
557 }
558
559
560
561 utf16bom := unicode.BOMOverride(unicode.UTF8.NewDecoder())
562 v.StreamVisitor.Reader = transform.NewReader(f, utf16bom)
563
564 return v.StreamVisitor.Visit(fn)
565 }
566
567
568
569
570
571 type StreamVisitor struct {
572 io.Reader
573 *mapper
574
575 Source string
576 Schema ContentValidator
577 }
578
579
580 func NewStreamVisitor(r io.Reader, mapper *mapper, source string, schema ContentValidator) *StreamVisitor {
581 return &StreamVisitor{
582 Reader: r,
583 mapper: mapper,
584 Source: source,
585 Schema: schema,
586 }
587 }
588
589
590 func (v *StreamVisitor) Visit(fn VisitorFunc) error {
591 d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
592 for {
593 ext := runtime.RawExtension{}
594 if err := d.Decode(&ext); err != nil {
595 if err == io.EOF {
596 return nil
597 }
598 return fmt.Errorf("error parsing %s: %v", v.Source, err)
599 }
600
601 ext.Raw = bytes.TrimSpace(ext.Raw)
602 if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
603 continue
604 }
605 if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
606 return fmt.Errorf("error validating %q: %v", v.Source, err)
607 }
608 info, err := v.infoForData(ext.Raw, v.Source)
609 if err != nil {
610 if fnErr := fn(info, err); fnErr != nil {
611 return fnErr
612 }
613 continue
614 }
615 if err := fn(info, nil); err != nil {
616 return err
617 }
618 }
619 }
620
621 func UpdateObjectNamespace(info *Info, err error) error {
622 if err != nil {
623 return err
624 }
625 if info.Object != nil {
626 return metadataAccessor.SetNamespace(info.Object, info.Namespace)
627 }
628 return nil
629 }
630
631
632 func FilterNamespace(info *Info, err error) error {
633 if err != nil {
634 return err
635 }
636 if !info.Namespaced() {
637 info.Namespace = ""
638 UpdateObjectNamespace(info, nil)
639 }
640 return nil
641 }
642
643
644
645 func SetNamespace(namespace string) VisitorFunc {
646 return func(info *Info, err error) error {
647 if err != nil {
648 return err
649 }
650 if !info.Namespaced() {
651 return nil
652 }
653 if len(info.Namespace) == 0 {
654 info.Namespace = namespace
655 UpdateObjectNamespace(info, nil)
656 }
657 return nil
658 }
659 }
660
661
662
663
664
665 func RequireNamespace(namespace string) VisitorFunc {
666 return func(info *Info, err error) error {
667 if err != nil {
668 return err
669 }
670 if !info.Namespaced() {
671 return nil
672 }
673 if len(info.Namespace) == 0 {
674 info.Namespace = namespace
675 UpdateObjectNamespace(info, nil)
676 return nil
677 }
678 if info.Namespace != namespace {
679 return fmt.Errorf("the namespace from the provided object %q does not match the namespace %q. You must pass '--namespace=%s' to perform this operation.", info.Namespace, namespace, info.Namespace)
680 }
681 return nil
682 }
683 }
684
685
686
687 func RetrieveLatest(info *Info, err error) error {
688 if err != nil {
689 return err
690 }
691 if meta.IsListType(info.Object) {
692 return fmt.Errorf("watch is only supported on individual resources and resource collections, but a list of resources is found")
693 }
694 if len(info.Name) == 0 {
695 return nil
696 }
697 if info.Namespaced() && len(info.Namespace) == 0 {
698 return fmt.Errorf("no namespace set on resource %s %q", info.Mapping.Resource, info.Name)
699 }
700 return info.Get()
701 }
702
703
704 func RetrieveLazy(info *Info, err error) error {
705 if err != nil {
706 return err
707 }
708 if info.Object == nil {
709 return info.Get()
710 }
711 return nil
712 }
713
714 type FilterFunc func(info *Info, err error) (bool, error)
715
716 type FilteredVisitor struct {
717 visitor Visitor
718 filters []FilterFunc
719 }
720
721 func NewFilteredVisitor(v Visitor, fn ...FilterFunc) Visitor {
722 if len(fn) == 0 {
723 return v
724 }
725 return FilteredVisitor{v, fn}
726 }
727
728 func (v FilteredVisitor) Visit(fn VisitorFunc) error {
729 return v.visitor.Visit(func(info *Info, err error) error {
730 if err != nil {
731 return err
732 }
733 for _, filter := range v.filters {
734 ok, err := filter(info, nil)
735 if err != nil {
736 return err
737 }
738 if !ok {
739 return nil
740 }
741 }
742 return fn(info, nil)
743 })
744 }
745
746 func FilterByLabelSelector(s labels.Selector) FilterFunc {
747 return func(info *Info, err error) (bool, error) {
748 if err != nil {
749 return false, err
750 }
751 a, err := meta.Accessor(info.Object)
752 if err != nil {
753 return false, err
754 }
755 if !s.Matches(labels.Set(a.GetLabels())) {
756 return false, nil
757 }
758 return true, nil
759 }
760 }
761
762 type InfoListVisitor []*Info
763
764 func (infos InfoListVisitor) Visit(fn VisitorFunc) error {
765 var err error
766 for _, i := range infos {
767 err = fn(i, err)
768 }
769 return err
770 }
771
View as plain text