1
16
17 package cluster
18
19 import (
20 "context"
21 "errors"
22 "net/http"
23 "time"
24
25 "github.com/go-logr/logr"
26 "k8s.io/apimachinery/pkg/api/meta"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/client-go/kubernetes/scheme"
29 "k8s.io/client-go/rest"
30 "k8s.io/client-go/tools/record"
31
32 "sigs.k8s.io/controller-runtime/pkg/cache"
33 "sigs.k8s.io/controller-runtime/pkg/client"
34 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
35 logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
36 intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
37 )
38
39
40 type Cluster interface {
41
42 GetHTTPClient() *http.Client
43
44
45 GetConfig() *rest.Config
46
47
48 GetCache() cache.Cache
49
50
51 GetScheme() *runtime.Scheme
52
53
54
55
56
57 GetClient() client.Client
58
59
60 GetFieldIndexer() client.FieldIndexer
61
62
63 GetEventRecorderFor(name string) record.EventRecorder
64
65
66 GetRESTMapper() meta.RESTMapper
67
68
69
70
71 GetAPIReader() client.Reader
72
73
74 Start(ctx context.Context) error
75 }
76
77
78 type Options struct {
79
80
81
82 Scheme *runtime.Scheme
83
84
85 MapperProvider func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error)
86
87
88
89 Logger logr.Logger
90
91
92
93
94
95
96
97
98
99 SyncPeriod *time.Duration
100
101
102
103
104 HTTPClient *http.Client
105
106
107
108 Cache cache.Options
109
110
111
112
113
114
115
116
117
118 NewCache cache.NewCacheFunc
119
120
121
122 Client client.Options
123
124
125
126
127
128
129
130
131
132
133 NewClient client.NewClientFunc
134
135
136
137
138
139
140 EventBroadcaster record.EventBroadcaster
141
142
143
144
145
146 makeBroadcaster intrec.EventBroadcasterProducer
147
148
149 newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
150 }
151
152
153 type Option func(*Options)
154
155
156 func New(config *rest.Config, opts ...Option) (Cluster, error) {
157 if config == nil {
158 return nil, errors.New("must specify Config")
159 }
160
161 originalConfig := config
162
163 config = rest.CopyConfig(config)
164 if config.UserAgent == "" {
165 config.UserAgent = rest.DefaultKubernetesUserAgent()
166 }
167
168 options := Options{}
169 for _, opt := range opts {
170 opt(&options)
171 }
172 options, err := setOptionsDefaults(options, config)
173 if err != nil {
174 options.Logger.Error(err, "Failed to set defaults")
175 return nil, err
176 }
177
178
179 mapper, err := options.MapperProvider(config, options.HTTPClient)
180 if err != nil {
181 options.Logger.Error(err, "Failed to get API Group-Resources")
182 return nil, err
183 }
184
185
186 cacheOpts := options.Cache
187 {
188 if cacheOpts.Scheme == nil {
189 cacheOpts.Scheme = options.Scheme
190 }
191 if cacheOpts.Mapper == nil {
192 cacheOpts.Mapper = mapper
193 }
194 if cacheOpts.HTTPClient == nil {
195 cacheOpts.HTTPClient = options.HTTPClient
196 }
197 if cacheOpts.SyncPeriod == nil {
198 cacheOpts.SyncPeriod = options.SyncPeriod
199 }
200 }
201 cache, err := options.NewCache(config, cacheOpts)
202 if err != nil {
203 return nil, err
204 }
205
206
207 clientOpts := options.Client
208 {
209 if clientOpts.Scheme == nil {
210 clientOpts.Scheme = options.Scheme
211 }
212 if clientOpts.Mapper == nil {
213 clientOpts.Mapper = mapper
214 }
215 if clientOpts.HTTPClient == nil {
216 clientOpts.HTTPClient = options.HTTPClient
217 }
218 if clientOpts.Cache == nil {
219 clientOpts.Cache = &client.CacheOptions{
220 Unstructured: false,
221 }
222 }
223 if clientOpts.Cache.Reader == nil {
224 clientOpts.Cache.Reader = cache
225 }
226 }
227 clientWriter, err := options.NewClient(config, clientOpts)
228 if err != nil {
229 return nil, err
230 }
231
232
233 clientReader, err := client.New(config, client.Options{
234 HTTPClient: options.HTTPClient,
235 Scheme: options.Scheme,
236 Mapper: mapper,
237 })
238 if err != nil {
239 return nil, err
240 }
241
242
243
244
245 recorderProvider, err := options.newRecorderProvider(config, options.HTTPClient, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
246 if err != nil {
247 return nil, err
248 }
249
250 return &cluster{
251 config: originalConfig,
252 httpClient: options.HTTPClient,
253 scheme: options.Scheme,
254 cache: cache,
255 fieldIndexes: cache,
256 client: clientWriter,
257 apiReader: clientReader,
258 recorderProvider: recorderProvider,
259 mapper: mapper,
260 logger: options.Logger,
261 }, nil
262 }
263
264
265 func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
266 if options.HTTPClient == nil {
267 var err error
268 options.HTTPClient, err = rest.HTTPClientFor(config)
269 if err != nil {
270 return options, err
271 }
272 }
273
274
275 if options.Scheme == nil {
276 options.Scheme = scheme.Scheme
277 }
278
279 if options.MapperProvider == nil {
280 options.MapperProvider = apiutil.NewDynamicRESTMapper
281 }
282
283
284 if options.NewClient == nil {
285 options.NewClient = client.New
286 }
287
288
289 if options.NewCache == nil {
290 options.NewCache = cache.New
291 }
292
293
294 if options.newRecorderProvider == nil {
295 options.newRecorderProvider = intrec.NewProvider
296 }
297
298
299
300 if options.EventBroadcaster == nil {
301
302 options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
303 return record.NewBroadcaster(), true
304 }
305 } else {
306 options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
307 return options.EventBroadcaster, false
308 }
309 }
310
311 if options.Logger.GetSink() == nil {
312 options.Logger = logf.RuntimeLog.WithName("cluster")
313 }
314
315 return options, nil
316 }
317
View as plain text