1
16
17 package plugin
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "fmt"
24 "os"
25 "os/exec"
26 "path/filepath"
27 "strings"
28 "sync"
29 "time"
30
31 "golang.org/x/sync/singleflight"
32
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/runtime/serializer"
36 "k8s.io/apimachinery/pkg/runtime/serializer/json"
37 "k8s.io/client-go/tools/cache"
38 "k8s.io/klog/v2"
39 credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
40 "k8s.io/kubelet/pkg/apis/credentialprovider/install"
41 credentialproviderv1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1"
42 credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1"
43 credentialproviderv1beta1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1"
44 "k8s.io/kubernetes/pkg/credentialprovider"
45 kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
46 kubeletconfigv1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1"
47 kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1alpha1"
48 kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
49 "k8s.io/utils/clock"
50 )
51
const (
	// globalCacheKey is the single cache key under which credentials are
	// stored when a plugin response selects the global cache key type.
	globalCacheKey = "global"

	// cachePurgeInterval is the minimum time that must elapse between two
	// sweeps of the credential cache (see getCachedCredentials).
	cachePurgeInterval = 15 * time.Minute
)
56
57 var (
58 scheme = runtime.NewScheme()
59 codecs = serializer.NewCodecFactory(scheme)
60
61 apiVersions = map[string]schema.GroupVersion{
62 credentialproviderv1alpha1.SchemeGroupVersion.String(): credentialproviderv1alpha1.SchemeGroupVersion,
63 credentialproviderv1beta1.SchemeGroupVersion.String(): credentialproviderv1beta1.SchemeGroupVersion,
64 credentialproviderv1.SchemeGroupVersion.String(): credentialproviderv1.SchemeGroupVersion,
65 }
66 )
67
68 func init() {
69 install.Install(scheme)
70 kubeletconfig.AddToScheme(scheme)
71 kubeletconfigv1alpha1.AddToScheme(scheme)
72 kubeletconfigv1beta1.AddToScheme(scheme)
73 kubeletconfigv1.AddToScheme(scheme)
74 }
75
76
77
78 func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string) error {
79 if _, err := os.Stat(pluginBinDir); err != nil {
80 if os.IsNotExist(err) {
81 return fmt.Errorf("plugin binary directory %s did not exist", pluginBinDir)
82 }
83
84 return fmt.Errorf("error inspecting binary directory %s: %w", pluginBinDir, err)
85 }
86
87 credentialProviderConfig, err := readCredentialProviderConfigFile(pluginConfigFile)
88 if err != nil {
89 return err
90 }
91
92 errs := validateCredentialProviderConfig(credentialProviderConfig)
93 if len(errs) > 0 {
94 return fmt.Errorf("failed to validate credential provider config: %v", errs.ToAggregate())
95 }
96
97
98 registerMetrics()
99
100 for _, provider := range credentialProviderConfig.Providers {
101
102
103 pluginBin, err := exec.LookPath(filepath.Join(pluginBinDir, provider.Name))
104 if err != nil {
105 if errors.Is(err, os.ErrNotExist) || errors.Is(err, exec.ErrNotFound) {
106 return fmt.Errorf("plugin binary executable %s did not exist", pluginBin)
107 }
108
109 return fmt.Errorf("error inspecting binary executable %s: %w", pluginBin, err)
110 }
111
112 plugin, err := newPluginProvider(pluginBinDir, provider)
113 if err != nil {
114 return fmt.Errorf("error initializing plugin provider %s: %w", provider.Name, err)
115 }
116
117 credentialprovider.RegisterCredentialProvider(provider.Name, plugin)
118 }
119
120 return nil
121 }
122
123
124 func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialProvider) (*pluginProvider, error) {
125 mediaType := "application/json"
126 info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
127 if !ok {
128 return nil, fmt.Errorf("unsupported media type %q", mediaType)
129 }
130
131 gv, ok := apiVersions[provider.APIVersion]
132 if !ok {
133 return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
134 }
135
136 clock := clock.RealClock{}
137
138 return &pluginProvider{
139 clock: clock,
140 matchImages: provider.MatchImages,
141 cache: cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
142 defaultCacheDuration: provider.DefaultCacheDuration.Duration,
143 lastCachePurge: clock.Now(),
144 plugin: &execPlugin{
145 name: provider.Name,
146 apiVersion: provider.APIVersion,
147 encoder: codecs.EncoderForVersion(info.Serializer, gv),
148 pluginBinDir: pluginBinDir,
149 args: provider.Args,
150 envVars: provider.Env,
151 environ: os.Environ,
152 },
153 }, nil
154 }
155
156
157 type pluginProvider struct {
158 clock clock.Clock
159
160 sync.Mutex
161
162 group singleflight.Group
163
164
165
166
167 matchImages []string
168
169
170
171 cache cache.Store
172
173
174 defaultCacheDuration time.Duration
175
176
177 plugin Plugin
178
179
180 lastCachePurge time.Time
181 }
182
183
184 type cacheEntry struct {
185 key string
186 credentials credentialprovider.DockerConfig
187 expiresAt time.Time
188 }
189
190
191 func cacheKeyFunc(obj interface{}) (string, error) {
192 key := obj.(*cacheEntry).key
193 return key, nil
194 }
195
196
197 type cacheExpirationPolicy struct {
198 clock clock.Clock
199 }
200
201
202
203 func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
204 return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
205 }
206
207
208
209 func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
210 if !p.isImageAllowed(image) {
211 return credentialprovider.DockerConfig{}
212 }
213
214 cachedConfig, found, err := p.getCachedCredentials(image)
215 if err != nil {
216 klog.Errorf("Failed to get cached docker config: %v", err)
217 return credentialprovider.DockerConfig{}
218 }
219
220 if found {
221 return cachedConfig
222 }
223
224
225
226
227
228
229
230 res, err, _ := p.group.Do(image, func() (interface{}, error) {
231 return p.plugin.ExecPlugin(context.Background(), image)
232 })
233
234 if err != nil {
235 klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
236 return credentialprovider.DockerConfig{}
237 }
238
239 response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
240 if !ok {
241 klog.Errorf("Invalid response type returned by external credential provider")
242 return credentialprovider.DockerConfig{}
243 }
244
245 var cacheKey string
246 switch cacheKeyType := response.CacheKeyType; cacheKeyType {
247 case credentialproviderapi.ImagePluginCacheKeyType:
248 cacheKey = image
249 case credentialproviderapi.RegistryPluginCacheKeyType:
250 registry := parseRegistry(image)
251 cacheKey = registry
252 case credentialproviderapi.GlobalPluginCacheKeyType:
253 cacheKey = globalCacheKey
254 default:
255 klog.Errorf("credential provider plugin did not return a valid cacheKeyType: %q", cacheKeyType)
256 return credentialprovider.DockerConfig{}
257 }
258
259 dockerConfig := make(credentialprovider.DockerConfig, len(response.Auth))
260 for matchImage, authConfig := range response.Auth {
261 dockerConfig[matchImage] = credentialprovider.DockerConfigEntry{
262 Username: authConfig.Username,
263 Password: authConfig.Password,
264 }
265 }
266
267
268 if response.CacheDuration != nil && response.CacheDuration.Duration == 0 {
269 return dockerConfig
270 }
271
272 var expiresAt time.Time
273
274 if response.CacheDuration == nil {
275 if p.defaultCacheDuration == 0 {
276 return dockerConfig
277 }
278 expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
279 } else {
280 expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
281 }
282
283 cachedEntry := &cacheEntry{
284 key: cacheKey,
285 credentials: dockerConfig,
286 expiresAt: expiresAt,
287 }
288
289 if err := p.cache.Add(cachedEntry); err != nil {
290 klog.Errorf("Error adding auth entry to cache: %v", err)
291 }
292
293 return dockerConfig
294 }
295
296
297 func (p *pluginProvider) Enabled() bool {
298 return true
299 }
300
301
302 func (p *pluginProvider) isImageAllowed(image string) bool {
303 for _, matchImage := range p.matchImages {
304 if matched, _ := credentialprovider.URLsMatchStr(matchImage, image); matched {
305 return true
306 }
307 }
308
309 return false
310 }
311
312
313 func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
314 p.Lock()
315 if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
316
317
318
319 _ = p.cache.List()
320 p.lastCachePurge = p.clock.Now()
321 }
322 p.Unlock()
323
324 obj, found, err := p.cache.GetByKey(image)
325 if err != nil {
326 return nil, false, err
327 }
328
329 if found {
330 return obj.(*cacheEntry).credentials, true, nil
331 }
332
333 registry := parseRegistry(image)
334 obj, found, err = p.cache.GetByKey(registry)
335 if err != nil {
336 return nil, false, err
337 }
338
339 if found {
340 return obj.(*cacheEntry).credentials, true, nil
341 }
342
343 obj, found, err = p.cache.GetByKey(globalCacheKey)
344 if err != nil {
345 return nil, false, err
346 }
347
348 if found {
349 return obj.(*cacheEntry).credentials, true, nil
350 }
351
352 return nil, false, nil
353 }
354
355
356
357 type Plugin interface {
358 ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error)
359 }
360
361
362
363
364 type execPlugin struct {
365 name string
366 apiVersion string
367 encoder runtime.Encoder
368 args []string
369 envVars []kubeletconfig.ExecEnvVar
370 pluginBinDir string
371 environ func() []string
372 }
373
374
375
376
377
378
379
380 func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
381 klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)
382
383 authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
384 data, err := e.encodeRequest(authRequest)
385 if err != nil {
386 return nil, fmt.Errorf("failed to encode auth request: %w", err)
387 }
388
389 stdout := &bytes.Buffer{}
390 stderr := &bytes.Buffer{}
391 stdin := bytes.NewBuffer(data)
392
393
394
395
396
397 ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
398 defer cancel()
399
400 cmd := exec.CommandContext(ctx, filepath.Join(e.pluginBinDir, e.name), e.args...)
401 cmd.Stdout, cmd.Stderr, cmd.Stdin = stdout, stderr, stdin
402
403 var configEnvVars []string
404 for _, v := range e.envVars {
405 configEnvVars = append(configEnvVars, fmt.Sprintf("%s=%s", v.Name, v.Value))
406 }
407
408
409
410
411
412 cmd.Env = mergeEnvVars(e.environ(), configEnvVars)
413
414 if err = e.runPlugin(ctx, cmd, image); err != nil {
415 return nil, fmt.Errorf("%w: %s", err, stderr.String())
416 }
417
418 data = stdout.Bytes()
419
420 gvk, err := json.DefaultMetaFactory.Interpret(data)
421 if err != nil {
422 return nil, fmt.Errorf("error reading GVK from response: %w", err)
423 }
424
425 if gvk.GroupVersion().String() != e.apiVersion {
426 return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
427 }
428
429 response, err := e.decodeResponse(data)
430 if err != nil {
431
432 return nil, errors.New("error decoding credential provider plugin response from stdout")
433 }
434
435 return response, nil
436 }
437
438 func (e *execPlugin) runPlugin(ctx context.Context, cmd *exec.Cmd, image string) error {
439 startTime := time.Now()
440 defer func() {
441 kubeletCredentialProviderPluginDuration.WithLabelValues(e.name).Observe(time.Since(startTime).Seconds())
442 }()
443
444 err := cmd.Run()
445 if ctx.Err() != nil {
446 kubeletCredentialProviderPluginErrors.WithLabelValues(e.name).Inc()
447 return fmt.Errorf("error execing credential provider plugin %s for image %s: %w", e.name, image, ctx.Err())
448 }
449 if err != nil {
450 kubeletCredentialProviderPluginErrors.WithLabelValues(e.name).Inc()
451 return fmt.Errorf("error execing credential provider plugin %s for image %s: %w", e.name, image, err)
452 }
453 return nil
454 }
455
456
457 func (e *execPlugin) encodeRequest(request *credentialproviderapi.CredentialProviderRequest) ([]byte, error) {
458 data, err := runtime.Encode(e.encoder, request)
459 if err != nil {
460 return nil, fmt.Errorf("error encoding request: %w", err)
461 }
462
463 return data, nil
464 }
465
466
467 func (e *execPlugin) decodeResponse(data []byte) (*credentialproviderapi.CredentialProviderResponse, error) {
468 obj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
469 if err != nil {
470 return nil, err
471 }
472
473 if gvk.Kind != "CredentialProviderResponse" {
474 return nil, fmt.Errorf("failed to decode CredentialProviderResponse, unexpected Kind: %q", gvk.Kind)
475 }
476
477 if gvk.Group != credentialproviderapi.GroupName {
478 return nil, fmt.Errorf("failed to decode CredentialProviderResponse, unexpected Group: %s", gvk.Group)
479 }
480
481 if internalResponse, ok := obj.(*credentialproviderapi.CredentialProviderResponse); ok {
482 return internalResponse, nil
483 }
484
485 return nil, fmt.Errorf("unable to convert %T to *CredentialProviderResponse", obj)
486 }
487
488
// parseRegistry returns the part of image that precedes its first "/", or the
// whole string when image contains no "/".
func parseRegistry(image string) string {
	if slash := strings.IndexByte(image, '/'); slash >= 0 {
		return image[:slash]
	}

	return image
}
493
494
495
496
// mergeEnvVars returns sysEnvVars followed by credProviderVars.
//
// The provider variables come last; per the os/exec documentation, when the
// environment has duplicate keys only the last value for each key is used, so
// they override same-named system variables.
//
// The caller's sysEnvVars is never written to: its capacity is clipped with a
// full slice expression, which forces append to copy into a new backing array
// instead of filling spare capacity that other users of the slice may share.
// When credProviderVars is empty the result is sysEnvVars unchanged, so a nil
// input stays nil.
func mergeEnvVars(sysEnvVars, credProviderVars []string) []string {
	return append(sysEnvVars[:len(sysEnvVars):len(sysEnvVars)], credProviderVars...)
}
502
View as plain text