...

Source file src/k8s.io/kubernetes/pkg/kubelet/token/token_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/token

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package token implements a manager of serviceaccount tokens for pods running
    18  // on the node.
    19  package token
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"math/rand"
    26  	"sync"
    27  	"time"
    28  
    29  	authenticationv1 "k8s.io/api/authentication/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/klog/v2"
    36  	"k8s.io/utils/clock"
    37  )
    38  
const (
	// maxTTL is the token age beyond which requiresRefresh asks for a new
	// token, independent of the token's own expiration.
	maxTTL    = 24 * time.Hour
	// gcPeriod is the interval at which NewManager schedules cleanup of the
	// token cache.
	gcPeriod  = time.Minute
	// maxJitter is the upper bound of the random jitter that requiresRefresh
	// subtracts from its refresh deadlines.
	maxJitter = 10 * time.Second
)
    44  
    45  // NewManager returns a new token manager.
    46  func NewManager(c clientset.Interface) *Manager {
    47  	// check whether the server supports token requests so we can give a more helpful error message
    48  	supported := false
    49  	once := &sync.Once{}
    50  	tokenRequestsSupported := func() bool {
    51  		once.Do(func() {
    52  			resources, err := c.Discovery().ServerResourcesForGroupVersion("v1")
    53  			if err != nil {
    54  				return
    55  			}
    56  			for _, resource := range resources.APIResources {
    57  				if resource.Name == "serviceaccounts/token" {
    58  					supported = true
    59  					return
    60  				}
    61  			}
    62  		})
    63  		return supported
    64  	}
    65  
    66  	m := &Manager{
    67  		getToken: func(name, namespace string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
    68  			if c == nil {
    69  				return nil, errors.New("cannot use TokenManager when kubelet is in standalone mode")
    70  			}
    71  			tokenRequest, err := c.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, tr, metav1.CreateOptions{})
    72  			if apierrors.IsNotFound(err) && !tokenRequestsSupported() {
    73  				return nil, fmt.Errorf("the API server does not have TokenRequest endpoints enabled")
    74  			}
    75  			return tokenRequest, err
    76  		},
    77  		cache: make(map[string]*authenticationv1.TokenRequest),
    78  		clock: clock.RealClock{},
    79  	}
    80  	go wait.Forever(m.cleanup, gcPeriod)
    81  	return m
    82  }
    83  
