1
16
17 package rest
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "net/http"
25 "net/url"
26 "os"
27 "path/filepath"
28 gruntime "runtime"
29 "strings"
30 "time"
31
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/client-go/pkg/version"
36 clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
37 "k8s.io/client-go/transport"
38 certutil "k8s.io/client-go/util/cert"
39 "k8s.io/client-go/util/flowcontrol"
40 "k8s.io/klog/v2"
41 )
42
43 const (
44 DefaultQPS float32 = 5.0
45 DefaultBurst int = 10
46 )
47
48 var ErrNotInCluster = errors.New("unable to load in-cluster configuration, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
49
50
51
52 type Config struct {
53
54
55
56
57 Host string
58
59 APIPath string
60
61
62
63 ContentConfig
64
65
66 Username string
67 Password string `datapolicy:"password"`
68
69
70
71
72 BearerToken string `datapolicy:"token"`
73
74
75
76
77 BearerTokenFile string
78
79
80 Impersonate ImpersonationConfig
81
82
83 AuthProvider *clientcmdapi.AuthProviderConfig
84
85
86 AuthConfigPersister AuthProviderConfigPersister
87
88
89 ExecProvider *clientcmdapi.ExecConfig
90
91
92 TLSClientConfig
93
94
95 UserAgent string
96
97
98
99 DisableCompression bool
100
101
102
103
104 Transport http.RoundTripper
105
106
107
108
109
110
111
112 WrapTransport transport.WrapperFunc
113
114
115
116 QPS float32
117
118
119
120 Burst int
121
122
123 RateLimiter flowcontrol.RateLimiter
124
125
126
127
128 WarningHandler WarningHandler
129
130
131 Timeout time.Duration
132
133
134 Dial func(ctx context.Context, network, address string) (net.Conn, error)
135
136
137
138
139
140
141 Proxy func(*http.Request) (*url.URL, error)
142
143
144
145
146 }
147
148 var _ fmt.Stringer = new(Config)
149 var _ fmt.GoStringer = new(Config)
150
151 type sanitizedConfig *Config
152
153 type sanitizedAuthConfigPersister struct{ AuthProviderConfigPersister }
154
155 func (sanitizedAuthConfigPersister) GoString() string {
156 return "rest.AuthProviderConfigPersister(--- REDACTED ---)"
157 }
158 func (sanitizedAuthConfigPersister) String() string {
159 return "rest.AuthProviderConfigPersister(--- REDACTED ---)"
160 }
161
162 type sanitizedObject struct{ runtime.Object }
163
164 func (sanitizedObject) GoString() string {
165 return "runtime.Object(--- REDACTED ---)"
166 }
167 func (sanitizedObject) String() string {
168 return "runtime.Object(--- REDACTED ---)"
169 }
170
171
172
173 func (c *Config) GoString() string {
174 return c.String()
175 }
176
177
178
179 func (c *Config) String() string {
180 if c == nil {
181 return "<nil>"
182 }
183 cc := sanitizedConfig(CopyConfig(c))
184
185 if cc.Password != "" {
186 cc.Password = "--- REDACTED ---"
187 }
188 if cc.BearerToken != "" {
189 cc.BearerToken = "--- REDACTED ---"
190 }
191 if cc.AuthConfigPersister != nil {
192 cc.AuthConfigPersister = sanitizedAuthConfigPersister{cc.AuthConfigPersister}
193 }
194 if cc.ExecProvider != nil && cc.ExecProvider.Config != nil {
195 cc.ExecProvider.Config = sanitizedObject{Object: cc.ExecProvider.Config}
196 }
197 return fmt.Sprintf("%#v", cc)
198 }
199
200
201 type ImpersonationConfig struct {
202
203 UserName string
204
205 UID string
206
207 Groups []string
208
209
210 Extra map[string][]string
211 }
212
213
214
215 type TLSClientConfig struct {
216
217 Insecure bool
218
219
220
221 ServerName string
222
223
224 CertFile string
225
226 KeyFile string
227
228 CAFile string
229
230
231
232 CertData []byte
233
234
235 KeyData []byte `datapolicy:"security-key"`
236
237
238 CAData []byte
239
240
241
242
243
244 NextProtos []string
245 }
246
247 var _ fmt.Stringer = TLSClientConfig{}
248 var _ fmt.GoStringer = TLSClientConfig{}
249
250 type sanitizedTLSClientConfig TLSClientConfig
251
252
253
254 func (c TLSClientConfig) GoString() string {
255 return c.String()
256 }
257
258
259
260 func (c TLSClientConfig) String() string {
261 cc := sanitizedTLSClientConfig{
262 Insecure: c.Insecure,
263 ServerName: c.ServerName,
264 CertFile: c.CertFile,
265 KeyFile: c.KeyFile,
266 CAFile: c.CAFile,
267 CertData: c.CertData,
268 KeyData: c.KeyData,
269 CAData: c.CAData,
270 NextProtos: c.NextProtos,
271 }
272
273 if len(cc.CertData) != 0 {
274 cc.CertData = []byte("--- TRUNCATED ---")
275 }
276 if len(cc.KeyData) != 0 {
277 cc.KeyData = []byte("--- REDACTED ---")
278 }
279 return fmt.Sprintf("%#v", cc)
280 }
281
282 type ContentConfig struct {
283
284
285 AcceptContentTypes string
286
287
288
289
290 ContentType string
291
292
293
294 GroupVersion *schema.GroupVersion
295
296
297
298
299
300 NegotiatedSerializer runtime.NegotiatedSerializer
301 }
302
303
304
305
306
307
308
309 func RESTClientFor(config *Config) (*RESTClient, error) {
310 if config.GroupVersion == nil {
311 return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
312 }
313 if config.NegotiatedSerializer == nil {
314 return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
315 }
316
317
318
319 _, _, err := DefaultServerUrlFor(config)
320 if err != nil {
321 return nil, err
322 }
323
324 httpClient, err := HTTPClientFor(config)
325 if err != nil {
326 return nil, err
327 }
328
329 return RESTClientForConfigAndClient(config, httpClient)
330 }
331
332
333
334
335
336
337
338 func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
339 if config.GroupVersion == nil {
340 return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
341 }
342 if config.NegotiatedSerializer == nil {
343 return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
344 }
345
346 baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
347 if err != nil {
348 return nil, err
349 }
350
351 rateLimiter := config.RateLimiter
352 if rateLimiter == nil {
353 qps := config.QPS
354 if config.QPS == 0.0 {
355 qps = DefaultQPS
356 }
357 burst := config.Burst
358 if config.Burst == 0 {
359 burst = DefaultBurst
360 }
361 if qps > 0 {
362 rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
363 }
364 }
365
366 var gv schema.GroupVersion
367 if config.GroupVersion != nil {
368 gv = *config.GroupVersion
369 }
370 clientContent := ClientContentConfig{
371 AcceptContentTypes: config.AcceptContentTypes,
372 ContentType: config.ContentType,
373 GroupVersion: gv,
374 Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
375 }
376
377 restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
378 if err == nil && config.WarningHandler != nil {
379 restClient.warningHandler = config.WarningHandler
380 }
381 return restClient, err
382 }
383
384
385
386 func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
387 if config.NegotiatedSerializer == nil {
388 return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
389 }
390
391
392
393 _, _, err := DefaultServerUrlFor(config)
394 if err != nil {
395 return nil, err
396 }
397
398 httpClient, err := HTTPClientFor(config)
399 if err != nil {
400 return nil, err
401 }
402
403 return UnversionedRESTClientForConfigAndClient(config, httpClient)
404 }
405
406
407
408 func UnversionedRESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
409 if config.NegotiatedSerializer == nil {
410 return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
411 }
412
413 baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
414 if err != nil {
415 return nil, err
416 }
417
418 rateLimiter := config.RateLimiter
419 if rateLimiter == nil {
420 qps := config.QPS
421 if config.QPS == 0.0 {
422 qps = DefaultQPS
423 }
424 burst := config.Burst
425 if config.Burst == 0 {
426 burst = DefaultBurst
427 }
428 if qps > 0 {
429 rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
430 }
431 }
432
433 gv := metav1.SchemeGroupVersion
434 if config.GroupVersion != nil {
435 gv = *config.GroupVersion
436 }
437 clientContent := ClientContentConfig{
438 AcceptContentTypes: config.AcceptContentTypes,
439 ContentType: config.ContentType,
440 GroupVersion: gv,
441 Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
442 }
443
444 restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
445 if err == nil && config.WarningHandler != nil {
446 restClient.warningHandler = config.WarningHandler
447 }
448 return restClient, err
449 }
450
451
452
453 func SetKubernetesDefaults(config *Config) error {
454 if len(config.UserAgent) == 0 {
455 config.UserAgent = DefaultKubernetesUserAgent()
456 }
457 return nil
458 }
459
460
461 func adjustCommit(c string) string {
462 if len(c) == 0 {
463 return "unknown"
464 }
465 if len(c) > 7 {
466 return c[:7]
467 }
468 return c
469 }
470
471
472
473 func adjustVersion(v string) string {
474 if len(v) == 0 {
475 return "unknown"
476 }
477 seg := strings.SplitN(v, "-", 2)
478 return seg[0]
479 }
480
481
482
483 func adjustCommand(p string) string {
484
485 if len(p) == 0 {
486 return "unknown"
487 }
488 return filepath.Base(p)
489 }
490
491
492 func buildUserAgent(command, version, os, arch, commit string) string {
493 return fmt.Sprintf(
494 "%s/%s (%s/%s) kubernetes/%s", command, version, os, arch, commit)
495 }
496
497
498 func DefaultKubernetesUserAgent() string {
499 return buildUserAgent(
500 adjustCommand(os.Args[0]),
501 adjustVersion(version.Get().GitVersion),
502 gruntime.GOOS,
503 gruntime.GOARCH,
504 adjustCommit(version.Get().GitCommit))
505 }
506
507
508
509
510
511 func InClusterConfig() (*Config, error) {
512 const (
513 tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
514 rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
515 )
516 host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
517 if len(host) == 0 || len(port) == 0 {
518 return nil, ErrNotInCluster
519 }
520
521 token, err := os.ReadFile(tokenFile)
522 if err != nil {
523 return nil, err
524 }
525
526 tlsClientConfig := TLSClientConfig{}
527
528 if _, err := certutil.NewPool(rootCAFile); err != nil {
529 klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
530 } else {
531 tlsClientConfig.CAFile = rootCAFile
532 }
533
534 return &Config{
535
536 Host: "https://" + net.JoinHostPort(host, port),
537 TLSClientConfig: tlsClientConfig,
538 BearerToken: string(token),
539 BearerTokenFile: tokenFile,
540 }, nil
541 }
542
543
544
545
546
547
548
549
550 func IsConfigTransportTLS(config Config) bool {
551 baseURL, _, err := DefaultServerUrlFor(&config)
552 if err != nil {
553 return false
554 }
555 return baseURL.Scheme == "https"
556 }
557
558
559
560
561 func LoadTLSFiles(c *Config) error {
562 var err error
563 c.CAData, err = dataFromSliceOrFile(c.CAData, c.CAFile)
564 if err != nil {
565 return err
566 }
567
568 c.CertData, err = dataFromSliceOrFile(c.CertData, c.CertFile)
569 if err != nil {
570 return err
571 }
572
573 c.KeyData, err = dataFromSliceOrFile(c.KeyData, c.KeyFile)
574 return err
575 }
576
577
578
579 func dataFromSliceOrFile(data []byte, file string) ([]byte, error) {
580 if len(data) > 0 {
581 return data, nil
582 }
583 if len(file) > 0 {
584 fileData, err := os.ReadFile(file)
585 if err != nil {
586 return []byte{}, err
587 }
588 return fileData, nil
589 }
590 return nil, nil
591 }
592
593 func AddUserAgent(config *Config, userAgent string) *Config {
594 fullUserAgent := DefaultKubernetesUserAgent() + "/" + userAgent
595 config.UserAgent = fullUserAgent
596 return config
597 }
598
599
600 func AnonymousClientConfig(config *Config) *Config {
601
602 return &Config{
603 Host: config.Host,
604 APIPath: config.APIPath,
605 ContentConfig: config.ContentConfig,
606 TLSClientConfig: TLSClientConfig{
607 Insecure: config.Insecure,
608 ServerName: config.ServerName,
609 CAFile: config.TLSClientConfig.CAFile,
610 CAData: config.TLSClientConfig.CAData,
611 NextProtos: config.TLSClientConfig.NextProtos,
612 },
613 RateLimiter: config.RateLimiter,
614 WarningHandler: config.WarningHandler,
615 UserAgent: config.UserAgent,
616 DisableCompression: config.DisableCompression,
617 QPS: config.QPS,
618 Burst: config.Burst,
619 Timeout: config.Timeout,
620 Dial: config.Dial,
621 Proxy: config.Proxy,
622 }
623 }
624
625
626 func CopyConfig(config *Config) *Config {
627 c := &Config{
628 Host: config.Host,
629 APIPath: config.APIPath,
630 ContentConfig: config.ContentConfig,
631 Username: config.Username,
632 Password: config.Password,
633 BearerToken: config.BearerToken,
634 BearerTokenFile: config.BearerTokenFile,
635 Impersonate: ImpersonationConfig{
636 UserName: config.Impersonate.UserName,
637 UID: config.Impersonate.UID,
638 Groups: config.Impersonate.Groups,
639 Extra: config.Impersonate.Extra,
640 },
641 AuthProvider: config.AuthProvider,
642 AuthConfigPersister: config.AuthConfigPersister,
643 ExecProvider: config.ExecProvider,
644 TLSClientConfig: TLSClientConfig{
645 Insecure: config.TLSClientConfig.Insecure,
646 ServerName: config.TLSClientConfig.ServerName,
647 CertFile: config.TLSClientConfig.CertFile,
648 KeyFile: config.TLSClientConfig.KeyFile,
649 CAFile: config.TLSClientConfig.CAFile,
650 CertData: config.TLSClientConfig.CertData,
651 KeyData: config.TLSClientConfig.KeyData,
652 CAData: config.TLSClientConfig.CAData,
653 NextProtos: config.TLSClientConfig.NextProtos,
654 },
655 UserAgent: config.UserAgent,
656 DisableCompression: config.DisableCompression,
657 Transport: config.Transport,
658 WrapTransport: config.WrapTransport,
659 QPS: config.QPS,
660 Burst: config.Burst,
661 RateLimiter: config.RateLimiter,
662 WarningHandler: config.WarningHandler,
663 Timeout: config.Timeout,
664 Dial: config.Dial,
665 Proxy: config.Proxy,
666 }
667 if config.ExecProvider != nil && config.ExecProvider.Config != nil {
668 c.ExecProvider.Config = config.ExecProvider.Config.DeepCopyObject()
669 }
670 return c
671 }
672
View as plain text