...

Source file src/k8s.io/kubernetes/pkg/controller/podautoscaler/horizontal.go

Documentation: k8s.io/kubernetes/pkg/controller/podautoscaler

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package podautoscaler
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math"
    24  	"sync"
    25  	"time"
    26  
    27  	autoscalingv1 "k8s.io/api/autoscaling/v1"
    28  	autoscalingv2 "k8s.io/api/autoscaling/v2"
    29  	v1 "k8s.io/api/core/v1"
    30  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    31  	k8serrors "k8s.io/apimachinery/pkg/api/errors"
    32  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/labels"
    36  	"k8s.io/apimachinery/pkg/runtime"
    37  	"k8s.io/apimachinery/pkg/runtime/schema"
    38  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	autoscalinginformers "k8s.io/client-go/informers/autoscaling/v2"
    41  	coreinformers "k8s.io/client-go/informers/core/v1"
    42  	"k8s.io/client-go/kubernetes/scheme"
    43  	autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v2"
    44  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    45  	autoscalinglisters "k8s.io/client-go/listers/autoscaling/v2"
    46  	corelisters "k8s.io/client-go/listers/core/v1"
    47  	scaleclient "k8s.io/client-go/scale"
    48  	"k8s.io/client-go/tools/cache"
    49  	"k8s.io/client-go/tools/record"
    50  	"k8s.io/client-go/util/workqueue"
    51  	"k8s.io/klog/v2"
    52  	"k8s.io/kubernetes/pkg/controller"
    53  	metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
    54  	"k8s.io/kubernetes/pkg/controller/podautoscaler/monitor"
    55  	"k8s.io/kubernetes/pkg/controller/util/selectors"
    56  )
    57  
    58  var (
    59  	scaleUpLimitFactor  = 2.0
    60  	scaleUpLimitMinimum = 4.0
    61  )
    62  
    63  var (
    64  	// errSpec is used to determine if the error comes from the spec of the HPA object in reconcileAutoscaler.
    65  	// All such errors should have this error as a root error so that the upstream function can distinguish spec errors from internal errors.
    66  	// e.g., fmt.Errorf("invalid spec%w", errSpec)
    67  	errSpec error = errors.New("")
    68  )
    69  
    70  type timestampedRecommendation struct {
    71  	recommendation int32
    72  	timestamp      time.Time
    73  }
    74  
    75  type timestampedScaleEvent struct {
    76  	replicaChange int32 // absolute value, non-negative
    77  	timestamp     time.Time
    78  	outdated      bool
    79  }
    80  
    81  // HorizontalController is responsible for synchronizing the HPA objects stored
    82  // in the system with the actual deployments/replication controllers they
    83  // control.
    84  type HorizontalController struct {
    85  	scaleNamespacer scaleclient.ScalesGetter
    86  	hpaNamespacer   autoscalingclient.HorizontalPodAutoscalersGetter
    87  	mapper          apimeta.RESTMapper
    88  
    89  	replicaCalc   *ReplicaCalculator
    90  	eventRecorder record.EventRecorder
    91  
    92  	downscaleStabilisationWindow time.Duration
    93  
    94  	monitor monitor.Monitor
    95  
    96  	// hpaLister is able to list/get HPAs from the shared cache from the informer passed in to
    97  	// NewHorizontalController.
    98  	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
    99  	hpaListerSynced cache.InformerSynced
   100  
   101  	// podLister is able to list/get Pods from the shared cache from the informer passed in to
   102  	// NewHorizontalController.
   103  	podLister       corelisters.PodLister
   104  	podListerSynced cache.InformerSynced
   105  
   106  	// Controllers that need to be synced
   107  	queue workqueue.RateLimitingInterface
   108  
   109  	// Latest unstabilized recommendations for each autoscaler.
   110  	recommendations     map[string][]timestampedRecommendation
   111  	recommendationsLock sync.Mutex
   112  
   113  	// Latest autoscaler events
   114  	scaleUpEvents       map[string][]timestampedScaleEvent
   115  	scaleUpEventsLock   sync.RWMutex
   116  	scaleDownEvents     map[string][]timestampedScaleEvent
   117  	scaleDownEventsLock sync.RWMutex
   118  
   119  	// Storage of HPAs and their selectors.
   120  	hpaSelectors    *selectors.BiMultimap
   121  	hpaSelectorsMux sync.Mutex
   122  }
   123  
   124  // NewHorizontalController creates a new HorizontalController.
   125  func NewHorizontalController(
   126  	ctx context.Context,
   127  	evtNamespacer v1core.EventsGetter,
   128  	scaleNamespacer scaleclient.ScalesGetter,
   129  	hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
   130  	mapper apimeta.RESTMapper,
   131  	metricsClient metricsclient.MetricsClient,
   132  	hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
   133  	podInformer coreinformers.PodInformer,
   134  	resyncPeriod time.Duration,
   135  	downscaleStabilisationWindow time.Duration,
   136  	tolerance float64,
   137  	cpuInitializationPeriod,
   138  	delayOfInitialReadinessStatus time.Duration,
   139  ) *HorizontalController {
   140  	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
   141  	broadcaster.StartStructuredLogging(3)
   142  	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
   143  	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
   144  
   145  	hpaController := &HorizontalController{
   146  		eventRecorder:                recorder,
   147  		scaleNamespacer:              scaleNamespacer,
   148  		hpaNamespacer:                hpaNamespacer,
   149  		downscaleStabilisationWindow: downscaleStabilisationWindow,
   150  		monitor:                      monitor.New(),
   151  		queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
   152  		mapper:                       mapper,
   153  		recommendations:              map[string][]timestampedRecommendation{},
   154  		recommendationsLock:          sync.Mutex{},
   155  		scaleUpEvents:                map[string][]timestampedScaleEvent{},
   156  		scaleUpEventsLock:            sync.RWMutex{},
   157  		scaleDownEvents:              map[string][]timestampedScaleEvent{},
   158  		scaleDownEventsLock:          sync.RWMutex{},
   159  		hpaSelectors:                 selectors.NewBiMultimap(),
   160  	}
   161  
   162  	hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
   163  		cache.ResourceEventHandlerFuncs{
   164  			AddFunc:    hpaController.enqueueHPA,
   165  			UpdateFunc: hpaController.updateHPA,
   166  			DeleteFunc: hpaController.deleteHPA,
   167  		},
   168  		resyncPeriod,
   169  	)
   170  	hpaController.hpaLister = hpaInformer.Lister()
   171  	hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
   172  
   173  	hpaController.podLister = podInformer.Lister()
   174  	hpaController.podListerSynced = podInformer.Informer().HasSynced
   175  
   176  	replicaCalc := NewReplicaCalculator(
   177  		metricsClient,
   178  		hpaController.podLister,
   179  		tolerance,
   180  		cpuInitializationPeriod,
   181  		delayOfInitialReadinessStatus,
   182  	)
   183  	hpaController.replicaCalc = replicaCalc
   184  
   185  	monitor.Register()
   186  
   187  	return hpaController
   188  }
   189  
   190  // Run begins watching and syncing.
   191  func (a *HorizontalController) Run(ctx context.Context, workers int) {
   192  	defer utilruntime.HandleCrash()
   193  	defer a.queue.ShutDown()
   194  
   195  	logger := klog.FromContext(ctx)
   196  	logger.Info("Starting HPA controller")
   197  	defer logger.Info("Shutting down HPA controller")
   198  
   199  	if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
   200  		return
   201  	}
   202  
   203  	for i := 0; i < workers; i++ {
   204  		go wait.UntilWithContext(ctx, a.worker, time.Second)
   205  	}
   206  
   207  	<-ctx.Done()
   208  }
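
        // Illustrative usage sketch: this is roughly how the controller is constructed and
        // started by the kube-controller-manager. The identifiers clientset, scaleClient,
        // restMapper, and metricsClient, as well as the numeric values, are assumptions for
        // the example rather than values taken from this package.
        //
        //	factory := informers.NewSharedInformerFactory(clientset, 0)
        //	hpaController := NewHorizontalController(
        //		ctx,
        //		clientset.CoreV1(),        // event sink
        //		scaleClient,               // scale subresource client
        //		clientset.AutoscalingV2(), // HPA client
        //		restMapper,
        //		metricsClient,
        //		factory.Autoscaling().V2().HorizontalPodAutoscalers(),
        //		factory.Core().V1().Pods(),
        //		15*time.Second, // resyncPeriod
        //		5*time.Minute,  // downscaleStabilisationWindow
        //		0.1,            // tolerance
        //		5*time.Minute,  // cpuInitializationPeriod
        //		30*time.Second, // delayOfInitialReadinessStatus
        //	)
        //	factory.Start(ctx.Done())
        //	go hpaController.Run(ctx, 5)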
   209  
   210  // old and cur are expected to be *autoscalingv2.HorizontalPodAutoscaler objects.
   211  func (a *HorizontalController) updateHPA(old, cur interface{}) {
   212  	a.enqueueHPA(cur)
   213  }
   214  
   215  // obj could be a *autoscalingv2.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
   216  func (a *HorizontalController) enqueueHPA(obj interface{}) {
   217  	key, err := controller.KeyFunc(obj)
   218  	if err != nil {
   219  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   220  		return
   221  	}
   222  
   223  	// Requests are always added to the queue with a resyncPeriod delay. If there's already a
   224  	// request for the HPA in the queue then the new request is dropped. Requests spend the resync
   225  	// interval in the queue, so HPAs are processed every resync interval.
   226  	a.queue.AddRateLimited(key)
   227  
   228  	// Register the HPA in the hpaSelectors map if it's not present yet, attaching the Nothing selector,
   229  	// which does not select any objects. The actual selector is going to be updated
   230  	// when it becomes available during autoscaler reconciliation.
   231  	a.hpaSelectorsMux.Lock()
   232  	defer a.hpaSelectorsMux.Unlock()
   233  	if hpaKey := selectors.Parse(key); !a.hpaSelectors.SelectorExists(hpaKey) {
   234  		a.hpaSelectors.PutSelector(hpaKey, labels.Nothing())
   235  	}
   236  }
   237  
   238  func (a *HorizontalController) deleteHPA(obj interface{}) {
   239  	key, err := controller.KeyFunc(obj)
   240  	if err != nil {
   241  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
   242  		return
   243  	}
   244  
   245  	// TODO: could we leak if we fail to get the key?
   246  	a.queue.Forget(key)
   247  
   248  	// Remove HPA and attached selector.
   249  	a.hpaSelectorsMux.Lock()
   250  	defer a.hpaSelectorsMux.Unlock()
   251  	a.hpaSelectors.DeleteSelector(selectors.Parse(key))
   252  }
   253  
   254  func (a *HorizontalController) worker(ctx context.Context) {
   255  	for a.processNextWorkItem(ctx) {
   256  	}
   257  	logger := klog.FromContext(ctx)
   258  	logger.Info("Horizontal Pod Autoscaler controller worker shutting down")
   259  }
   260  
   261  func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
   262  	key, quit := a.queue.Get()
   263  	if quit {
   264  		return false
   265  	}
   266  	defer a.queue.Done(key)
   267  
   268  	deleted, err := a.reconcileKey(ctx, key.(string))
   269  	if err != nil {
   270  		utilruntime.HandleError(err)
   271  	}
   272  	// Re-add the processed HPA to the queue with a resyncPeriod delay.
   273  	// Requests are always added to the queue with a resyncPeriod delay. If there's already a request
   274  	// for the HPA in the queue then the new request is dropped. Requests spend resyncPeriod
   275  	// in the queue, so HPAs are processed every resyncPeriod.
   276  	// The request is added here just in case the last resync didn't insert a request into the queue. This
   277  	// happens quite often because there is a race condition between adding a request after resyncPeriod
   278  	// and removing it from the queue. A request can be added by resync before the previous request is
   279  	// removed from the queue. If we didn't add a request here then in that case one request would be dropped
   280  	// and the HPA would only be processed after 2 x resyncPeriod.
   281  	if !deleted {
   282  		a.queue.AddRateLimited(key)
   283  	}
   284  
   285  	return true
   286  }
   287  
   288  // computeReplicasForMetrics computes the desired number of replicas for the metric specifications listed in the HPA,
   289  // returning the maximum of the computed replica counts, a description of the associated metric, and the statuses of
   290  // all metrics computed.
   291  // It may return both a valid metricDesiredReplicas and an error,
   292  // when some metrics still work and the HPA should perform scaling based on them.
   293  // If the HPA cannot do anything due to an error, it returns -1 in metricDesiredReplicas as a failure signal.
   294  func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
   295  	metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
   296  
   297  	selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
   298  	if err != nil {
   299  		return -1, "", nil, time.Time{}, err
   300  	}
   301  
   302  	specReplicas := scale.Spec.Replicas
   303  	statusReplicas := scale.Status.Replicas
   304  	statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
   305  
   306  	invalidMetricsCount := 0
   307  	var invalidMetricError error
   308  	var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition
   309  
   310  	for i, metricSpec := range metricSpecs {
   311  		replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
   312  
   313  		if err != nil {
   314  			if invalidMetricsCount <= 0 {
   315  				invalidMetricCondition = condition
   316  				invalidMetricError = err
   317  			}
   318  			invalidMetricsCount++
   319  			continue
   320  		}
   321  		if replicas == 0 || replicaCountProposal > replicas {
   322  			timestamp = timestampProposal
   323  			replicas = replicaCountProposal
   324  			metric = metricNameProposal
   325  		}
   326  	}
   327  
   328  	if invalidMetricError != nil {
   329  		invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
   330  	}
   331  
   332  	// If all metrics are invalid or some are invalid and we would scale down,
   333  	// return an error and set the condition of the hpa based on the first invalid metric.
   334  	// Otherwise set the condition as scaling active as we're going to scale
   335  	if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
   336  		setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
   337  		return -1, "", statuses, time.Time{}, invalidMetricError
   338  	}
   339  	setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
   340  
   341  	return replicas, metric, statuses, timestamp, invalidMetricError
   342  }
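
        // Worked example with assumed numbers: suppose scale.Spec.Replicas is 5 and the HPA
        // lists three metrics, where a CPU metric proposes 4 replicas, a pods metric proposes
        // 6, and an external metric fails to resolve. replicas becomes 6 (the largest successful
        // proposal) and invalidMetricsCount is 1; since 6 >= 5 this is not a scale-down, so the
        // function sets ScalingActive and returns 6 together with a non-nil error describing the
        // failing metric, letting the caller scale while still surfacing the failure.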
   343  
   344  // hpasControllingPodsUnderSelector returns a list of keys of all HPAs that control a given list of pods.
   345  func (a *HorizontalController) hpasControllingPodsUnderSelector(pods []*v1.Pod) []selectors.Key {
   346  	a.hpaSelectorsMux.Lock()
   347  	defer a.hpaSelectorsMux.Unlock()
   348  
   349  	hpas := map[selectors.Key]struct{}{}
   350  	for _, p := range pods {
   351  		podKey := selectors.Key{Name: p.Name, Namespace: p.Namespace}
   352  		a.hpaSelectors.Put(podKey, p.Labels)
   353  
   354  		selectingHpas, ok := a.hpaSelectors.ReverseSelect(podKey)
   355  		if !ok {
   356  			continue
   357  		}
   358  		for _, hpa := range selectingHpas {
   359  			hpas[hpa] = struct{}{}
   360  		}
   361  	}
   362  	// Clean up all added pods.
   363  	a.hpaSelectors.KeepOnly([]selectors.Key{})
   364  
   365  	hpaList := []selectors.Key{}
   366  	for hpa := range hpas {
   367  		hpaList = append(hpaList, hpa)
   368  	}
   369  	return hpaList
   370  }
   371  
   372  // validateAndParseSelector verifies that:
   373  // - selector is not empty;
   374  // - selector format is valid;
   375  // - all pods by current selector are controlled by only one HPA.
   376  // It returns the parsed selector if the checks succeed, or an error otherwise.
   377  // In case of an error the ScalingActive condition is set to false with the corresponding reason.
   378  func (a *HorizontalController) validateAndParseSelector(hpa *autoscalingv2.HorizontalPodAutoscaler, selector string) (labels.Selector, error) {
   379  	if selector == "" {
   380  		errMsg := "selector is required"
   381  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
   382  		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
   383  		return nil, fmt.Errorf(errMsg)
   384  	}
   385  
   386  	parsedSelector, err := labels.Parse(selector)
   387  	if err != nil {
   388  		errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
   389  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
   390  		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
   391  		return nil, fmt.Errorf(errMsg)
   392  	}
   393  
   394  	hpaKey := selectors.Key{Name: hpa.Name, Namespace: hpa.Namespace}
   395  	a.hpaSelectorsMux.Lock()
   396  	if a.hpaSelectors.SelectorExists(hpaKey) {
   397  		// Update HPA selector only if the HPA was registered in enqueueHPA.
   398  		a.hpaSelectors.PutSelector(hpaKey, parsedSelector)
   399  	}
   400  	a.hpaSelectorsMux.Unlock()
   401  
   402  	pods, err := a.podLister.Pods(hpa.Namespace).List(parsedSelector)
   403  	if err != nil {
   404  		return nil, err
   405  	}
   406  
   407  	selectingHpas := a.hpasControllingPodsUnderSelector(pods)
   408  	if len(selectingHpas) > 1 {
   409  		errMsg := fmt.Sprintf("pods by selector %v are controlled by multiple HPAs: %v", selector, selectingHpas)
   410  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "AmbiguousSelector", errMsg)
   411  		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "AmbiguousSelector", errMsg)
   412  		return nil, fmt.Errorf(errMsg)
   413  	}
   414  
   415  	return parsedSelector, nil
   416  }
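
        // Illustrative scenario (names assumed): if hpa-a and hpa-b target workloads whose
        // selectors match the same pods, then once both HPAs have had their real selectors
        // registered here, hpasControllingPodsUnderSelector returns both keys, so each
        // reconciliation records an AmbiguousSelector event, sets ScalingActive to false,
        // and returns an error instead of a selector.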
   417  
   418  // Computes the desired number of replicas for a specific hpa and metric specification,
   419  // returning the metric status and a proposed condition to be set on the HPA object.
   420  func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
   421  	specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
   422  	timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   423  	// actionLabel is used to report which actions this reconciliation has taken.
   424  	start := time.Now()
   425  	defer func() {
   426  		actionLabel := monitor.ActionLabelNone
   427  		switch {
   428  		case replicaCountProposal > hpa.Status.CurrentReplicas:
   429  			actionLabel = monitor.ActionLabelScaleUp
   430  		case replicaCountProposal < hpa.Status.CurrentReplicas:
   431  			actionLabel = monitor.ActionLabelScaleDown
   432  		}
   433  
   434  		errorLabel := monitor.ErrorLabelNone
   435  		if err != nil {
   436  			// In case of error, set "internal" as default.
   437  			errorLabel = monitor.ErrorLabelInternal
   438  			actionLabel = monitor.ActionLabelNone
   439  		}
   440  		if errors.Is(err, errSpec) {
   441  			errorLabel = monitor.ErrorLabelSpec
   442  		}
   443  
   444  		a.monitor.ObserveMetricComputationResult(actionLabel, errorLabel, time.Since(start), spec.Type)
   445  	}()
   446  
   447  	switch spec.Type {
   448  	case autoscalingv2.ObjectMetricSourceType:
   449  		metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
   450  		if err != nil {
   451  			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
   452  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
   453  		}
   454  		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
   455  		if err != nil {
   456  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
   457  		}
   458  	case autoscalingv2.PodsMetricSourceType:
   459  		metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
   460  		if err != nil {
   461  			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
   462  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
   463  		}
   464  		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
   465  		if err != nil {
   466  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
   467  		}
   468  	case autoscalingv2.ResourceMetricSourceType:
   469  		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
   470  		if err != nil {
   471  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
   472  		}
   473  	case autoscalingv2.ContainerResourceMetricSourceType:
   474  		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
   475  		if err != nil {
   476  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
   477  		}
   478  	case autoscalingv2.ExternalMetricSourceType:
   479  		replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
   480  		if err != nil {
   481  			return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
   482  		}
   483  	default:
   484  		// It shouldn't reach here, as an invalid metric source type is filtered out by the API server's validation.
   485  		err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
   486  		condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
   487  		return 0, "", time.Time{}, condition, err
   488  	}
   489  	return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   490  }
   491  
   492  func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
   493  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   494  	if err != nil {
   495  		return true, err
   496  	}
   497  
   498  	logger := klog.FromContext(ctx)
   499  
   500  	hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
   501  	if k8serrors.IsNotFound(err) {
   502  		logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))
   503  
   504  		a.recommendationsLock.Lock()
   505  		delete(a.recommendations, key)
   506  		a.recommendationsLock.Unlock()
   507  
   508  		a.scaleUpEventsLock.Lock()
   509  		delete(a.scaleUpEvents, key)
   510  		a.scaleUpEventsLock.Unlock()
   511  
   512  		a.scaleDownEventsLock.Lock()
   513  		delete(a.scaleDownEvents, key)
   514  		a.scaleDownEventsLock.Unlock()
   515  
   516  		return true, nil
   517  	}
   518  	if err != nil {
   519  		return false, err
   520  	}
   521  
   522  	return false, a.reconcileAutoscaler(ctx, hpa, key)
   523  }
   524  
   525  // computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
   526  func (a *HorizontalController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   527  	if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType && metricSpec.Object.Target.Value != nil {
   528  		replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, selector, metricSelector)
   529  		if err != nil {
   530  			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
   531  			return 0, timestampProposal, "", condition, err
   532  		}
   533  		*status = autoscalingv2.MetricStatus{
   534  			Type: autoscalingv2.ObjectMetricSourceType,
   535  			Object: &autoscalingv2.ObjectMetricStatus{
   536  				DescribedObject: metricSpec.Object.DescribedObject,
   537  				Metric: autoscalingv2.MetricIdentifier{
   538  					Name:     metricSpec.Object.Metric.Name,
   539  					Selector: metricSpec.Object.Metric.Selector,
   540  				},
   541  				Current: autoscalingv2.MetricValueStatus{
   542  					Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
   543  				},
   544  			},
   545  		}
   546  		return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   547  	} else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType && metricSpec.Object.Target.AverageValue != nil {
   548  		replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector)
   549  		if err != nil {
   550  			condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
   551  			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
   552  		}
   553  		*status = autoscalingv2.MetricStatus{
   554  			Type: autoscalingv2.ObjectMetricSourceType,
   555  			Object: &autoscalingv2.ObjectMetricStatus{
   556  				Metric: autoscalingv2.MetricIdentifier{
   557  					Name:     metricSpec.Object.Metric.Name,
   558  					Selector: metricSpec.Object.Metric.Selector,
   559  				},
   560  				Current: autoscalingv2.MetricValueStatus{
   561  					AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
   562  				},
   563  			},
   564  		}
   565  		return replicaCountProposal, timestampProposal, fmt.Sprintf("object metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   566  	}
   567  	errMsg := "invalid object metric source: neither a value target nor an average value target was set"
   568  	err = fmt.Errorf(errMsg)
   569  	condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
   570  	return 0, time.Time{}, "", condition, err
   571  }
   572  
   573  // computeStatusForPodsMetric computes the desired number of replicas for the specified metric of type PodsMetricSourceType.
   574  func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   575  	replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
   576  	if err != nil {
   577  		condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
   578  		return 0, timestampProposal, "", condition, err
   579  	}
   580  	*status = autoscalingv2.MetricStatus{
   581  		Type: autoscalingv2.PodsMetricSourceType,
   582  		Pods: &autoscalingv2.PodsMetricStatus{
   583  			Metric: autoscalingv2.MetricIdentifier{
   584  				Name:     metricSpec.Pods.Metric.Name,
   585  				Selector: metricSpec.Pods.Metric.Selector,
   586  			},
   587  			Current: autoscalingv2.MetricValueStatus{
   588  				AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
   589  			},
   590  		},
   591  	}
   592  
   593  	return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   594  }
   595  
   596  func (a *HorizontalController) computeStatusForResourceMetricGeneric(ctx context.Context, currentReplicas int32, target autoscalingv2.MetricTarget,
   597  	resourceName v1.ResourceName, namespace string, container string, selector labels.Selector, sourceType autoscalingv2.MetricSourceType) (replicaCountProposal int32,
   598  	metricStatus *autoscalingv2.MetricValueStatus, timestampProposal time.Time, metricNameProposal string,
   599  	condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   600  	if target.AverageValue != nil {
   601  		var rawProposal int64
   602  		replicaCountProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetRawResourceReplicas(ctx, currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container)
   603  		if err != nil {
   604  			return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s usage: %v", resourceName, err)
   605  		}
   606  		metricNameProposal = fmt.Sprintf("%s resource", resourceName.String())
   607  		status := autoscalingv2.MetricValueStatus{
   608  			AverageValue: resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
   609  		}
   610  		return replicaCountProposal, &status, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   611  	}
   612  
   613  	if target.AverageUtilization == nil {
   614  		errMsg := "invalid resource metric source: neither an average utilization target nor an average value (usage) target was set"
   615  		return 0, nil, time.Time{}, "", condition, fmt.Errorf(errMsg)
   616  	}
   617  
   618  	targetUtilization := *target.AverageUtilization
   619  	replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetResourceReplicas(ctx, currentReplicas, targetUtilization, resourceName, namespace, selector, container)
   620  	if err != nil {
   621  		return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", resourceName, err)
   622  	}
   623  
   624  	metricNameProposal = fmt.Sprintf("%s resource utilization (percentage of request)", resourceName)
   625  	if sourceType == autoscalingv2.ContainerResourceMetricSourceType {
   626  		metricNameProposal = fmt.Sprintf("%s container resource utilization (percentage of request)", resourceName)
   627  	}
   628  	status := autoscalingv2.MetricValueStatus{
   629  		AverageUtilization: &percentageProposal,
   630  		AverageValue:       resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
   631  	}
   632  	return replicaCountProposal, &status, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   633  }
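
        // Worked example for the utilization path (numbers assumed): with 3 current replicas,
        // a target AverageUtilization of 60%, and pods averaging 90% of their CPU requests,
        // the ReplicaCalculator proposes roughly ceil(3 * 90 / 60) = 5 replicas, and the
        // returned MetricValueStatus carries both the observed 90% and the corresponding raw
        // average in milli-units. The exact result is also subject to the calculator's
        // tolerance and pod-readiness handling, which live outside this function.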
   634  
   635  // computeStatusForResourceMetric computes the desired number of replicas for the specified metric of type ResourceMetricSourceType.
   636  func (a *HorizontalController) computeStatusForResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
   637  	selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
   638  	metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   639  	replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.Resource.Target, metricSpec.Resource.Name, hpa.Namespace, "", selector, autoscalingv2.ResourceMetricSourceType)
   640  	if err != nil {
   641  		condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
   642  		return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
   643  	}
   644  	*status = autoscalingv2.MetricStatus{
   645  		Type: autoscalingv2.ResourceMetricSourceType,
   646  		Resource: &autoscalingv2.ResourceMetricStatus{
   647  			Name:    metricSpec.Resource.Name,
   648  			Current: *metricValueStatus,
   649  		},
   650  	}
   651  	return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
   652  }
   653  
   654  // computeStatusForContainerResourceMetric computes the desired number of replicas for the specified metric of type ContainerResourceMetricSourceType.
   655  func (a *HorizontalController) computeStatusForContainerResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
   656  	selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
   657  	metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   658  	replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector, autoscalingv2.ContainerResourceMetricSourceType)
   659  	if err != nil {
   660  		condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetContainerResourceMetric", err)
   661  		return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
   662  	}
   663  	*status = autoscalingv2.MetricStatus{
   664  		Type: autoscalingv2.ContainerResourceMetricSourceType,
   665  		ContainerResource: &autoscalingv2.ContainerResourceMetricStatus{
   666  			Name:      metricSpec.ContainerResource.Name,
   667  			Container: metricSpec.ContainerResource.Container,
   668  			Current:   *metricValueStatus,
   669  		},
   670  	}
   671  	return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
   672  }
   673  
   674  // computeStatusForExternalMetric computes the desired number of replicas for the specified metric of type ExternalMetricSourceType.
   675  func (a *HorizontalController) computeStatusForExternalMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
   676  	if metricSpec.External.Target.AverageValue != nil {
   677  		replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetExternalPerPodMetricReplicas(statusReplicas, metricSpec.External.Target.AverageValue.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector)
   678  		if err != nil {
   679  			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
   680  			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s external metric: %v", metricSpec.External.Metric.Name, err)
   681  		}
   682  		*status = autoscalingv2.MetricStatus{
   683  			Type: autoscalingv2.ExternalMetricSourceType,
   684  			External: &autoscalingv2.ExternalMetricStatus{
   685  				Metric: autoscalingv2.MetricIdentifier{
   686  					Name:     metricSpec.External.Metric.Name,
   687  					Selector: metricSpec.External.Metric.Selector,
   688  				},
   689  				Current: autoscalingv2.MetricValueStatus{
   690  					AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
   691  				},
   692  			},
   693  		}
   694  		return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   695  	}
   696  	if metricSpec.External.Target.Value != nil {
   697  		replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetExternalMetricReplicas(specReplicas, metricSpec.External.Target.Value.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector, selector)
   698  		if err != nil {
   699  			condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
   700  			return 0, time.Time{}, "", condition, fmt.Errorf("failed to get external metric %s: %v", metricSpec.External.Metric.Name, err)
   701  		}
   702  		*status = autoscalingv2.MetricStatus{
   703  			Type: autoscalingv2.ExternalMetricSourceType,
   704  			External: &autoscalingv2.ExternalMetricStatus{
   705  				Metric: autoscalingv2.MetricIdentifier{
   706  					Name:     metricSpec.External.Metric.Name,
   707  					Selector: metricSpec.External.Metric.Selector,
   708  				},
   709  				Current: autoscalingv2.MetricValueStatus{
   710  					Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
   711  				},
   712  			},
   713  		}
   714  		return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
   715  	}
   716  	errMsg := "invalid external metric source: neither a value target nor an average value target was set"
   717  	err = fmt.Errorf(errMsg)
   718  	condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
   719  	return 0, time.Time{}, "", condition, fmt.Errorf(errMsg)
   720  }
   721  
   722  func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
   723  	a.recommendationsLock.Lock()
   724  	defer a.recommendationsLock.Unlock()
   725  	if a.recommendations[key] == nil {
   726  		a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
   727  	}
   728  }
   729  
   730  func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
   731  	// actionLabel is used to report which actions this reconciliation has taken.
   732  	actionLabel := monitor.ActionLabelNone
   733  	start := time.Now()
   734  	defer func() {
   735  		errorLabel := monitor.ErrorLabelNone
   736  		if retErr != nil {
   737  			// In case of error, set "internal" as default.
   738  			errorLabel = monitor.ErrorLabelInternal
   739  		}
   740  		if errors.Is(retErr, errSpec) {
   741  			errorLabel = monitor.ErrorLabelSpec
   742  		}
   743  
   744  		a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
   745  	}()
   746  
   747  	// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
   748  	hpa := hpaShared.DeepCopy()
   749  	hpaStatusOriginal := hpa.Status.DeepCopy()
   750  
   751  	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
   752  
   753  	targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
   754  	if err != nil {
   755  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
   756  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
   757  		if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
   758  			utilruntime.HandleError(err)
   759  		}
   760  		return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
   761  	}
   762  
   763  	targetGK := schema.GroupKind{
   764  		Group: targetGV.Group,
   765  		Kind:  hpa.Spec.ScaleTargetRef.Kind,
   766  	}
   767  
   768  	mappings, err := a.mapper.RESTMappings(targetGK)
   769  	if err != nil {
   770  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
   771  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
   772  		if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
   773  			utilruntime.HandleError(err)
   774  		}
   775  		return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
   776  	}
   777  
   778  	scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
   779  	if err != nil {
   780  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
   781  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
   782  		if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
   783  			utilruntime.HandleError(err)
   784  		}
   785  		return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
   786  	}
   787  	setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
   788  	currentReplicas := scale.Spec.Replicas
   789  	a.recordInitialRecommendation(currentReplicas, key)
   790  
   791  	var (
   792  		metricStatuses        []autoscalingv2.MetricStatus
   793  		metricDesiredReplicas int32
   794  		metricName            string
   795  	)
   796  
   797  	desiredReplicas := int32(0)
   798  	rescaleReason := ""
   799  
   800  	var minReplicas int32
   801  
   802  	if hpa.Spec.MinReplicas != nil {
   803  		minReplicas = *hpa.Spec.MinReplicas
   804  	} else {
   805  		// Default value
   806  		minReplicas = 1
   807  	}
   808  
   809  	rescale := true
   810  	logger := klog.FromContext(ctx)
   811  
   812  	if currentReplicas == 0 && minReplicas != 0 {
   813  		// Autoscaling is disabled for this resource
   814  		desiredReplicas = 0
   815  		rescale = false
   816  		setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
   817  	} else if currentReplicas > hpa.Spec.MaxReplicas {
   818  		rescaleReason = "Current number of replicas above Spec.MaxReplicas"
   819  		desiredReplicas = hpa.Spec.MaxReplicas
   820  	} else if currentReplicas < minReplicas {
   821  		rescaleReason = "Current number of replicas below Spec.MinReplicas"
   822  		desiredReplicas = minReplicas
   823  	} else {
   824  		var metricTimestamp time.Time
   825  		metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
   826  		// computeReplicasForMetrics may return both non-zero metricDesiredReplicas and an error.
   827  		// That means some metrics still work and HPA should perform scaling based on them.
   828  		if err != nil && metricDesiredReplicas == -1 {
   829  			a.setCurrentReplicasAndMetricsInStatus(hpa, currentReplicas, metricStatuses)
   830  			if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
   831  				utilruntime.HandleError(err)
   832  			}
   833  			a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
   834  			return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
   835  		}
   836  		if err != nil {
   837  			// We proceed with scaling, but ultimately return this error from reconcileAutoscaler().
   838  			retErr = err
   839  		}
   840  
   841  		logger.V(4).Info("Proposing desired replicas",
   842  			"desiredReplicas", metricDesiredReplicas,
   843  			"metric", metricName,
   844  			"timestamp", metricTimestamp,
   845  			"scaleTarget", reference)
   846  
   847  		rescaleMetric := ""
   848  		if metricDesiredReplicas > desiredReplicas {
   849  			desiredReplicas = metricDesiredReplicas
   850  			rescaleMetric = metricName
   851  		}
   852  		if desiredReplicas > currentReplicas {
   853  			rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
   854  		}
   855  		if desiredReplicas < currentReplicas {
   856  			rescaleReason = "All metrics below target"
   857  		}
   858  		if hpa.Spec.Behavior == nil {
   859  			desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
   860  		} else {
   861  			desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
   862  		}
   863  		rescale = desiredReplicas != currentReplicas
   864  	}
   865  
   866  	if rescale {
   867  		scale.Spec.Replicas = desiredReplicas
   868  		_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
   869  		if err != nil {
   870  			a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
   871  			setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
   872  			a.setCurrentReplicasAndMetricsInStatus(hpa, currentReplicas, metricStatuses)
   873  			if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
   874  				utilruntime.HandleError(err)
   875  			}
   876  			return fmt.Errorf("failed to rescale %s: %v", reference, err)
   877  		}
   878  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
   879  		a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
   880  		a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
   881  		logger.Info("Successfully rescaled",
   882  			"HPA", klog.KObj(hpa),
   883  			"currentReplicas", currentReplicas,
   884  			"desiredReplicas", desiredReplicas,
   885  			"reason", rescaleReason)
   886  
   887  		if desiredReplicas > currentReplicas {
   888  			actionLabel = monitor.ActionLabelScaleUp
   889  		} else {
   890  			actionLabel = monitor.ActionLabelScaleDown
   891  		}
   892  	} else {
   893  		logger.V(4).Info("Decided not to scale",
   894  			"scaleTarget", reference,
   895  			"desiredReplicas", desiredReplicas,
   896  			"lastScaleTime", hpa.Status.LastScaleTime)
   897  		desiredReplicas = currentReplicas
   898  	}
   899  
   900  	a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
   901  
   902  	err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
   903  	if err != nil {
   904  		// we can overwrite retErr in this case because it's an internal error.
   905  		return err
   906  	}
   907  
   908  	return retErr
   909  }
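
        // Illustrative end-to-end decision (values assumed): for an HPA with minReplicas=1 and
        // maxReplicas=10 whose target currently runs 2 replicas, a metric proposal of 6 that
        // survives normalization makes the controller update the scale subresource to 6, emit a
        // SuccessfulRescale event, store a scale-up event, and report ActionLabelScaleUp to the
        // monitor; had the proposal equaled 2, it would instead log "Decided not to scale" at V(4).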
   910  
   911  // stabilizeRecommendation:
   912  // - replaces old recommendation with the newest recommendation,
   913  // - returns max of recommendations that are not older than downscaleStabilisationWindow.
   914  func (a *HorizontalController) stabilizeRecommendation(key string, prenormalizedDesiredReplicas int32) int32 {
   915  	maxRecommendation := prenormalizedDesiredReplicas
   916  	foundOldSample := false
   917  	oldSampleIndex := 0
   918  	cutoff := time.Now().Add(-a.downscaleStabilisationWindow)
   919  
   920  	a.recommendationsLock.Lock()
   921  	defer a.recommendationsLock.Unlock()
   922  	for i, rec := range a.recommendations[key] {
   923  		if rec.timestamp.Before(cutoff) {
   924  			foundOldSample = true
   925  			oldSampleIndex = i
   926  		} else if rec.recommendation > maxRecommendation {
   927  			maxRecommendation = rec.recommendation
   928  		}
   929  	}
   930  	if foundOldSample {
   931  		a.recommendations[key][oldSampleIndex] = timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()}
   932  	} else {
   933  		a.recommendations[key] = append(a.recommendations[key], timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()})
   934  	}
   935  	return maxRecommendation
   936  }
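
        // Worked example (assuming a 5-minute downscaleStabilisationWindow): if the stored
        // recommendations for the key are 7 (6 minutes old), 5 (2 minutes old), and 4 (1 minute
        // old), and the new prenormalized recommendation is 3, the 6-minute-old sample is
        // overwritten with {3, now} and the function returns 5, the maximum of the new value and
        // the samples still inside the window.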
   937  
   938  // normalizeDesiredReplicas takes the desired replicas value computed from the metrics and normalizes it based on the appropriate conditions (i.e. < maxReplicas, >
   939  // minReplicas, etc...)
   940  func (a *HorizontalController) normalizeDesiredReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas int32, prenormalizedDesiredReplicas int32, minReplicas int32) int32 {
   941  	stabilizedRecommendation := a.stabilizeRecommendation(key, prenormalizedDesiredReplicas)
   942  	if stabilizedRecommendation != prenormalizedDesiredReplicas {
   943  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ScaleDownStabilized", "recent recommendations were higher than current one, applying the highest recent recommendation")
   944  	} else {
   945  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
   946  	}
   947  
   948  	desiredReplicas, condition, reason := convertDesiredReplicasWithRules(currentReplicas, stabilizedRecommendation, minReplicas, hpa.Spec.MaxReplicas)
   949  
   950  	if desiredReplicas == stabilizedRecommendation {
   951  		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, condition, reason)
   952  	} else {
   953  		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, condition, reason)
   954  	}
   955  
   956  	return desiredReplicas
   957  }
   958  
   959  // NormalizationArg is used to pass all needed information between functions as one structure
   960  type NormalizationArg struct {
   961  	Key               string
   962  	ScaleUpBehavior   *autoscalingv2.HPAScalingRules
   963  	ScaleDownBehavior *autoscalingv2.HPAScalingRules
   964  	MinReplicas       int32
   965  	MaxReplicas       int32
   966  	CurrentReplicas   int32
   967  	DesiredReplicas   int32
   968  }
   969  
   970  // normalizeDesiredReplicasWithBehaviors takes the metrics desired replicas value and normalizes it:
   971  // 1. Apply the basic conditions (i.e. < maxReplicas, > minReplicas, etc...)
   972  // 2. Apply the scale up/down limits from the hpaSpec.Behaviors (i.e. add no more than 4 pods)
   973  // 3. Apply the constraints period (i.e. add no more than 4 pods per minute)
   974  // 4. Apply the stabilization (i.e. add no more than 4 pods per minute, and pick the smallest recommendation during last 5 minutes)
   975  func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
   976  	a.maybeInitScaleDownStabilizationWindow(hpa)
   977  	normalizationArg := NormalizationArg{
   978  		Key:               key,
   979  		ScaleUpBehavior:   hpa.Spec.Behavior.ScaleUp,
   980  		ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
   981  		MinReplicas:       minReplicas,
   982  		MaxReplicas:       hpa.Spec.MaxReplicas,
   983  		CurrentReplicas:   currentReplicas,
   984  		DesiredReplicas:   prenormalizedDesiredReplicas}
   985  	stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
   986  	normalizationArg.DesiredReplicas = stabilizedRecommendation
   987  	if stabilizedRecommendation != prenormalizedDesiredReplicas {
   988  		// "ScaleUpStabilized" || "ScaleDownStabilized"
   989  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, reason, message)
   990  	} else {
   991  		setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
   992  	}
   993  	desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
   994  	if desiredReplicas == stabilizedRecommendation {
   995  		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, reason, message)
   996  	} else {
   997  		setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, reason, message)
   998  	}
   999  
  1000  	return desiredReplicas
  1001  }
  1002  
  1003  func (a *HorizontalController) maybeInitScaleDownStabilizationWindow(hpa *autoscalingv2.HorizontalPodAutoscaler) {
  1004  	behavior := hpa.Spec.Behavior
  1005  	if behavior != nil && behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds == nil {
  1006  		stabilizationWindowSeconds := (int32)(a.downscaleStabilisationWindow.Seconds())
  1007  		hpa.Spec.Behavior.ScaleDown.StabilizationWindowSeconds = &stabilizationWindowSeconds
  1008  	}
  1009  }
  1010  
  1011  // getReplicasChangePerPeriod sums the replica changes from the scale events that occurred within the given period
  1012  func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
  1013  	period := time.Second * time.Duration(periodSeconds)
  1014  	cutoff := time.Now().Add(-period)
  1015  	var replicas int32
  1016  	for _, rec := range scaleEvents {
  1017  		if rec.timestamp.After(cutoff) {
  1018  			replicas += rec.replicaChange
  1019  		}
  1020  	}
  1021  	return replicas
  1022  
  1023  }
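
        // Worked example (values assumed): with periodSeconds=60 and scale events of +2 replicas
        // 30s ago, +3 replicas 90s ago, and +1 replica 20s ago, only the events inside the
        // 60-second window count, so the function returns 3.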
  1024  
  1025  func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa runtime.Object, reason string, err error) (condition autoscalingv2.HorizontalPodAutoscalerCondition) {
  1026  	a.eventRecorder.Event(hpa, v1.EventTypeWarning, reason, err.Error())
  1027  	return autoscalingv2.HorizontalPodAutoscalerCondition{
  1028  		Type:    autoscalingv2.ScalingActive,
  1029  		Status:  v1.ConditionFalse,
  1030  		Reason:  reason,
  1031  		Message: fmt.Sprintf("the HPA was unable to compute the replica count: %v", err),
  1032  	}
  1033  }
  1034  
  1035  // storeScaleEvent stores a scale event, either adding a new one or replacing an outdated one.
  1036  // Outdated events to be replaced are marked as such by the `markScaleEventsOutdated` function.
  1037  func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.HorizontalPodAutoscalerBehavior, key string, prevReplicas, newReplicas int32) {
  1038  	if behavior == nil {
  1039  		return // we should not store any event as they will not be used
  1040  	}
  1041  	var oldSampleIndex int
  1042  	var longestPolicyPeriod int32
  1043  	foundOldSample := false
  1044  	if newReplicas > prevReplicas {
  1045  		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
  1046  
  1047  		a.scaleUpEventsLock.Lock()
  1048  		defer a.scaleUpEventsLock.Unlock()
  1049  		markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
  1050  		replicaChange := newReplicas - prevReplicas
  1051  		for i, event := range a.scaleUpEvents[key] {
  1052  			if event.outdated {
  1053  				foundOldSample = true
  1054  				oldSampleIndex = i
  1055  			}
  1056  		}
  1057  		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
  1058  		if foundOldSample {
  1059  			a.scaleUpEvents[key][oldSampleIndex] = newEvent
  1060  		} else {
  1061  			a.scaleUpEvents[key] = append(a.scaleUpEvents[key], newEvent)
  1062  		}
  1063  	} else {
  1064  		longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
  1065  
  1066  		a.scaleDownEventsLock.Lock()
  1067  		defer a.scaleDownEventsLock.Unlock()
  1068  		markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
  1069  		replicaChange := prevReplicas - newReplicas
  1070  		for i, event := range a.scaleDownEvents[key] {
  1071  			if event.outdated {
  1072  				foundOldSample = true
  1073  				oldSampleIndex = i
  1074  			}
  1075  		}
  1076  		newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
  1077  		if foundOldSample {
  1078  			a.scaleDownEvents[key][oldSampleIndex] = newEvent
  1079  		} else {
  1080  			a.scaleDownEvents[key] = append(a.scaleDownEvents[key], newEvent)
  1081  		}
  1082  	}
  1083  }
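A hypothetical walk-through of the slot-reuse behavior (made-up key and values):

	// before: a.scaleUpEvents["ns/web"] == [{replicaChange: 2, outdated: true}]
	// storeScaleEvent(behavior, "ns/web", 3, 7) records a scale-up of 4 replicas;
	// because an outdated sample exists, its slot is overwritten in place rather than appended:
	// after:  a.scaleUpEvents["ns/web"] == [{replicaChange: 4, outdated: false}]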
  1084  
  1085  // stabilizeRecommendationWithBehaviors:
  1086  // - records the newest recommendation, reusing the slot of a recommendation that is older than both stabilization windows if one exists,
  1087  // - returns the current replica count clamped between the minimum recommendation within the scale-up stabilization window and the maximum recommendation within the scale-down stabilization window (args.Scale{Up,Down}Behavior.StabilizationWindowSeconds)
  1088  func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
  1089  	now := time.Now()
  1090  
  1091  	foundOldSample := false
  1092  	oldSampleIndex := 0
  1093  
  1094  	upRecommendation := args.DesiredReplicas
  1095  	upDelaySeconds := *args.ScaleUpBehavior.StabilizationWindowSeconds
  1096  	upCutoff := now.Add(-time.Second * time.Duration(upDelaySeconds))
  1097  
  1098  	downRecommendation := args.DesiredReplicas
  1099  	downDelaySeconds := *args.ScaleDownBehavior.StabilizationWindowSeconds
  1100  	downCutoff := now.Add(-time.Second * time.Duration(downDelaySeconds))
  1101  
  1102  	// Calculate the upper and lower stabilization limits.
  1103  	a.recommendationsLock.Lock()
  1104  	defer a.recommendationsLock.Unlock()
  1105  	for i, rec := range a.recommendations[args.Key] {
  1106  		if rec.timestamp.After(upCutoff) {
  1107  			upRecommendation = min(rec.recommendation, upRecommendation)
  1108  		}
  1109  		if rec.timestamp.After(downCutoff) {
  1110  			downRecommendation = max(rec.recommendation, downRecommendation)
  1111  		}
  1112  		if rec.timestamp.Before(upCutoff) && rec.timestamp.Before(downCutoff) {
  1113  			foundOldSample = true
  1114  			oldSampleIndex = i
  1115  		}
  1116  	}
  1117  
  1118  	// Bring the recommendation to within the upper and lower limits (stabilize).
  1119  	recommendation := args.CurrentReplicas
  1120  	if recommendation < upRecommendation {
  1121  		recommendation = upRecommendation
  1122  	}
  1123  	if recommendation > downRecommendation {
  1124  		recommendation = downRecommendation
  1125  	}
  1126  
  1127  	// Record the unstabilized recommendation.
  1128  	if foundOldSample {
  1129  		a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
  1130  	} else {
  1131  		a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
  1132  	}
  1133  
  1134  	// Determine a human-friendly message.
  1135  	var reason, message string
  1136  	if args.DesiredReplicas >= args.CurrentReplicas {
  1137  		reason = "ScaleUpStabilized"
  1138  		message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
  1139  	} else {
  1140  		reason = "ScaleDownStabilized"
  1141  		message = "recent recommendations were higher than current one, applying the highest recent recommendation"
  1142  	}
  1143  	return recommendation, reason, message
  1144  }
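The clamping step above is equivalent to the following sketch (hypothetical numbers; min and max are the int32 helpers defined at the end of this file): with a current size of 10, a lowest in-window recommendation of 4 on the scale-up side and a highest in-window recommendation of 7 on the scale-down side, the stabilized value is 7.

	upRecommendation, downRecommendation := int32(4), int32(7)
	currentReplicas := int32(10)
	stabilized := min(max(currentReplicas, upRecommendation), downRecommendation)
	_ = stabilized // 7: the scale-down is limited to the highest recent recommendation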
  1145  
  1146  // convertDesiredReplicasWithBehaviorRate performs the actual normalization, given the constraint rate
  1147  // It doesn't consider the stabilization window; that is handled separately
  1148  func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
  1149  	var possibleLimitingReason, possibleLimitingMessage string
  1150  
  1151  	if args.DesiredReplicas > args.CurrentReplicas {
  1152  		a.scaleUpEventsLock.RLock()
  1153  		defer a.scaleUpEventsLock.RUnlock()
  1154  		a.scaleDownEventsLock.RLock()
  1155  		defer a.scaleDownEventsLock.RUnlock()
  1156  		scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleUpBehavior)
  1157  
  1158  		if scaleUpLimit < args.CurrentReplicas {
  1159  			// We shouldn't scale up further until the scaleUpEvents are cleaned up
  1160  			scaleUpLimit = args.CurrentReplicas
  1161  		}
  1162  		maximumAllowedReplicas := args.MaxReplicas
  1163  		if maximumAllowedReplicas > scaleUpLimit {
  1164  			maximumAllowedReplicas = scaleUpLimit
  1165  			possibleLimitingReason = "ScaleUpLimit"
  1166  			possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
  1167  		} else {
  1168  			possibleLimitingReason = "TooManyReplicas"
  1169  			possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
  1170  		}
  1171  		if args.DesiredReplicas > maximumAllowedReplicas {
  1172  			return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
  1173  		}
  1174  	} else if args.DesiredReplicas < args.CurrentReplicas {
  1175  		a.scaleUpEventsLock.RLock()
  1176  		defer a.scaleUpEventsLock.RUnlock()
  1177  		a.scaleDownEventsLock.RLock()
  1178  		defer a.scaleDownEventsLock.RUnlock()
  1179  		scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
  1180  
  1181  		if scaleDownLimit > args.CurrentReplicas {
  1182  			// We shouldn't scale down further until the scaleDownEvents are cleaned up
  1183  			scaleDownLimit = args.CurrentReplicas
  1184  		}
  1185  		minimumAllowedReplicas := args.MinReplicas
  1186  		if minimumAllowedReplicas < scaleDownLimit {
  1187  			minimumAllowedReplicas = scaleDownLimit
  1188  			possibleLimitingReason = "ScaleDownLimit"
  1189  			possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
  1190  		} else {
  1191  			possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
  1192  			possibleLimitingReason = "TooFewReplicas"
  1193  		}
  1194  		if args.DesiredReplicas < minimumAllowedReplicas {
  1195  			return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
  1196  		}
  1197  	}
  1198  	return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
  1199  }
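A hypothetical walk-through of the scale-up branch (made-up numbers):

	// CurrentReplicas = 5, DesiredReplicas = 20, MaxReplicas = 15,
	// and calculateScaleUpLimitWithScalingRules yields scaleUpLimit = 10.
	// Because MaxReplicas (15) exceeds scaleUpLimit (10), the cap becomes 10 with
	// reason "ScaleUpLimit", and since DesiredReplicas (20) > 10 the method returns 10.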
  1200  
  1201  // convertDesiredReplicasWithRules performs the actual normalization, without depending on `HorizontalController` or `HorizontalPodAutoscaler`
  1202  func convertDesiredReplicasWithRules(currentReplicas, desiredReplicas, hpaMinReplicas, hpaMaxReplicas int32) (int32, string, string) {
  1203  
  1204  	var minimumAllowedReplicas int32
  1205  	var maximumAllowedReplicas int32
  1206  
  1207  	var possibleLimitingCondition string
  1208  	var possibleLimitingReason string
  1209  
  1210  	minimumAllowedReplicas = hpaMinReplicas
  1211  
  1212  	// Do not scale up too much, to prevent an incorrect rapid increase in the number of replicas caused by
  1213  	// bogus CPU usage reports from heapster/kubelet (like in issue #32304).
  1214  	scaleUpLimit := calculateScaleUpLimit(currentReplicas)
  1215  
  1216  	if hpaMaxReplicas > scaleUpLimit {
  1217  		maximumAllowedReplicas = scaleUpLimit
  1218  		possibleLimitingCondition = "ScaleUpLimit"
  1219  		possibleLimitingReason = "the desired replica count is increasing faster than the maximum scale rate"
  1220  	} else {
  1221  		maximumAllowedReplicas = hpaMaxReplicas
  1222  		possibleLimitingCondition = "TooManyReplicas"
  1223  		possibleLimitingReason = "the desired replica count is more than the maximum replica count"
  1224  	}
  1225  
  1226  	if desiredReplicas < minimumAllowedReplicas {
  1227  		possibleLimitingCondition = "TooFewReplicas"
  1228  		possibleLimitingReason = "the desired replica count is less than the minimum replica count"
  1229  
  1230  		return minimumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
  1231  	} else if desiredReplicas > maximumAllowedReplicas {
  1232  		return maximumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
  1233  	}
  1234  
  1235  	return desiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
  1236  }
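For example (an in-package sketch with hypothetical numbers): with 3 current replicas the rate limit is max(2*3, 4) = 6, so a desired count of 8 is capped even though it is below the HPA maximum of 10.

	got, reason, _ := convertDesiredReplicasWithRules(3, 8, 1, 10)
	fmt.Println(got, reason) // 6 ScaleUpLimit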
  1237  
  1238  func calculateScaleUpLimit(currentReplicas int32) int32 {
  1239  	return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
  1240  }
  1241  
  1242  // markScaleEventsOutdated sets the 'outdated=true' flag on all scale events older than the longest policy period, since they no longer affect any scaling policy
  1243  func markScaleEventsOutdated(scaleEvents []timestampedScaleEvent, longestPolicyPeriod int32) {
  1244  	period := time.Second * time.Duration(longestPolicyPeriod)
  1245  	cutoff := time.Now().Add(-period)
  1246  	for i, event := range scaleEvents {
  1247  		if event.timestamp.Before(cutoff) {
  1248  			// outdated scale events are marked so their slots can be reused later
  1249  			scaleEvents[i].outdated = true
  1250  		}
  1251  	}
  1252  }
  1253  
  1254  func getLongestPolicyPeriod(scalingRules *autoscalingv2.HPAScalingRules) int32 {
  1255  	var longestPolicyPeriod int32
  1256  	for _, policy := range scalingRules.Policies {
  1257  		if policy.PeriodSeconds > longestPolicyPeriod {
  1258  			longestPolicyPeriod = policy.PeriodSeconds
  1259  		}
  1260  	}
  1261  	return longestPolicyPeriod
  1262  }
  1263  
  1264  // calculateScaleUpLimitWithScalingRules returns the highest replica count that can be reached in the current period for the given HPAScalingRules
  1265  func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleUpEvents, scaleDownEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
  1266  	var result int32
  1267  	var proposed int32
  1268  	var selectPolicyFn func(int32, int32) int32
  1269  	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
  1270  		return currentReplicas // Scaling is disabled
  1271  	} else if *scalingRules.SelectPolicy == autoscalingv2.MinChangePolicySelect {
  1272  		result = math.MaxInt32
  1273  		selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value
  1274  	} else {
  1275  		result = math.MinInt32
  1276  		selectPolicyFn = max // Use the default policy otherwise to produce the highest possible change
  1277  	}
  1278  	for _, policy := range scalingRules.Policies {
  1279  		replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleUpEvents)
  1280  		replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleDownEvents)
  1281  		periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod + replicasDeletedInCurrentPeriod
  1282  		if policy.Type == autoscalingv2.PodsScalingPolicy {
  1283  			proposed = periodStartReplicas + policy.Value
  1284  		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
  1285  			// the proposal has to be rounded up, because otherwise the proposed change might not increase the replica count and the target would never scale up
  1286  			proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
  1287  		}
  1288  		result = selectPolicyFn(result, proposed)
  1289  	}
  1290  	return result
  1291  }
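A minimal in-package sketch with hypothetical rules and no recent scale events: the Pods policy proposes 10+4 = 14, the Percent policy proposes ceil(10*2) = 20, and the default Max select policy picks the larger value.

	maxSelect := autoscalingv2.MaxChangePolicySelect
	rules := &autoscalingv2.HPAScalingRules{
		SelectPolicy: &maxSelect,
		Policies: []autoscalingv2.HPAScalingPolicy{
			{Type: autoscalingv2.PodsScalingPolicy, Value: 4, PeriodSeconds: 60},
			{Type: autoscalingv2.PercentScalingPolicy, Value: 100, PeriodSeconds: 60},
		},
	}
	limit := calculateScaleUpLimitWithScalingRules(10, nil, nil, rules)
	_ = limit // 20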
  1292  
  1293  // calculateScaleDownLimitWithBehaviors returns the lowest replica count that can be reached in the current period for the given HPAScalingRules
  1294  func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleUpEvents, scaleDownEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
  1295  	var result int32
  1296  	var proposed int32
  1297  	var selectPolicyFn func(int32, int32) int32
  1298  	if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
  1299  		return currentReplicas // Scaling is disabled
  1300  	} else if *scalingRules.SelectPolicy == autoscalingv2.MinChangePolicySelect {
  1301  		result = math.MinInt32
  1302  		selectPolicyFn = max // For scaling down, the lowest change ('min' policy) produces a maximum value
  1303  	} else {
  1304  		result = math.MaxInt32
  1305  		selectPolicyFn = min // Use the default policy otherwise to produce the highest possible change
  1306  	}
  1307  	for _, policy := range scalingRules.Policies {
  1308  		replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleUpEvents)
  1309  		replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleDownEvents)
  1310  		periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod + replicasDeletedInCurrentPeriod
  1311  		if policy.Type == autoscalingv2.PodsScalingPolicy {
  1312  			proposed = periodStartReplicas - policy.Value
  1313  		} else if policy.Type == autoscalingv2.PercentScalingPolicy {
  1314  			proposed = int32(float64(periodStartReplicas) * (1 - float64(policy.Value)/100))
  1315  		}
  1316  		result = selectPolicyFn(result, proposed)
  1317  	}
  1318  	return result
  1319  }
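Analogously for scale-down (hypothetical rules, currentReplicas = 10, no recent events):

	// Pods policy    (Value: 4,  PeriodSeconds: 60) -> 10 - 4          = 6
	// Percent policy (Value: 50, PeriodSeconds: 60) -> 10 * (1 - 0.50) = 5
	// The default Max select policy allows the larger change, so the floor is 5 replicas;
	// with the Min select policy it would be 6, and with Disabled it would stay at 10.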
  1320  
  1321  // scaleForResourceMappings attempts to fetch the scale for the
  1322  // resource with the given name and namespace, trying each RESTMapping
  1323  // in turn until a working one is found.  If none work, the first error
  1324  // is returned.  It returns both the scale, as well as the group-resource from
  1325  // the working mapping.
  1326  func (a *HorizontalController) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
  1327  	var firstErr error
  1328  	for i, mapping := range mappings {
  1329  		targetGR := mapping.Resource.GroupResource()
  1330  		scale, err := a.scaleNamespacer.Scales(namespace).Get(ctx, targetGR, name, metav1.GetOptions{})
  1331  		if err == nil {
  1332  			return scale, targetGR, nil
  1333  		}
  1334  
  1335  		// if this is the first error, remember it,
  1336  		// then go on and try other mappings until we find a good one
  1337  		if i == 0 {
  1338  			firstErr = err
  1339  		}
  1340  	}
  1341  
  1342  	// make sure we handle an empty set of mappings
  1343  	if firstErr == nil {
  1344  		firstErr = fmt.Errorf("unrecognized resource")
  1345  	}
  1346  
  1347  	return nil, schema.GroupResource{}, firstErr
  1348  }
  1349  
  1350  // setCurrentReplicasAndMetricsInStatus sets the current replica count and metrics in the status of the HPA.
  1351  func (a *HorizontalController) setCurrentReplicasAndMetricsInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32, metricStatuses []autoscalingv2.MetricStatus) {
  1352  	a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, metricStatuses, false)
  1353  }
  1354  
  1355  // setStatus recreates the status of the given HPA, updating the current and
  1356  // desired replicas, as well as the metric statuses
  1357  func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
  1358  	hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
  1359  		CurrentReplicas: currentReplicas,
  1360  		DesiredReplicas: desiredReplicas,
  1361  		LastScaleTime:   hpa.Status.LastScaleTime,
  1362  		CurrentMetrics:  metricStatuses,
  1363  		Conditions:      hpa.Status.Conditions,
  1364  	}
  1365  
  1366  	if rescale {
  1367  		now := metav1.NewTime(time.Now())
  1368  		hpa.Status.LastScaleTime = &now
  1369  	}
  1370  }
  1371  
  1372  // updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
  1373  func (a *HorizontalController) updateStatusIfNeeded(ctx context.Context, oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
  1374  	// skip a write if we wouldn't need to update
  1375  	if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
  1376  		return nil
  1377  	}
  1378  	return a.updateStatus(ctx, newHPA)
  1379  }
  1380  
  1381  // updateStatus actually does the update request for the status of the given HPA
  1382  func (a *HorizontalController) updateStatus(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
  1383  	_, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(ctx, hpa, metav1.UpdateOptions{})
  1384  	if err != nil {
  1385  		a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
  1386  		return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
  1387  	}
  1388  	logger := klog.FromContext(ctx)
  1389  	logger.V(2).Info("Successfully updated status", "HPA", klog.KObj(hpa))
  1390  	return nil
  1391  }
  1392  
  1393  // setCondition sets the specific condition type on the given HPA to the specified value with the given reason
  1394  // and message.  The message and args are treated like a format string.  The condition will be added if it is
  1395  // not present.
  1396  func setCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) {
  1397  	hpa.Status.Conditions = setConditionInList(hpa.Status.Conditions, conditionType, status, reason, message, args...)
  1398  }
  1399  
  1400  // setConditionInList sets the specific condition type on the given HPA to the specified value with the given
  1401  // reason and message.  The message and args are treated like a format string.  The condition will be added if
  1402  // it is not present.  The new list will be returned.
  1403  func setConditionInList(inputList []autoscalingv2.HorizontalPodAutoscalerCondition, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) []autoscalingv2.HorizontalPodAutoscalerCondition {
  1404  	resList := inputList
  1405  	var existingCond *autoscalingv2.HorizontalPodAutoscalerCondition
  1406  	for i, condition := range resList {
  1407  		if condition.Type == conditionType {
  1408  			// can't take a pointer to an iteration variable
  1409  			existingCond = &resList[i]
  1410  			break
  1411  		}
  1412  	}
  1413  
  1414  	if existingCond == nil {
  1415  		resList = append(resList, autoscalingv2.HorizontalPodAutoscalerCondition{
  1416  			Type: conditionType,
  1417  		})
  1418  		existingCond = &resList[len(resList)-1]
  1419  	}
  1420  
  1421  	if existingCond.Status != status {
  1422  		existingCond.LastTransitionTime = metav1.Now()
  1423  	}
  1424  
  1425  	existingCond.Status = status
  1426  	existingCond.Reason = reason
  1427  	existingCond.Message = fmt.Sprintf(message, args...)
  1428  
  1429  	return resList
  1430  }
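For example, a hypothetical caller can pass printf-style args that are folded into the condition message (hpa and err are assumed to come from an earlier step):

	setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse,
		"FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)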
  1431  
  1432  func max(a, b int32) int32 {
  1433  	if a >= b {
  1434  		return a
  1435  	}
  1436  	return b
  1437  }
  1438  
  1439  func min(a, b int32) int32 {
  1440  	if a <= b {
  1441  		return a
  1442  	}
  1443  	return b
  1444  }
  1445  
