...

Source file src/k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/limitenforcer.go

Documentation: k8s.io/kubernetes/plugin/pkg/admission/eventratelimit

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package eventratelimit
    18  
    19  import (
    20  	"fmt"
    21  	"strings"
    22  
    23  	"k8s.io/apiserver/pkg/admission"
    24  	"k8s.io/client-go/util/flowcontrol"
    25  	api "k8s.io/kubernetes/pkg/apis/core"
    26  	eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
    27  	"k8s.io/utils/lru"
    28  )
    29  
    30  const (
    31  	// cache size to use if the user did not specify a cache size
    32  	defaultCacheSize = 4096
    33  )
    34  
// limitEnforcer enforces a single type of event rate limit, such as server, namespace, or source+object.
// Instances are built by newLimitEnforcer, and requests are checked through accept.
type limitEnforcer struct {
	// type of this limit; reported in the error returned when the limit is reached
	limitType eventratelimitapi.LimitType
	// cache for holding the rate limiters, looked up by the key that keyFunc computes
	cache cache
	// a keyFunc which is responsible for computing a single key based on input
	keyFunc func(admission.Attributes) string
}
    44  
    45  func newLimitEnforcer(config eventratelimitapi.Limit, clock flowcontrol.Clock) (*limitEnforcer, error) {
    46  	rateLimiterFactory := func() flowcontrol.RateLimiter {
    47  		return flowcontrol.NewTokenBucketRateLimiterWithClock(float32(config.QPS), int(config.Burst), clock)
    48  	}
    49  
    50  	if config.Type == eventratelimitapi.ServerLimitType {
    51  		return &limitEnforcer{
    52  			limitType: config.Type,
    53  			cache: &singleCache{
    54  				rateLimiter: rateLimiterFactory(),
    55  			},
    56  			keyFunc: getServerKey,
    57  		}, nil
    58  	}
    59  
    60  	cacheSize := int(config.CacheSize)
    61  	if cacheSize == 0 {
    62  		cacheSize = defaultCacheSize
    63  	}
    64  	underlyingCache := lru.New(cacheSize)
    65  	cache := &lruCache{
    66  		rateLimiterFactory: rateLimiterFactory,
    67  		cache:              underlyingCache,
    68  	}
    69  
    70  	var keyFunc func(admission.Attributes) string
    71  	switch t := config.Type; t {
    72  	case eventratelimitapi.NamespaceLimitType:
    73  		keyFunc = getNamespaceKey
    74  	case eventratelimitapi.UserLimitType:
    75  		keyFunc = getUserKey
    76  	case eventratelimitapi.SourceAndObjectLimitType:
    77  		keyFunc = getSourceAndObjectKey
    78  	default:
    79  		return nil, fmt.Errorf("unknown event rate limit type: %v", t)
    80  	}
    81  
    82  	return &limitEnforcer{
    83  		limitType: config.Type,
    84  		cache:     cache,
    85  		keyFunc:   keyFunc,
    86  	}, nil
    87  }
    88  
    89  func (enforcer *limitEnforcer) accept(attr admission.Attributes) error {
    90  	key := enforcer.keyFunc(attr)
    91  	rateLimiter := enforcer.cache.get(key)
    92  
    93  	// ensure we have available rate
    94  	allow := rateLimiter.TryAccept()
    95  
    96  	if !allow {
    97  		return fmt.Errorf("limit reached on type %v for key %v", enforcer.limitType, key)
    98  	}
    99  
   100  	return nil
   101  }
   102  
   103  func getServerKey(attr admission.Attributes) string {
   104  	return ""
   105  }
   106  
   107  // getNamespaceKey returns a cache key that is based on the namespace of the event request
   108  func getNamespaceKey(attr admission.Attributes) string {
   109  	return attr.GetNamespace()
   110  }
   111  
   112  // getUserKey returns a cache key that is based on the user of the event request
   113  func getUserKey(attr admission.Attributes) string {
   114  	userInfo := attr.GetUserInfo()
   115  	if userInfo == nil {
   116  		return ""
   117  	}
   118  	return userInfo.GetName()
   119  }
   120  
   121  // getSourceAndObjectKey returns a cache key that is based on the source+object of the event
   122  func getSourceAndObjectKey(attr admission.Attributes) string {
   123  	object := attr.GetObject()
   124  	if object == nil {
   125  		return ""
   126  	}
   127  	event, ok := object.(*api.Event)
   128  	if !ok {
   129  		return ""
   130  	}
   131  	return strings.Join([]string{
   132  		event.Source.Component,
   133  		event.Source.Host,
   134  		event.InvolvedObject.Kind,
   135  		event.InvolvedObject.Namespace,
   136  		event.InvolvedObject.Name,
   137  		string(event.InvolvedObject.UID),
   138  		event.InvolvedObject.APIVersion,
   139  	}, "")
   140  }
   141  

View as plain text