1 package kates
2
3 import (
4 "bufio"
5 "context"
6 "fmt"
7 "io"
8 "os"
9 "reflect"
10 "strconv"
11 "strings"
12 "sync"
13 "time"
14
15 "github.com/pkg/errors"
16 "github.com/spf13/pflag"
17
18
19 apierrors "k8s.io/apimachinery/pkg/api/errors"
20 "k8s.io/apimachinery/pkg/api/meta"
21 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22 "k8s.io/apimachinery/pkg/runtime"
23 "k8s.io/apimachinery/pkg/runtime/schema"
24 "k8s.io/apimachinery/pkg/watch"
25 "k8s.io/client-go/discovery"
26 "k8s.io/client-go/discovery/cached/disk"
27 "k8s.io/client-go/discovery/cached/memory"
28 "k8s.io/client-go/dynamic"
29 "k8s.io/client-go/rest"
30 "k8s.io/client-go/restmapper"
31 "k8s.io/client-go/tools/cache"
32 "k8s.io/kubectl/pkg/polymorphichelpers"
33
34
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
37
38 _ "k8s.io/client-go/plugin/pkg/client/auth"
39
40 "github.com/datawire/dlib/dlog"
41 kates_internal "github.com/emissary-ingress/emissary/v3/pkg/kates_internal"
42 )
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Client bundles a dynamic Kubernetes client with the REST mapper and cached
// discovery client built from the same ConfigFlags.
type Client struct {
	// config is the flag set the client was built from; it is reused to
	// rebuild the mapper/discovery pair in InvalidateCache.
	config *ConfigFlags
	cli    dynamic.Interface
	mapper meta.RESTMapper
	disco  discovery.CachedDiscoveryInterface
	// mutex is held by the CRUD methods and patchWatch while they touch
	// canonical.
	mutex sync.Mutex
	// canonical maps an object's UID (see unKey) to the most recent version
	// this client has read or written. A nil value is stored by Delete.
	canonical map[string]*Unstructured
	// maxAccumulatorInterval defaults to one second and is changed through
	// MaxAccumulatorInterval.
	maxAccumulatorInterval time.Duration

	// watchAdded, watchUpdated and watchDeleted are called from the informer
	// event handlers in watchRaw with the (old, new) objects before the
	// update is forwarded. They default to no-ops.
	// NOTE(review): presumably these exist as test hooks — confirm.
	watchAdded   func(*Unstructured, *Unstructured)
	watchUpdated func(*Unstructured, *Unstructured)
	watchDeleted func(*Unstructured, *Unstructured)
}
96
97
98
// ClientConfig holds the options accepted by NewClient. Each non-empty field
// overrides the corresponding ConfigFlags value (see toConfigFlags); empty
// fields leave the ConfigFlags default in place.
type ClientConfig struct {
	Kubeconfig string
	Context    string
	Namespace  string
}
104
105
106 func NewClient(options ClientConfig) (*Client, error) {
107 return NewClientFromConfigFlags(options.toConfigFlags())
108 }
109
110
111
112 func NewClientFactory(flags *pflag.FlagSet) func() (*Client, error) {
113 if flags.Parsed() {
114
115 panic("kates.NewClientFactory(flagset) must be called before flagset.Parse()")
116 }
117
118 config := NewConfigFlags(false)
119
120
121
122
123
124
125
126 config.AddFlags(flags)
127
128 return func() (*Client, error) {
129 if !flags.Parsed() {
130 return nil, fmt.Errorf("kates client factory must be called after flagset.Parse()")
131 }
132 return NewClientFromConfigFlags(config)
133 }
134 }
135
136 func NewClientFromConfigFlags(config *ConfigFlags) (*Client, error) {
137 restconfig, err := config.ToRESTConfig()
138 if err != nil {
139 return nil, err
140 }
141
142 cli, err := dynamic.NewForConfig(restconfig)
143 if err != nil {
144 return nil, err
145 }
146
147 mapper, disco, err := NewRESTMapper(config)
148 if err != nil {
149 return nil, err
150 }
151
152 return &Client{
153 config: config,
154 cli: cli,
155 mapper: mapper,
156 disco: disco,
157 canonical: make(map[string]*Unstructured),
158 maxAccumulatorInterval: 1 * time.Second,
159 watchAdded: func(oldObj, newObj *Unstructured) {},
160 watchUpdated: func(oldObj, newObj *Unstructured) {},
161 watchDeleted: func(oldObj, newObj *Unstructured) {},
162 }, nil
163 }
164
165 func NewRESTMapper(config *ConfigFlags) (meta.RESTMapper, discovery.CachedDiscoveryInterface, error) {
166
167
168
169
170
171
172
173
174
175
176 restconfig, err := config.ToRESTConfig()
177 if err != nil {
178 return nil, nil, err
179 }
180 restconfig.QPS = 1000000
181 restconfig.Burst = 1000000
182
183 var cachedDiscoveryClient discovery.CachedDiscoveryInterface
184 if config.CacheDir != nil {
185 cachedDiscoveryClient, err = disk.NewCachedDiscoveryClientForConfig(restconfig, *config.CacheDir, "",
186 time.Duration(10*time.Minute))
187 if err != nil {
188 return nil, nil, err
189 }
190 } else {
191 discoveryClient, err := discovery.NewDiscoveryClientForConfig(restconfig)
192 if err != nil {
193 return nil, nil, err
194 }
195 cachedDiscoveryClient = memory.NewMemCacheClient(discoveryClient)
196 }
197
198 mapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoveryClient)
199 expander := restmapper.NewShortcutExpander(mapper, cachedDiscoveryClient)
200
201 return expander, cachedDiscoveryClient, nil
202 }
203
204
205
206
207
208
// InCluster reports whether the process appears to be running inside a
// Kubernetes pod: both KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT
// must be set and the service account token path must exist and not be a
// directory.
func InCluster() bool {
	const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"

	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
		return false
	}
	if os.Getenv("KUBERNETES_SERVICE_PORT") == "" {
		return false
	}

	info, err := os.Stat(tokenPath)
	if err != nil {
		return false
	}
	return !info.IsDir()
}
215
216
217
218 func (c *Client) MaxAccumulatorInterval(interval time.Duration) error {
219 c.mutex.Lock()
220 defer c.mutex.Unlock()
221 if interval <= 0 {
222 return fmt.Errorf("interval must be positive")
223 }
224 c.maxAccumulatorInterval = interval
225 return nil
226 }
227
228
229 func (c *Client) DynamicInterface() dynamic.Interface {
230 return c.cli
231 }
232
233 func (c *Client) WaitFor(ctx context.Context, kindOrResource string) error {
234 for {
235 _, err := c.mappingFor(kindOrResource)
236 if err != nil {
237 _, ok := err.(*unknownResource)
238 if ok {
239 select {
240 case <-time.After(1 * time.Second):
241 if err := c.InvalidateCache(); err != nil {
242 return err
243 }
244 continue
245 case <-ctx.Done():
246 return nil
247 }
248 }
249 }
250 return nil
251 }
252 }
253
254 func (c *Client) InvalidateCache() error {
255
256
257 mapper, disco, err := NewRESTMapper(c.config)
258 if err != nil {
259 return err
260 }
261 c.mapper = mapper
262 c.disco = disco
263 return nil
264 }
265
266
267
268 func (c *Client) ServerVersion() (*VersionInfo, error) {
269 return c.disco.ServerVersion()
270 }
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 func processAPIResourceLists(listsByGV []*metav1.APIResourceList) []APIResource {
288
289 count := 0
290 for _, list := range listsByGV {
291 if list != nil {
292 count += len(list.APIResources)
293 }
294 }
295 if count == 0 {
296 return nil
297 }
298
299
300 ret := make([]APIResource, 0, count)
301 for _, list := range listsByGV {
302 if list != nil {
303 gv, err := schema.ParseGroupVersion(list.GroupVersion)
304 if err != nil {
305 continue
306 }
307 for _, typeinfo := range list.APIResources {
308
309
310
311 if typeinfo.Group == "" {
312 typeinfo.Group = gv.Group
313 }
314 if typeinfo.Version == "" {
315 typeinfo.Version = gv.Version
316 }
317 ret = append(ret, typeinfo)
318 }
319 }
320 }
321
322 return ret
323 }
324
325
326
327
328
329 func (c *Client) ServerPreferredResources() ([]APIResource, error) {
330
331
332 listsByGV, err := c.disco.ServerPreferredResources()
333 return processAPIResourceLists(listsByGV), err
334 }
335
336
337
338
339
340 func (c *Client) ServerResources() ([]APIResource, error) {
341
342
343 _, listsByGV, err := c.disco.ServerGroupsAndResources()
344 return processAPIResourceLists(listsByGV), err
345 }
346
347
348
349
350
351
352
353
354
// Query describes a set of Kubernetes resources to list or watch.
type Query struct {
	// Name identifies the query; watchRaw stamps it on every rawUpdate the
	// query produces.
	Name string
	// Kind is the kind or resource name; it is resolved through mappingFor.
	Kind string
	// Namespace restricts the query to one namespace for namespaced
	// resources; NamespaceAll selects every namespace (see cliFor).
	Namespace string
	// FieldSelector is copied verbatim into the ListOptions sent to the
	// server.
	FieldSelector string
	// LabelSelector is copied verbatim into the ListOptions sent to the
	// server and is also parsed by newField.
	LabelSelector string
}
373
374 func (c *Client) Watch(ctx context.Context, queries ...Query) (*Accumulator, error) {
375 return newAccumulator(ctx, c, queries...)
376 }
377
378
379
// watchRaw starts a shared informer for query against cli and forwards every
// add/update/delete event to target as a rawUpdate. The informer runs in its
// own goroutine until ctx is done; watchRaw itself returns immediately.
func (c *Client) watchRaw(ctx context.Context, query Query, target chan rawUpdate, cli dynamic.ResourceInterface) {
	var informer cache.SharedInformer

	// The callback runs once, the first time the list-watcher's Watch method
	// is called. If the watcher already counts as synced at that point, a
	// bare "synced" update carrying no object is sent.
	lw := newListWatcher(ctx, cli, query, func(lw *lw) {
		if lw.hasSynced() {
			target <- rawUpdate{query.Name, true, nil, nil, time.Now()}
		}
	})
	informer = cache.NewSharedInformer(lw, &Unstructured{}, 5*time.Minute)

	informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				// Invoke the hook first, then count the event so that
				// hasSynced reflects it in the update sent below.
				c.watchAdded(nil, obj.(*Unstructured))
				lw.countAddEvent()
				target <- rawUpdate{query.Name, lw.hasSynced(), nil, obj.(*Unstructured), time.Now()}
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				old := oldObj.(*Unstructured)
				new := newObj.(*Unstructured)

				c.watchUpdated(old, new)
				target <- rawUpdate{query.Name, lw.hasSynced(), old, new, time.Now()}
			},
			DeleteFunc: func(obj interface{}) {
				// The informer may deliver either the object itself or a
				// DeletedFinalStateUnknown tombstone wrapping it.
				// NOTE(review): for any other type old stays nil and is
				// still passed to the hook and to unKey below — confirm this
				// cannot happen.
				var old *Unstructured
				switch o := obj.(type) {
				case cache.DeletedFinalStateUnknown:
					old = o.Obj.(*Unstructured)
				case *Unstructured:
					old = o
				}

				c.watchDeleted(old, nil)

				key := unKey(old)

				// Drop any canonical entry for the deleted object.
				c.mutex.Lock()
				delete(c.canonical, key)
				c.mutex.Unlock()
				target <- rawUpdate{query.Name, lw.hasSynced(), old, nil, time.Now()}
			},
		},
	)

	go informer.Run(ctx.Done())
}
468
// rawUpdate is a single event produced by watchRaw.
type rawUpdate struct {
	// name is the Name of the Query that produced the event.
	name string
	// synced is the list-watcher's hasSynced value at the time of the event.
	synced bool
	// old and new are the object before and after the event: old is nil for
	// adds, new is nil for deletes, and both are nil for a bare synced
	// notification.
	old *unstructured.Unstructured
	new *unstructured.Unstructured
	// ts is the time the event was forwarded.
	ts time.Time
}
476
// lw provides the List and Watch methods handed to the shared informer in
// watchRaw and tracks whether the initial listing has been fully delivered.
type lw struct {
	// ctx is passed to every List and Watch call on client.
	ctx    context.Context
	client dynamic.ResourceInterface
	query  Query
	// synced is called exactly once (guarded by once) the first time Watch
	// is called.
	synced func(*lw)
	once   sync.Once

	// mutex guards the fields below; List, countAddEvent and hasSynced
	// access them through withMutex.
	mutex sync.Mutex
	// initialListDone and initialListCount are recorded by the first List
	// call that succeeds or is forbidden.
	initialListDone  bool
	initialListCount int
	// addEventCount is the number of add events counted via countAddEvent.
	addEventCount int
	// listForbidden records whether the most recent List call was rejected
	// as forbidden.
	listForbidden bool
}
492
493 func newListWatcher(ctx context.Context, client dynamic.ResourceInterface, query Query, synced func(*lw)) *lw {
494 return &lw{ctx: ctx, client: client, query: query, synced: synced}
495 }
496
497 func (lw *lw) withMutex(f func()) {
498 lw.mutex.Lock()
499 defer lw.mutex.Unlock()
500 f()
501 }
502
503 func (lw *lw) countAddEvent() {
504 lw.withMutex(func() {
505 lw.addEventCount++
506 })
507 }
508
509
510
511
512
513
514
515
516
517
518
519 func (lw *lw) hasSynced() (result bool) {
520 lw.withMutex(func() {
521 result = lw.initialListDone && lw.addEventCount >= lw.initialListCount
522 })
523 return
524 }
525
526
527
528 func (lw *lw) List(opts ListOptions) (runtime.Object, error) {
529
530
531
532 synced := false
533 forbidden := false
534
535 opts.FieldSelector = lw.query.FieldSelector
536 opts.LabelSelector = lw.query.LabelSelector
537 result, err := lw.client.List(lw.ctx, opts)
538
539 if err == nil {
540
541 synced = true
542
543 forbidden = false
544 } else if apierrors.IsForbidden(err) {
545
546
547
548 synced = true
549 forbidden = true
550
551
552
553 result = &unstructured.UnstructuredList{}
554 err = nil
555 } else {
556
557
558 dlog.Infof(lw.ctx, "couldn't list %s (will retry): %s", lw.query.Kind, err)
559 }
560
561 lw.withMutex(func() {
562 if synced {
563 if !lw.initialListDone {
564 lw.initialListDone = true
565 lw.initialListCount = len(result.Items)
566 }
567 }
568
569 lw.listForbidden = forbidden
570 })
571
572 return result, err
573 }
574
575 func (lw *lw) Watch(opts ListOptions) (watch.Interface, error) {
576 lw.once.Do(func() { lw.synced(lw) })
577 opts.FieldSelector = lw.query.FieldSelector
578 opts.LabelSelector = lw.query.LabelSelector
579
580 iface, err := lw.client.Watch(lw.ctx, opts)
581
582 if err != nil {
583
584
585
586 if lw.listForbidden {
587 err = errors.New(fmt.Sprintf("can't watch %s: forbidden", lw.query.Kind))
588 } else {
589
590
591 err = errors.Wrap(err, fmt.Sprintf("can't watch %s", lw.query.Kind))
592 }
593 }
594
595 return iface, err
596 }
597
598
599
600 func (c *Client) cliFor(mapping *meta.RESTMapping, namespace string) dynamic.ResourceInterface {
601 cli := c.cli.Resource(mapping.Resource)
602 if mapping.Scope.Name() == meta.RESTScopeNameNamespace && namespace != NamespaceAll {
603 return cli.Namespace(namespace)
604 } else {
605 return cli
606 }
607 }
608
609 func (c *Client) cliForResource(resource *Unstructured) (dynamic.ResourceInterface, error) {
610 mapping, err := c.mappingFor(resource.GroupVersionKind().GroupKind().String())
611 if err != nil {
612 return nil, err
613 }
614
615
616
617 resource.SetGroupVersionKind(mapping.GroupVersionKind)
618
619 ns := resource.GetNamespace()
620 if ns == "" {
621 ns = "default"
622 }
623 return c.cliFor(mapping, ns), nil
624 }
625
626 func (c *Client) newField(q Query) (*field, error) {
627 mapping, err := c.mappingFor(q.Kind)
628 if err != nil {
629 return nil, err
630 }
631 sel, err := ParseSelector(q.LabelSelector)
632 if err != nil {
633 return nil, err
634 }
635
636 return &field{
637 query: q,
638 mapping: mapping,
639 selector: sel,
640 values: make(map[string]*Unstructured),
641 deltas: make(map[string]*Delta),
642 }, nil
643 }
644
645
646
647
648
649
650
// mappingFor resolves resourceOrKind to a REST mapping using c.mapper. The
// argument is first interpreted as a resource and, if that yields no kind, as
// a kind. A no-match error from the final lookup is returned as
// *unknownResource; any other error from it is returned unchanged.
func (c *Client) mappingFor(resourceOrKind string) (*meta.RESTMapping, error) {
	// Attempt 1: interpret the argument as a resource.
	fullySpecifiedGVR, groupResource := schema.ParseResourceArg(resourceOrKind)
	gvk := schema.GroupVersionKind{}

	// Errors from KindFor are discarded: an empty gvk simply makes the code
	// fall through to the next attempt.
	if fullySpecifiedGVR != nil {
		gvk, _ = c.mapper.KindFor(*fullySpecifiedGVR)
	}
	if gvk.Empty() {
		gvk, _ = c.mapper.KindFor(groupResource.WithVersion(""))
	}
	if !gvk.Empty() {
		return c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	}

	// Attempt 2: interpret the argument as a kind.
	fullySpecifiedGVK, groupKind := schema.ParseKindArg(resourceOrKind)
	if fullySpecifiedGVK == nil {
		gvk := groupKind.WithVersion("")
		fullySpecifiedGVK = &gvk
	}

	if !fullySpecifiedGVK.Empty() {
		if mapping, err := c.mapper.RESTMapping(fullySpecifiedGVK.GroupKind(), fullySpecifiedGVK.Version); err == nil {
			return mapping, nil
		}
	}

	// Last attempt: group/kind only. The outer gvk is empty whenever this
	// line is reached, so gvk.Version is "".
	mapping, err := c.mapper.RESTMapping(groupKind, gvk.Version)
	if err != nil {
		// Turn "no match" into the package's own error type so callers such
		// as WaitFor can recognise it with a type assertion.
		if meta.IsNoMatchError(err) {
			return nil, &unknownResource{resourceOrKind}
		}
		return nil, err
	}

	return mapping, nil
}
693
// unknownResource is the error returned by mappingFor when the REST mapper
// has no match for the requested kind or resource.
type unknownResource struct {
	// arg is the kind or resource string that could not be resolved.
	arg string
}
697
698 func (e *unknownResource) Error() string {
699 return fmt.Sprintf("the server doesn't have a resource type %q", e.arg)
700 }
701
702
703
704 func (c *Client) List(ctx context.Context, query Query, target interface{}) error {
705 mapping, err := c.mappingFor(query.Kind)
706 if err != nil {
707 return err
708 }
709
710 items := make([]*Unstructured, 0)
711 if err := func() error {
712 c.mutex.Lock()
713 defer c.mutex.Unlock()
714 cli := c.cliFor(mapping, query.Namespace)
715 res, err := cli.List(ctx, ListOptions{
716 FieldSelector: query.FieldSelector,
717 LabelSelector: query.LabelSelector,
718 })
719 if err != nil {
720 return err
721 }
722
723 for _, un := range res.Items {
724 copy := un.DeepCopy()
725 key := unKey(copy)
726
727
728 c.canonical[key] = copy
729 items = append(items, copy)
730 }
731 return nil
732 }(); err != nil {
733 return err
734 }
735
736 return convert(items, target)
737 }
738
739
740
741 func (c *Client) Get(ctx context.Context, resource interface{}, target interface{}) error {
742 var un Unstructured
743 err := convert(resource, &un)
744 if err != nil {
745 return err
746 }
747
748 var res *Unstructured
749 if err := func() error {
750 c.mutex.Lock()
751 defer c.mutex.Unlock()
752 cli, err := c.cliForResource(&un)
753 if err != nil {
754 return err
755 }
756 res, err = cli.Get(ctx, un.GetName(), GetOptions{})
757 if err != nil {
758 return err
759 }
760 key := unKey(res)
761
762
763 c.canonical[key] = res
764 return nil
765 }(); err != nil {
766 return err
767 }
768
769 return convert(res, target)
770 }
771
772
773
774 func (c *Client) Create(ctx context.Context, resource interface{}, target interface{}) error {
775 var un Unstructured
776 err := convert(resource, &un)
777 if err != nil {
778 return err
779 }
780
781 var res *Unstructured
782 if err := func() error {
783 c.mutex.Lock()
784 defer c.mutex.Unlock()
785 cli, err := c.cliForResource(&un)
786 if err != nil {
787 return err
788 }
789 res, err = cli.Create(ctx, &un, CreateOptions{})
790 if err != nil {
791 return err
792 }
793 key := unKey(res)
794 c.canonical[key] = res
795 return nil
796 }(); err != nil {
797 return err
798 }
799
800 return convert(res, target)
801 }
802
803
804
805 func (c *Client) Update(ctx context.Context, resource interface{}, target interface{}) error {
806 var un Unstructured
807 err := convert(resource, &un)
808 if err != nil {
809 return err
810 }
811
812 prev := un.GetResourceVersion()
813
814 var res *Unstructured
815 if err := func() error {
816 c.mutex.Lock()
817 defer c.mutex.Unlock()
818 cli, err := c.cliForResource(&un)
819 if err != nil {
820 return err
821 }
822 res, err = cli.Update(ctx, &un, UpdateOptions{})
823 if err != nil {
824 return err
825 }
826 if res.GetResourceVersion() != prev {
827 key := unKey(res)
828 c.canonical[key] = res
829 }
830 return nil
831 }(); err != nil {
832 return err
833 }
834
835 return convert(res, target)
836 }
837
838
839
840 func (c *Client) Patch(ctx context.Context, resource interface{}, pt PatchType, data []byte, target interface{}) error {
841 var un Unstructured
842 err := convert(resource, &un)
843 if err != nil {
844 return err
845 }
846
847 prev := un.GetResourceVersion()
848
849 var res *Unstructured
850 if err := func() error {
851 c.mutex.Lock()
852 defer c.mutex.Unlock()
853 cli, err := c.cliForResource(&un)
854 if err != nil {
855 return err
856 }
857 res, err = cli.Patch(ctx, un.GetName(), pt, data, PatchOptions{})
858 if err != nil {
859 return err
860 }
861 if res.GetResourceVersion() != prev {
862 key := unKey(res)
863 c.canonical[key] = res
864 }
865 return nil
866 }(); err != nil {
867 return err
868 }
869
870 return convert(res, target)
871 }
872
873
874
875 func (c *Client) Upsert(ctx context.Context, resource interface{}, source interface{}, target interface{}) error {
876 if resource == nil || reflect.ValueOf(resource).IsNil() {
877 resource = source
878 }
879
880 var un Unstructured
881 err := convert(resource, &un)
882 if err != nil {
883 return err
884 }
885
886 var unsrc Unstructured
887 err = convert(source, &unsrc)
888 if err != nil {
889 return err
890 }
891 MergeUpdate(&un, &unsrc)
892
893 prev := un.GetResourceVersion()
894
895 var res *Unstructured
896 if err := func() error {
897 c.mutex.Lock()
898 defer c.mutex.Unlock()
899 cli, err := c.cliForResource(&un)
900 if err != nil {
901 return err
902 }
903 create := false
904 rsrc := &un
905 if prev == "" {
906 stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
907 if err != nil {
908 if IsNotFound(err) {
909 create = true
910 rsrc = &un
911 } else {
912 return err
913 }
914 } else {
915 rsrc = stored
916 MergeUpdate(rsrc, &unsrc)
917 }
918 }
919 if create {
920 res, err = cli.Create(ctx, rsrc, CreateOptions{})
921 } else {
922
923 update:
924 res, err = cli.Update(ctx, rsrc, UpdateOptions{})
925 if err != nil && IsConflict(err) {
926 stored, err := cli.Get(ctx, un.GetName(), GetOptions{})
927 if err != nil {
928 return err
929 }
930 rsrc = stored
931 MergeUpdate(rsrc, &unsrc)
932 goto update
933 }
934 }
935 if err != nil {
936 return err
937 }
938 if res.GetResourceVersion() != prev {
939 key := unKey(res)
940 c.canonical[key] = res
941 }
942 return nil
943 }(); err != nil {
944 return err
945 }
946
947 return convert(res, target)
948 }
949
950
951
952 func (c *Client) UpdateStatus(ctx context.Context, resource interface{}, target interface{}) error {
953 var un Unstructured
954 err := convert(resource, &un)
955 if err != nil {
956 return err
957 }
958
959 prev := un.GetResourceVersion()
960
961 var res *Unstructured
962 if err := func() error {
963 c.mutex.Lock()
964 defer c.mutex.Unlock()
965 cli, err := c.cliForResource(&un)
966 if err != nil {
967 return err
968 }
969 res, err = cli.UpdateStatus(ctx, &un, UpdateOptions{})
970 if err != nil {
971 return err
972 }
973 if res.GetResourceVersion() != prev {
974 key := unKey(res)
975 c.canonical[key] = res
976 }
977 return nil
978 }(); err != nil {
979 return err
980 }
981
982 return convert(res, target)
983 }
984
985
986
987 func (c *Client) Delete(ctx context.Context, resource interface{}, target interface{}) error {
988 var un Unstructured
989 err := convert(resource, &un)
990 if err != nil {
991 return err
992 }
993
994 if err := func() error {
995 c.mutex.Lock()
996 defer c.mutex.Unlock()
997 cli, err := c.cliForResource(&un)
998 if err != nil {
999 return err
1000 }
1001 err = cli.Delete(ctx, un.GetName(), DeleteOptions{})
1002 if err != nil {
1003 return err
1004 }
1005 key := unKey(&un)
1006 c.canonical[key] = nil
1007 return nil
1008 }(); err != nil {
1009 return err
1010 }
1011
1012 return convert(nil, target)
1013 }
1014
1015
1016
1017
1018
// patchWatch reconciles a watch field with the client's canonical map, which
// holds the results of this client's own reads and writes. For every
// canonical entry it adjusts field.values and records a matching delta in
// field.deltas. It holds c.mutex for the whole pass and returns an error only
// when a resource version cannot be parsed by gteq.
func (c *Client) patchWatch(ctx context.Context, field *field) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for key, can := range c.canonical {
		item, ok := field.values[key]
		if ok {
			// The watch already knows this object.
			if can == nil {
				// A nil canonical entry is what Delete stores: remove the
				// object from the field and record a delete delta.
				dlog.Println(ctx, "Patching delete", field.mapping.GroupVersionKind.Kind, key)
				delete(field.values, key)
				field.deltas[key] = newDelta(ObjectDelete, item)
			} else if newer, err := gteq(item.GetResourceVersion(), can.GetResourceVersion()); err != nil {
				return err
			} else if newer {
				// The watched version is at least as new as the canonical
				// one, so the canonical entry is dropped.
				dlog.Println(ctx, "Patching synced", field.mapping.GroupVersionKind.Kind, key)
				delete(c.canonical, key)
			} else {
				// The canonical version is newer: use it and record an
				// update delta.
				dlog.Println(ctx, "Patching update", field.mapping.GroupVersionKind.Kind, key)
				field.values[key] = can
				field.deltas[key] = newDelta(ObjectUpdate, can)
			}
		} else if can != nil && can.GroupVersionKind() == field.mapping.GroupVersionKind &&
			field.selector.Matches(LabelSet(can.GetLabels())) {
			// The watch has not seen this object yet, but its kind and
			// labels match the field: add it. Only the group/version/kind
			// and the label selector are checked here.
			dlog.Println(ctx, "Patching add", field.mapping.GroupVersionKind.Kind, key)
			field.values[key] = can
			field.deltas[key] = newDelta(ObjectAdd, can)
		}
	}
	return nil
}
1064
1065
1066
1067
1068
// LogEvent is a single event emitted by PodLogs: either one chunk of log
// output or the closing notification for one log stream.
type LogEvent struct {
	// PodID is the UID of the pod the event belongs to.
	PodID string `json:"podID"`
	// Timestamp is the leading RFC3339Nano timestamp split off the log line
	// by parseLogLine; it is empty when the line has none.
	Timestamp string `json:"timestamp"`
	// Output is the log text. It is empty on closing events.
	Output string `json:"output,omitempty"`
	// Closed is set on the last event PodLogs sends for a stream. It has no
	// json tag.
	Closed bool
	// Error is set when a stream could not be opened or ended with an error
	// other than io.EOF.
	Error error `json:"error,omitempty"`
}
1084
// parseLogLine splits a log line into its leading timestamp and the remaining
// output. The text before the first space is treated as the timestamp only if
// it parses as RFC3339Nano; otherwise timestamp is empty and output is the
// whole line.
func parseLogLine(line string) (timestamp string, output string) {
	idx := strings.IndexByte(line, ' ')
	if idx < 0 {
		return "", line
	}

	stamp, rest := line[:idx], line[idx+1:]
	if _, err := time.Parse(time.RFC3339Nano, stamp); err != nil {
		return "", line
	}
	return stamp, rest
}
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111 func (c *Client) PodLogs(ctx context.Context, pod *Pod, options *PodLogOptions, events chan<- LogEvent) error {
1112
1113 options.Timestamps = true
1114 timeout := 10 * time.Second
1115 allContainers := true
1116
1117 requests, err := polymorphichelpers.LogsForObjectFn(c.config, pod, options, timeout,
1118 allContainers)
1119 if err != nil {
1120 return err
1121 }
1122
1123 podID := string(pod.GetUID())
1124 for _, request := range requests {
1125 go func(request rest.ResponseWrapper) {
1126 readCloser, err := request.Stream(ctx)
1127 if err != nil {
1128 events <- LogEvent{PodID: podID, Error: err, Closed: true}
1129 return
1130 }
1131 defer readCloser.Close()
1132
1133 r := bufio.NewReader(readCloser)
1134 for {
1135 bytes, err := r.ReadBytes('\n')
1136 if len(bytes) > 0 {
1137 timestamp, output := parseLogLine(string(bytes))
1138 events <- LogEvent{
1139 PodID: podID,
1140 Timestamp: timestamp,
1141 Output: output,
1142 }
1143 }
1144 if err != nil {
1145 if err != io.EOF {
1146 events <- LogEvent{
1147 PodID: podID,
1148 Error: err,
1149 Closed: true,
1150 }
1151 } else {
1152 events <- LogEvent{PodID: podID, Closed: true}
1153 }
1154 return
1155 }
1156 }
1157 }(request)
1158 }
1159
1160 return nil
1161 }
1162
1163
1164
1165
1166
1167
1168
// gteq parses v1 and v2 as base-10 64-bit integers and reports whether
// v1 >= v2. It returns the parse error of the first value that is not a valid
// integer.
func gteq(v1, v2 string) (bool, error) {
	var parsed [2]int64
	for i, s := range [2]string{v1, v2} {
		n, err := strconv.ParseInt(s, 10, 64)
		if err != nil {
			return false, err
		}
		parsed[i] = n
	}
	return parsed[0] >= parsed[1], nil
}
1180
1181 func convert(in interface{}, out interface{}) error {
1182 return kates_internal.Convert(in, out)
1183 }
1184
1185 func unKey(u *Unstructured) string {
1186 return string(u.GetUID())
1187 }
1188
1189 func (options ClientConfig) toConfigFlags() *ConfigFlags {
1190 result := NewConfigFlags(false)
1191
1192 if options.Kubeconfig != "" {
1193 result.KubeConfig = &options.Kubeconfig
1194 }
1195 if options.Context != "" {
1196 result.Context = &options.Context
1197 }
1198 if options.Namespace != "" {
1199 result.Namespace = &options.Namespace
1200 }
1201
1202 return result
1203 }
1204
View as plain text