...
1
2
3
4 package polling
5
6 import (
7 "context"
8 "fmt"
9 "time"
10
11 "k8s.io/apimachinery/pkg/api/meta"
12 cmdutil "k8s.io/kubectl/pkg/cmd/util"
13 "k8s.io/kubectl/pkg/scheme"
14 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
15 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
16 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
17 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
18 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
19 "sigs.k8s.io/cli-utils/pkg/object"
20 "sigs.k8s.io/controller-runtime/pkg/client"
21 )
22
23
24
25 func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper, o Options) *StatusPoller {
26 setDefaults(&o)
27 var statusReaders []engine.StatusReader
28
29 statusReaders = append(statusReaders, o.CustomStatusReaders...)
30
31 srs, defaultStatusReader := createStatusReaders(mapper)
32 statusReaders = append(statusReaders, srs...)
33
34 return &StatusPoller{
35 engine: &engine.PollerEngine{
36 Reader: reader,
37 Mapper: mapper,
38 DefaultStatusReader: defaultStatusReader,
39 StatusReaders: statusReaders,
40 ClusterReaderFactory: o.ClusterReaderFactory,
41 },
42 }
43 }
44
45
46
47 func NewStatusPollerFromFactory(f cmdutil.Factory, o Options) (*StatusPoller, error) {
48 config, err := f.ToRESTConfig()
49 if err != nil {
50 return nil, fmt.Errorf("error getting RESTConfig: %w", err)
51 }
52
53 mapper, err := f.ToRESTMapper()
54 if err != nil {
55 return nil, fmt.Errorf("error getting RESTMapper: %w", err)
56 }
57
58 c, err := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
59 if err != nil {
60 return nil, fmt.Errorf("error creating client: %w", err)
61 }
62
63 return NewStatusPoller(c, mapper, o), nil
64 }
65
66 func setDefaults(o *Options) {
67 if o.ClusterReaderFactory == nil {
68 o.ClusterReaderFactory = engine.ClusterReaderFactoryFunc(clusterreader.NewCachingClusterReader)
69 }
70 }
71
72
73
// Options holds the settings accepted by NewStatusPoller and
// NewStatusPollerFromFactory.
type Options struct {
	// CustomStatusReaders are additional status readers. NewStatusPoller
	// places them ahead of the built-in readers in the engine's reader list.
	CustomStatusReaders []engine.StatusReader

	// ClusterReaderFactory is handed to the polling engine. When nil,
	// setDefaults replaces it with a factory wrapping
	// clusterreader.NewCachingClusterReader.
	ClusterReaderFactory engine.ClusterReaderFactory
}
83
84
// StatusPoller wraps an engine.PollerEngine and exposes polling through its
// Poll method.
type StatusPoller struct {
	// engine is the polling engine that Poll delegates to.
	engine *engine.PollerEngine
}
88
89
90
91
92 func (s *StatusPoller) Poll(ctx context.Context, identifiers object.ObjMetadataSet, options PollOptions) <-chan event.Event {
93 return s.engine.Poll(ctx, identifiers, engine.Options{
94 PollInterval: options.PollInterval,
95 })
96 }
97
98
99
// PollOptions carries the per-call settings for StatusPoller.Poll.
type PollOptions struct {
	// PollInterval is forwarded unchanged to the engine as
	// engine.Options.PollInterval.
	PollInterval time.Duration
}
105
106
107
108
109
110
111 func createStatusReaders(mapper meta.RESTMapper) ([]engine.StatusReader, engine.StatusReader) {
112 defaultStatusReader := statusreaders.NewGenericStatusReader(mapper, status.Compute)
113
114 replicaSetStatusReader := statusreaders.NewReplicaSetStatusReader(mapper, defaultStatusReader)
115 deploymentStatusReader := statusreaders.NewDeploymentResourceReader(mapper, replicaSetStatusReader)
116 statefulSetStatusReader := statusreaders.NewStatefulSetResourceReader(mapper, defaultStatusReader)
117
118 statusReaders := []engine.StatusReader{
119 deploymentStatusReader,
120 statefulSetStatusReader,
121 replicaSetStatusReader,
122 }
123
124 return statusReaders, defaultStatusReader
125 }
126
View as plain text