// Manager manages service account tokens for pods. It keeps fetched token
// requests in an in-memory cache guarded by cacheMutex.
type Manager struct {

	// cacheMutex guards the cache
	cacheMutex sync.RWMutex
	// cache maps the string produced by keyFunc to the most recently stored
	// token request for that key.
	cache      map[string]*authenticationv1.TokenRequest

	// mocked for testing
	// getToken fetches a fresh token for the named service account.
	getToken func(name, namespace string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
	// clock supplies the current time for expiry and refresh decisions.
	clock    clock.Clock
}
    95  
    96  // GetServiceAccountToken gets a service account token for a pod from cache or
    97  // from the TokenRequest API. This process is as follows:
    98  // * Check the cache for the current token request.
    99  // * If the token exists and does not require a refresh, return the current token.
   100  // * Attempt to refresh the token.
   101  // * If the token is refreshed successfully, save it in the cache and return the token.
   102  // * If refresh fails and the old token is still valid, log an error and return the old token.
   103  // * If refresh fails and the old token is no longer valid, return an error
   104  func (m *Manager) GetServiceAccountToken(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
   105  	key := keyFunc(name, namespace, tr)
   106  
   107  	ctr, ok := m.get(key)
   108  
   109  	if ok && !m.requiresRefresh(ctr) {
   110  		return ctr, nil
   111  	}
   112  
   113  	tr, err := m.getToken(name, namespace, tr)
   114  	if err != nil {
   115  		switch {
   116  		case !ok:
   117  			return nil, fmt.Errorf("failed to fetch token: %v", err)
   118  		case m.expired(ctr):
   119  			return nil, fmt.Errorf("token %s expired and refresh failed: %v", key, err)
   120  		default:
   121  			klog.ErrorS(err, "Couldn't update token", "cacheKey", key)
   122  			return ctr, nil
   123  		}
   124  	}
   125  
   126  	m.set(key, tr)
   127  	return tr, nil
   128  }
   129  
   130  // DeleteServiceAccountToken should be invoked when pod got deleted. It simply
   131  // clean token manager cache.
   132  func (m *Manager) DeleteServiceAccountToken(podUID types.UID) {
   133  	m.cacheMutex.Lock()
   134  	defer m.cacheMutex.Unlock()
   135  	for k, tr := range m.cache {
   136  		if tr.Spec.BoundObjectRef.UID == podUID {
   137  			delete(m.cache, k)
   138  		}
   139  	}
   140  }
   141  
   142  func (m *Manager) cleanup() {
   143  	m.cacheMutex.Lock()
   144  	defer m.cacheMutex.Unlock()
   145  	for k, tr := range m.cache {
   146  		if m.expired(tr) {
   147  			delete(m.cache, k)
   148  		}
   149  	}
   150  }
   151  
   152  func (m *Manager) get(key string) (*authenticationv1.TokenRequest, bool) {
   153  	m.cacheMutex.RLock()
   154  	defer m.cacheMutex.RUnlock()
   155  	ctr, ok := m.cache[key]
   156  	return ctr, ok
   157  }
   158  
   159  func (m *Manager) set(key string, tr *authenticationv1.TokenRequest) {
   160  	m.cacheMutex.Lock()
   161  	defer m.cacheMutex.Unlock()
   162  	m.cache[key] = tr
   163  }
   164  
   165  func (m *Manager) expired(t *authenticationv1.TokenRequest) bool {
   166  	return m.clock.Now().After(t.Status.ExpirationTimestamp.Time)
   167  }
   168  
   169  // requiresRefresh returns true if the token is older than 80% of its total
   170  // ttl, or if the token is older than 24 hours.
   171  func (m *Manager) requiresRefresh(tr *authenticationv1.TokenRequest) bool {
   172  	if tr.Spec.ExpirationSeconds == nil {
   173  		cpy := tr.DeepCopy()
   174  		cpy.Status.Token = ""
   175  		klog.ErrorS(nil, "Expiration seconds was nil for token request", "tokenRequest", cpy)
   176  		return false
   177  	}
   178  	now := m.clock.Now()
   179  	exp := tr.Status.ExpirationTimestamp.Time
   180  	iat := exp.Add(-1 * time.Duration(*tr.Spec.ExpirationSeconds) * time.Second)
   181  
   182  	jitter := time.Duration(rand.Float64()*maxJitter.Seconds()) * time.Second
   183  	if now.After(iat.Add(maxTTL - jitter)) {
   184  		return true
   185  	}
   186  	// Require a refresh if within 20% of the TTL plus a jitter from the expiration time.
   187  	if now.After(exp.Add(-1*time.Duration((*tr.Spec.ExpirationSeconds*20)/100)*time.Second - jitter)) {
   188  		return true
   189  	}
   190  	return false
   191  }
   192  
   193  // keys should be nonconfidential and safe to log
   194  func keyFunc(name, namespace string, tr *authenticationv1.TokenRequest) string {
   195  	var exp int64
   196  	if tr.Spec.ExpirationSeconds != nil {
   197  		exp = *tr.Spec.ExpirationSeconds
   198  	}
   199  
   200  	var ref authenticationv1.BoundObjectReference
   201  	if tr.Spec.BoundObjectRef != nil {
   202  		ref = *tr.Spec.BoundObjectRef
   203  	}
   204  
   205  	return fmt.Sprintf("%q/%q/%#v/%#v/%#v", name, namespace, tr.Spec.Audiences, exp, ref)
   206  }
   207  

View as plain text