...

Source file src/k8s.io/kubernetes/pkg/credentialprovider/plugin/plugin.go

Documentation: k8s.io/kubernetes/pkg/credentialprovider/plugin

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package plugin
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"os"
    25  	"os/exec"
    26  	"path/filepath"
    27  	"strings"
    28  	"sync"
    29  	"time"
    30  
    31  	"golang.org/x/sync/singleflight"
    32  
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	"k8s.io/apimachinery/pkg/runtime/serializer"
    36  	"k8s.io/apimachinery/pkg/runtime/serializer/json"
    37  	"k8s.io/client-go/tools/cache"
    38  	"k8s.io/klog/v2"
    39  	credentialproviderapi "k8s.io/kubelet/pkg/apis/credentialprovider"
    40  	"k8s.io/kubelet/pkg/apis/credentialprovider/install"
    41  	credentialproviderv1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1"
    42  	credentialproviderv1alpha1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1alpha1"
    43  	credentialproviderv1beta1 "k8s.io/kubelet/pkg/apis/credentialprovider/v1beta1"
    44  	"k8s.io/kubernetes/pkg/credentialprovider"
    45  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    46  	kubeletconfigv1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1"
    47  	kubeletconfigv1alpha1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1alpha1"
    48  	kubeletconfigv1beta1 "k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
    49  	"k8s.io/utils/clock"
    50  )
    51  
    52  const (
    53  	globalCacheKey     = "global"
    54  	cachePurgeInterval = time.Minute * 15
    55  )
    56  
    57  var (
    58  	scheme = runtime.NewScheme()
    59  	codecs = serializer.NewCodecFactory(scheme)
    60  
    61  	apiVersions = map[string]schema.GroupVersion{
    62  		credentialproviderv1alpha1.SchemeGroupVersion.String(): credentialproviderv1alpha1.SchemeGroupVersion,
    63  		credentialproviderv1beta1.SchemeGroupVersion.String():  credentialproviderv1beta1.SchemeGroupVersion,
    64  		credentialproviderv1.SchemeGroupVersion.String():       credentialproviderv1.SchemeGroupVersion,
    65  	}
    66  )
    67  
    68  func init() {
    69  	install.Install(scheme)
    70  	kubeletconfig.AddToScheme(scheme)
    71  	kubeletconfigv1alpha1.AddToScheme(scheme)
    72  	kubeletconfigv1beta1.AddToScheme(scheme)
    73  	kubeletconfigv1.AddToScheme(scheme)
    74  }
    75  
    76  // RegisterCredentialProviderPlugins is called from kubelet to register external credential provider
    77  // plugins according to the CredentialProviderConfig config file.
    78  func RegisterCredentialProviderPlugins(pluginConfigFile, pluginBinDir string) error {
    79  	if _, err := os.Stat(pluginBinDir); err != nil {
    80  		if os.IsNotExist(err) {
    81  			return fmt.Errorf("plugin binary directory %s did not exist", pluginBinDir)
    82  		}
    83  
    84  		return fmt.Errorf("error inspecting binary directory %s: %w", pluginBinDir, err)
    85  	}
    86  
    87  	credentialProviderConfig, err := readCredentialProviderConfigFile(pluginConfigFile)
    88  	if err != nil {
    89  		return err
    90  	}
    91  
    92  	errs := validateCredentialProviderConfig(credentialProviderConfig)
    93  	if len(errs) > 0 {
    94  		return fmt.Errorf("failed to validate credential provider config: %v", errs.ToAggregate())
    95  	}
    96  
    97  	// Register metrics for credential providers
    98  	registerMetrics()
    99  
   100  	for _, provider := range credentialProviderConfig.Providers {
   101  		// Considering Windows binary with suffix ".exe", LookPath() helps to find the correct path.
   102  		// LookPath() also calls os.Stat().
   103  		pluginBin, err := exec.LookPath(filepath.Join(pluginBinDir, provider.Name))
   104  		if err != nil {
   105  			if errors.Is(err, os.ErrNotExist) || errors.Is(err, exec.ErrNotFound) {
   106  				return fmt.Errorf("plugin binary executable %s did not exist", pluginBin)
   107  			}
   108  
   109  			return fmt.Errorf("error inspecting binary executable %s: %w", pluginBin, err)
   110  		}
   111  
   112  		plugin, err := newPluginProvider(pluginBinDir, provider)
   113  		if err != nil {
   114  			return fmt.Errorf("error initializing plugin provider %s: %w", provider.Name, err)
   115  		}
   116  
   117  		credentialprovider.RegisterCredentialProvider(provider.Name, plugin)
   118  	}
   119  
   120  	return nil
   121  }
   122  
   123  // newPluginProvider returns a new pluginProvider based on the credential provider config.
   124  func newPluginProvider(pluginBinDir string, provider kubeletconfig.CredentialProvider) (*pluginProvider, error) {
   125  	mediaType := "application/json"
   126  	info, ok := runtime.SerializerInfoForMediaType(codecs.SupportedMediaTypes(), mediaType)
   127  	if !ok {
   128  		return nil, fmt.Errorf("unsupported media type %q", mediaType)
   129  	}
   130  
   131  	gv, ok := apiVersions[provider.APIVersion]
   132  	if !ok {
   133  		return nil, fmt.Errorf("invalid apiVersion: %q", provider.APIVersion)
   134  	}
   135  
   136  	clock := clock.RealClock{}
   137  
   138  	return &pluginProvider{
   139  		clock:                clock,
   140  		matchImages:          provider.MatchImages,
   141  		cache:                cache.NewExpirationStore(cacheKeyFunc, &cacheExpirationPolicy{clock: clock}),
   142  		defaultCacheDuration: provider.DefaultCacheDuration.Duration,
   143  		lastCachePurge:       clock.Now(),
   144  		plugin: &execPlugin{
   145  			name:         provider.Name,
   146  			apiVersion:   provider.APIVersion,
   147  			encoder:      codecs.EncoderForVersion(info.Serializer, gv),
   148  			pluginBinDir: pluginBinDir,
   149  			args:         provider.Args,
   150  			envVars:      provider.Env,
   151  			environ:      os.Environ,
   152  		},
   153  	}, nil
   154  }
   155  
   156  // pluginProvider is the plugin-based implementation of the DockerConfigProvider interface.
   157  type pluginProvider struct {
   158  	clock clock.Clock
   159  
   160  	sync.Mutex
   161  
   162  	group singleflight.Group
   163  
   164  	// matchImages defines the matching image URLs this plugin should operate against.
   165  	// The plugin provider will not return any credentials for images that do not match
   166  	// against this list of match URLs.
   167  	matchImages []string
   168  
   169  	// cache stores DockerConfig entries with an expiration time based on the cache duration
   170  	// returned from the credential provider plugin.
   171  	cache cache.Store
   172  	// defaultCacheDuration is the default duration credentials are cached in-memory if the auth plugin
   173  	// response did not provide a cache duration for credentials.
   174  	defaultCacheDuration time.Duration
   175  
   176  	// plugin is the exec implementation of the credential providing plugin.
   177  	plugin Plugin
   178  
   179  	// lastCachePurge is the last time cache is cleaned for expired entries.
   180  	lastCachePurge time.Time
   181  }
   182  
   183  // cacheEntry is the cache object that will be stored in cache.Store.
   184  type cacheEntry struct {
   185  	key         string
   186  	credentials credentialprovider.DockerConfig
   187  	expiresAt   time.Time
   188  }
   189  
   190  // cacheKeyFunc extracts AuthEntry.MatchKey as the cache key function for the plugin provider.
   191  func cacheKeyFunc(obj interface{}) (string, error) {
   192  	key := obj.(*cacheEntry).key
   193  	return key, nil
   194  }
   195  
   196  // cacheExpirationPolicy defines implements cache.ExpirationPolicy, determining expiration based on the expiresAt timestamp.
   197  type cacheExpirationPolicy struct {
   198  	clock clock.Clock
   199  }
   200  
   201  // IsExpired returns true if the current time is after cacheEntry.expiresAt, which is determined by the
   202  // cache duration returned from the credential provider plugin response.
   203  func (c *cacheExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
   204  	return c.clock.Now().After(entry.Obj.(*cacheEntry).expiresAt)
   205  }
   206  
   207  // Provide returns a credentialprovider.DockerConfig based on the credentials returned
   208  // from cache or the exec plugin.
   209  func (p *pluginProvider) Provide(image string) credentialprovider.DockerConfig {
   210  	if !p.isImageAllowed(image) {
   211  		return credentialprovider.DockerConfig{}
   212  	}
   213  
   214  	cachedConfig, found, err := p.getCachedCredentials(image)
   215  	if err != nil {
   216  		klog.Errorf("Failed to get cached docker config: %v", err)
   217  		return credentialprovider.DockerConfig{}
   218  	}
   219  
   220  	if found {
   221  		return cachedConfig
   222  	}
   223  
   224  	// ExecPlugin is wrapped in single flight to exec plugin once for concurrent same image request.
   225  	// The caveat here is we don't know cacheKeyType yet, so if cacheKeyType is registry/global and credentials saved in cache
   226  	// on per registry/global basis then exec will be called for all requests if requests are made concurrently.
   227  	// foo.bar.registry
   228  	// foo.bar.registry/image1
   229  	// foo.bar.registry/image2
   230  	res, err, _ := p.group.Do(image, func() (interface{}, error) {
   231  		return p.plugin.ExecPlugin(context.Background(), image)
   232  	})
   233  
   234  	if err != nil {
   235  		klog.Errorf("Failed getting credential from external registry credential provider: %v", err)
   236  		return credentialprovider.DockerConfig{}
   237  	}
   238  
   239  	response, ok := res.(*credentialproviderapi.CredentialProviderResponse)
   240  	if !ok {
   241  		klog.Errorf("Invalid response type returned by external credential provider")
   242  		return credentialprovider.DockerConfig{}
   243  	}
   244  
   245  	var cacheKey string
   246  	switch cacheKeyType := response.CacheKeyType; cacheKeyType {
   247  	case credentialproviderapi.ImagePluginCacheKeyType:
   248  		cacheKey = image
   249  	case credentialproviderapi.RegistryPluginCacheKeyType:
   250  		registry := parseRegistry(image)
   251  		cacheKey = registry
   252  	case credentialproviderapi.GlobalPluginCacheKeyType:
   253  		cacheKey = globalCacheKey
   254  	default:
   255  		klog.Errorf("credential provider plugin did not return a valid cacheKeyType: %q", cacheKeyType)
   256  		return credentialprovider.DockerConfig{}
   257  	}
   258  
   259  	dockerConfig := make(credentialprovider.DockerConfig, len(response.Auth))
   260  	for matchImage, authConfig := range response.Auth {
   261  		dockerConfig[matchImage] = credentialprovider.DockerConfigEntry{
   262  			Username: authConfig.Username,
   263  			Password: authConfig.Password,
   264  		}
   265  	}
   266  
   267  	// cache duration was explicitly 0 so don't cache this response at all.
   268  	if response.CacheDuration != nil && response.CacheDuration.Duration == 0 {
   269  		return dockerConfig
   270  	}
   271  
   272  	var expiresAt time.Time
   273  	// nil cache duration means use the default cache duration
   274  	if response.CacheDuration == nil {
   275  		if p.defaultCacheDuration == 0 {
   276  			return dockerConfig
   277  		}
   278  		expiresAt = p.clock.Now().Add(p.defaultCacheDuration)
   279  	} else {
   280  		expiresAt = p.clock.Now().Add(response.CacheDuration.Duration)
   281  	}
   282  
   283  	cachedEntry := &cacheEntry{
   284  		key:         cacheKey,
   285  		credentials: dockerConfig,
   286  		expiresAt:   expiresAt,
   287  	}
   288  
   289  	if err := p.cache.Add(cachedEntry); err != nil {
   290  		klog.Errorf("Error adding auth entry to cache: %v", err)
   291  	}
   292  
   293  	return dockerConfig
   294  }
   295  
   296  // Enabled always returns true since registration of the plugin via kubelet implies it should be enabled.
   297  func (p *pluginProvider) Enabled() bool {
   298  	return true
   299  }
   300  
   301  // isImageAllowed returns true if the image matches against the list of allowed matches by the plugin.
   302  func (p *pluginProvider) isImageAllowed(image string) bool {
   303  	for _, matchImage := range p.matchImages {
   304  		if matched, _ := credentialprovider.URLsMatchStr(matchImage, image); matched {
   305  			return true
   306  		}
   307  	}
   308  
   309  	return false
   310  }
   311  
   312  // getCachedCredentials returns a credentialprovider.DockerConfig if cached from the plugin.
   313  func (p *pluginProvider) getCachedCredentials(image string) (credentialprovider.DockerConfig, bool, error) {
   314  	p.Lock()
   315  	if p.clock.Now().After(p.lastCachePurge.Add(cachePurgeInterval)) {
   316  		// NewExpirationCache purges expired entries when List() is called
   317  		// The expired entry in the cache is removed only when Get or List called on it.
   318  		// List() is called on some interval to remove those expired entries on which Get is never called.
   319  		_ = p.cache.List()
   320  		p.lastCachePurge = p.clock.Now()
   321  	}
   322  	p.Unlock()
   323  
   324  	obj, found, err := p.cache.GetByKey(image)
   325  	if err != nil {
   326  		return nil, false, err
   327  	}
   328  
   329  	if found {
   330  		return obj.(*cacheEntry).credentials, true, nil
   331  	}
   332  
   333  	registry := parseRegistry(image)
   334  	obj, found, err = p.cache.GetByKey(registry)
   335  	if err != nil {
   336  		return nil, false, err
   337  	}
   338  
   339  	if found {
   340  		return obj.(*cacheEntry).credentials, true, nil
   341  	}
   342  
   343  	obj, found, err = p.cache.GetByKey(globalCacheKey)
   344  	if err != nil {
   345  		return nil, false, err
   346  	}
   347  
   348  	if found {
   349  		return obj.(*cacheEntry).credentials, true, nil
   350  	}
   351  
   352  	return nil, false, nil
   353  }
   354  
   355  // Plugin is the interface calling ExecPlugin. This is mainly for testability
   356  // so tests don't have to actually exec any processes.
   357  type Plugin interface {
   358  	ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error)
   359  }
   360  
   361  // execPlugin is the implementation of the Plugin interface that execs a credential provider plugin based
   362  // on it's name provided in CredentialProviderConfig. It is assumed that the executable is available in the
   363  // plugin directory provided by the kubelet.
   364  type execPlugin struct {
   365  	name         string
   366  	apiVersion   string
   367  	encoder      runtime.Encoder
   368  	args         []string
   369  	envVars      []kubeletconfig.ExecEnvVar
   370  	pluginBinDir string
   371  	environ      func() []string
   372  }
   373  
   374  // ExecPlugin executes the plugin binary with arguments and environment variables specified in CredentialProviderConfig:
   375  //
   376  //	$ ENV_NAME=ENV_VALUE <plugin-name> args[0] args[1] <<<request
   377  //
   378  // The plugin is expected to receive the CredentialProviderRequest API via stdin from the kubelet and
   379  // return CredentialProviderResponse via stdout.
   380  func (e *execPlugin) ExecPlugin(ctx context.Context, image string) (*credentialproviderapi.CredentialProviderResponse, error) {
   381  	klog.V(5).Infof("Getting image %s credentials from external exec plugin %s", image, e.name)
   382  
   383  	authRequest := &credentialproviderapi.CredentialProviderRequest{Image: image}
   384  	data, err := e.encodeRequest(authRequest)
   385  	if err != nil {
   386  		return nil, fmt.Errorf("failed to encode auth request: %w", err)
   387  	}
   388  
   389  	stdout := &bytes.Buffer{}
   390  	stderr := &bytes.Buffer{}
   391  	stdin := bytes.NewBuffer(data)
   392  
   393  	// Use a catch-all timeout of 1 minute for all exec-based plugins, this should leave enough
   394  	// head room in case a plugin needs to retry a failed request while ensuring an exec plugin
   395  	// does not run forever. In the future we may want this timeout to be tweakable from the plugin
   396  	// config file.
   397  	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
   398  	defer cancel()
   399  
   400  	cmd := exec.CommandContext(ctx, filepath.Join(e.pluginBinDir, e.name), e.args...)
   401  	cmd.Stdout, cmd.Stderr, cmd.Stdin = stdout, stderr, stdin
   402  
   403  	var configEnvVars []string
   404  	for _, v := range e.envVars {
   405  		configEnvVars = append(configEnvVars, fmt.Sprintf("%s=%s", v.Name, v.Value))
   406  	}
   407  
   408  	// Append current system environment variables, to the ones configured in the
   409  	// credential provider file. Failing to do so may result in unsuccessful execution
   410  	// of the provider binary, see https://github.com/kubernetes/kubernetes/issues/102750
   411  	// also, this behaviour is inline with Credential Provider Config spec
   412  	cmd.Env = mergeEnvVars(e.environ(), configEnvVars)
   413  
   414  	if err = e.runPlugin(ctx, cmd, image); err != nil {
   415  		return nil, fmt.Errorf("%w: %s", err, stderr.String())
   416  	}
   417  
   418  	data = stdout.Bytes()
   419  	// check that the response apiVersion matches what is expected
   420  	gvk, err := json.DefaultMetaFactory.Interpret(data)
   421  	if err != nil {
   422  		return nil, fmt.Errorf("error reading GVK from response: %w", err)
   423  	}
   424  
   425  	if gvk.GroupVersion().String() != e.apiVersion {
   426  		return nil, fmt.Errorf("apiVersion from credential plugin response did not match expected apiVersion:%s, actual apiVersion:%s", e.apiVersion, gvk.GroupVersion().String())
   427  	}
   428  
   429  	response, err := e.decodeResponse(data)
   430  	if err != nil {
   431  		// err is explicitly not wrapped since it may contain credentials in the response.
   432  		return nil, errors.New("error decoding credential provider plugin response from stdout")
   433  	}
   434  
   435  	return response, nil
   436  }
   437  
   438  func (e *execPlugin) runPlugin(ctx context.Context, cmd *exec.Cmd, image string) error {
   439  	startTime := time.Now()
   440  	defer func() {
   441  		kubeletCredentialProviderPluginDuration.WithLabelValues(e.name).Observe(time.Since(startTime).Seconds())
   442  	}()
   443  
   444  	err := cmd.Run()
   445  	if ctx.Err() != nil {
   446  		kubeletCredentialProviderPluginErrors.WithLabelValues(e.name).Inc()
   447  		return fmt.Errorf("error execing credential provider plugin %s for image %s: %w", e.name, image, ctx.Err())
   448  	}
   449  	if err != nil {
   450  		kubeletCredentialProviderPluginErrors.WithLabelValues(e.name).Inc()
   451  		return fmt.Errorf("error execing credential provider plugin %s for image %s: %w", e.name, image, err)
   452  	}
   453  	return nil
   454  }
   455  
   456  // encodeRequest encodes the internal CredentialProviderRequest type into the v1alpha1 version in json
   457  func (e *execPlugin) encodeRequest(request *credentialproviderapi.CredentialProviderRequest) ([]byte, error) {
   458  	data, err := runtime.Encode(e.encoder, request)
   459  	if err != nil {
   460  		return nil, fmt.Errorf("error encoding request: %w", err)
   461  	}
   462  
   463  	return data, nil
   464  }
   465  
   466  // decodeResponse decodes data into the internal CredentialProviderResponse type
   467  func (e *execPlugin) decodeResponse(data []byte) (*credentialproviderapi.CredentialProviderResponse, error) {
   468  	obj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
   469  	if err != nil {
   470  		return nil, err
   471  	}
   472  
   473  	if gvk.Kind != "CredentialProviderResponse" {
   474  		return nil, fmt.Errorf("failed to decode CredentialProviderResponse, unexpected Kind: %q", gvk.Kind)
   475  	}
   476  
   477  	if gvk.Group != credentialproviderapi.GroupName {
   478  		return nil, fmt.Errorf("failed to decode CredentialProviderResponse, unexpected Group: %s", gvk.Group)
   479  	}
   480  
   481  	if internalResponse, ok := obj.(*credentialproviderapi.CredentialProviderResponse); ok {
   482  		return internalResponse, nil
   483  	}
   484  
   485  	return nil, fmt.Errorf("unable to convert %T to *CredentialProviderResponse", obj)
   486  }
   487  
   488  // parseRegistry extracts the registry hostname of an image (including port if specified).
   489  func parseRegistry(image string) string {
   490  	imageParts := strings.Split(image, "/")
   491  	return imageParts[0]
   492  }
   493  
   494  // mergedEnvVars overlays system defined env vars with credential provider env vars,
   495  // it gives priority to the credential provider vars allowing user to override system
   496  // env vars
   497  func mergeEnvVars(sysEnvVars, credProviderVars []string) []string {
   498  	mergedEnvVars := sysEnvVars
   499  	mergedEnvVars = append(mergedEnvVars, credProviderVars...)
   500  	return mergedEnvVars
   501  }
   502  

View as plain text