1
16
17 package podautoscaler
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math"
24 "sync"
25 "time"
26
27 autoscalingv1 "k8s.io/api/autoscaling/v1"
28 autoscalingv2 "k8s.io/api/autoscaling/v2"
29 v1 "k8s.io/api/core/v1"
30 apiequality "k8s.io/apimachinery/pkg/api/equality"
31 k8serrors "k8s.io/apimachinery/pkg/api/errors"
32 apimeta "k8s.io/apimachinery/pkg/api/meta"
33 "k8s.io/apimachinery/pkg/api/resource"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/labels"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
39 "k8s.io/apimachinery/pkg/util/wait"
40 autoscalinginformers "k8s.io/client-go/informers/autoscaling/v2"
41 coreinformers "k8s.io/client-go/informers/core/v1"
42 "k8s.io/client-go/kubernetes/scheme"
43 autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v2"
44 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
45 autoscalinglisters "k8s.io/client-go/listers/autoscaling/v2"
46 corelisters "k8s.io/client-go/listers/core/v1"
47 scaleclient "k8s.io/client-go/scale"
48 "k8s.io/client-go/tools/cache"
49 "k8s.io/client-go/tools/record"
50 "k8s.io/client-go/util/workqueue"
51 "k8s.io/klog/v2"
52 "k8s.io/kubernetes/pkg/controller"
53 metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
54 "k8s.io/kubernetes/pkg/controller/podautoscaler/monitor"
55 "k8s.io/kubernetes/pkg/controller/util/selectors"
56 )
57
var (
	// scaleUpLimitFactor and scaleUpLimitMinimum are float64 package variables.
	// They are not used in this part of the file.
	// NOTE(review): presumably they bound how far a single reconcile may scale
	// up (a multiplicative factor and an absolute floor) — confirm at the use site.
	scaleUpLimitFactor  = 2.0
	scaleUpLimitMinimum = 4.0

	// errSpec is a sentinel error with an empty message. Errors caused by an
	// invalid HPA spec append it with %w (e.g. "unknown metric source type %q%w").
	// This leaves the error text unchanged while letting the monitoring code
	// classify the failure via errors.Is(err, errSpec).
	errSpec error = errors.New("")
)
69
// timestampedRecommendation pairs a replica-count recommendation with the time
// it was recorded. Slices of these are kept per work-queue key in
// HorizontalController.recommendations.
type timestampedRecommendation struct {
	// recommendation is the recommended replica count.
	recommendation int32
	// timestamp is when the recommendation was recorded.
	timestamp time.Time
}
74
// timestampedScaleEvent records a scaling event and when it happened. Slices of
// these are kept per work-queue key in HorizontalController.scaleUpEvents and
// HorizontalController.scaleDownEvents.
type timestampedScaleEvent struct {
	// replicaChange is the size of the replica change for this event.
	replicaChange int32
	timestamp     time.Time
	// NOTE(review): outdated is not read or written in this part of the file;
	// presumably it marks events that no longer count toward a scaling-policy
	// window — confirm.
	outdated bool
}
80
81
82
83
// HorizontalController reconciles HorizontalPodAutoscaler objects. HPA keys
// are taken from a rate-limited work queue. For each one the controller reads
// the target's scale subresource and computes a desired replica count from the
// HPA's metric specs. Construct it with NewHorizontalController and start it
// with Run.
type HorizontalController struct {
	// Clients used to read/update the scale subresource and HPA objects, and to
	// map the scale target's GroupKind to REST resources.
	scaleNamespacer scaleclient.ScalesGetter
	hpaNamespacer   autoscalingclient.HorizontalPodAutoscalersGetter
	mapper          apimeta.RESTMapper

	replicaCalc   *ReplicaCalculator
	eventRecorder record.EventRecorder

	// NOTE(review): not used in this part of the file; presumably the window
	// over which scale-down recommendations are stabilised — confirm.
	downscaleStabilisationWindow time.Duration

	// monitor records metric-computation and reconciliation outcomes.
	monitor monitor.Monitor

	// hpaLister reads HPAs from the shared cache of the informer passed to
	// NewHorizontalController; hpaListerSynced reports whether that cache has synced.
	hpaLister       autoscalinglisters.HorizontalPodAutoscalerLister
	hpaListerSynced cache.InformerSynced

	// podLister reads Pods from the shared cache of the informer passed to
	// NewHorizontalController; podListerSynced reports whether that cache has synced.
	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	// queue holds the namespace/name keys of HPAs awaiting reconciliation.
	queue workqueue.RateLimitingInterface

	// Per-HPA recommendation history, keyed by work-queue key and guarded by
	// recommendationsLock.
	recommendations     map[string][]timestampedRecommendation
	recommendationsLock sync.Mutex

	// Per-HPA scale-up / scale-down event history, keyed by work-queue key and
	// guarded by the matching lock.
	scaleUpEvents       map[string][]timestampedScaleEvent
	scaleUpEventsLock   sync.RWMutex
	scaleDownEvents     map[string][]timestampedScaleEvent
	scaleDownEventsLock sync.RWMutex

	// hpaSelectors maps registered HPAs to their selectors. It is used to detect
	// pods selected by more than one HPA. Guarded by hpaSelectorsMux.
	hpaSelectors    *selectors.BiMultimap
	hpaSelectorsMux sync.Mutex
}
123
124
125 func NewHorizontalController(
126 ctx context.Context,
127 evtNamespacer v1core.EventsGetter,
128 scaleNamespacer scaleclient.ScalesGetter,
129 hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
130 mapper apimeta.RESTMapper,
131 metricsClient metricsclient.MetricsClient,
132 hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
133 podInformer coreinformers.PodInformer,
134 resyncPeriod time.Duration,
135 downscaleStabilisationWindow time.Duration,
136 tolerance float64,
137 cpuInitializationPeriod,
138 delayOfInitialReadinessStatus time.Duration,
139 ) *HorizontalController {
140 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
141 broadcaster.StartStructuredLogging(3)
142 broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
143 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
144
145 hpaController := &HorizontalController{
146 eventRecorder: recorder,
147 scaleNamespacer: scaleNamespacer,
148 hpaNamespacer: hpaNamespacer,
149 downscaleStabilisationWindow: downscaleStabilisationWindow,
150 monitor: monitor.New(),
151 queue: workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
152 mapper: mapper,
153 recommendations: map[string][]timestampedRecommendation{},
154 recommendationsLock: sync.Mutex{},
155 scaleUpEvents: map[string][]timestampedScaleEvent{},
156 scaleUpEventsLock: sync.RWMutex{},
157 scaleDownEvents: map[string][]timestampedScaleEvent{},
158 scaleDownEventsLock: sync.RWMutex{},
159 hpaSelectors: selectors.NewBiMultimap(),
160 }
161
162 hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
163 cache.ResourceEventHandlerFuncs{
164 AddFunc: hpaController.enqueueHPA,
165 UpdateFunc: hpaController.updateHPA,
166 DeleteFunc: hpaController.deleteHPA,
167 },
168 resyncPeriod,
169 )
170 hpaController.hpaLister = hpaInformer.Lister()
171 hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
172
173 hpaController.podLister = podInformer.Lister()
174 hpaController.podListerSynced = podInformer.Informer().HasSynced
175
176 replicaCalc := NewReplicaCalculator(
177 metricsClient,
178 hpaController.podLister,
179 tolerance,
180 cpuInitializationPeriod,
181 delayOfInitialReadinessStatus,
182 )
183 hpaController.replicaCalc = replicaCalc
184
185 monitor.Register()
186
187 return hpaController
188 }
189
190
191 func (a *HorizontalController) Run(ctx context.Context, workers int) {
192 defer utilruntime.HandleCrash()
193 defer a.queue.ShutDown()
194
195 logger := klog.FromContext(ctx)
196 logger.Info("Starting HPA controller")
197 defer logger.Info("Shutting down HPA controller")
198
199 if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
200 return
201 }
202
203 for i := 0; i < workers; i++ {
204 go wait.UntilWithContext(ctx, a.worker, time.Second)
205 }
206
207 <-ctx.Done()
208 }
209
210
211 func (a *HorizontalController) updateHPA(old, cur interface{}) {
212 a.enqueueHPA(cur)
213 }
214
215
216 func (a *HorizontalController) enqueueHPA(obj interface{}) {
217 key, err := controller.KeyFunc(obj)
218 if err != nil {
219 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
220 return
221 }
222
223
224
225
226 a.queue.AddRateLimited(key)
227
228
229
230
231 a.hpaSelectorsMux.Lock()
232 defer a.hpaSelectorsMux.Unlock()
233 if hpaKey := selectors.Parse(key); !a.hpaSelectors.SelectorExists(hpaKey) {
234 a.hpaSelectors.PutSelector(hpaKey, labels.Nothing())
235 }
236 }
237
238 func (a *HorizontalController) deleteHPA(obj interface{}) {
239 key, err := controller.KeyFunc(obj)
240 if err != nil {
241 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
242 return
243 }
244
245
246 a.queue.Forget(key)
247
248
249 a.hpaSelectorsMux.Lock()
250 defer a.hpaSelectorsMux.Unlock()
251 a.hpaSelectors.DeleteSelector(selectors.Parse(key))
252 }
253
254 func (a *HorizontalController) worker(ctx context.Context) {
255 for a.processNextWorkItem(ctx) {
256 }
257 logger := klog.FromContext(ctx)
258 logger.Info("Horizontal Pod Autoscaler controller worker shutting down")
259 }
260
261 func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
262 key, quit := a.queue.Get()
263 if quit {
264 return false
265 }
266 defer a.queue.Done(key)
267
268 deleted, err := a.reconcileKey(ctx, key.(string))
269 if err != nil {
270 utilruntime.HandleError(err)
271 }
272
273
274
275
276
277
278
279
280
281 if !deleted {
282 a.queue.AddRateLimited(key)
283 }
284
285 return true
286 }
287
288
289
290
291
292
293
294 func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
295 metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
296
297 selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
298 if err != nil {
299 return -1, "", nil, time.Time{}, err
300 }
301
302 specReplicas := scale.Spec.Replicas
303 statusReplicas := scale.Status.Replicas
304 statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
305
306 invalidMetricsCount := 0
307 var invalidMetricError error
308 var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition
309
310 for i, metricSpec := range metricSpecs {
311 replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(ctx, hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
312
313 if err != nil {
314 if invalidMetricsCount <= 0 {
315 invalidMetricCondition = condition
316 invalidMetricError = err
317 }
318 invalidMetricsCount++
319 continue
320 }
321 if replicas == 0 || replicaCountProposal > replicas {
322 timestamp = timestampProposal
323 replicas = replicaCountProposal
324 metric = metricNameProposal
325 }
326 }
327
328 if invalidMetricError != nil {
329 invalidMetricError = fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
330 }
331
332
333
334
335 if invalidMetricsCount >= len(metricSpecs) || (invalidMetricsCount > 0 && replicas < specReplicas) {
336 setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
337 return -1, "", statuses, time.Time{}, invalidMetricError
338 }
339 setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
340
341 return replicas, metric, statuses, timestamp, invalidMetricError
342 }
343
344
345 func (a *HorizontalController) hpasControllingPodsUnderSelector(pods []*v1.Pod) []selectors.Key {
346 a.hpaSelectorsMux.Lock()
347 defer a.hpaSelectorsMux.Unlock()
348
349 hpas := map[selectors.Key]struct{}{}
350 for _, p := range pods {
351 podKey := selectors.Key{Name: p.Name, Namespace: p.Namespace}
352 a.hpaSelectors.Put(podKey, p.Labels)
353
354 selectingHpas, ok := a.hpaSelectors.ReverseSelect(podKey)
355 if !ok {
356 continue
357 }
358 for _, hpa := range selectingHpas {
359 hpas[hpa] = struct{}{}
360 }
361 }
362
363 a.hpaSelectors.KeepOnly([]selectors.Key{})
364
365 hpaList := []selectors.Key{}
366 for hpa := range hpas {
367 hpaList = append(hpaList, hpa)
368 }
369 return hpaList
370 }
371
372
373
374
375
376
377
378 func (a *HorizontalController) validateAndParseSelector(hpa *autoscalingv2.HorizontalPodAutoscaler, selector string) (labels.Selector, error) {
379 if selector == "" {
380 errMsg := "selector is required"
381 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
382 setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
383 return nil, fmt.Errorf(errMsg)
384 }
385
386 parsedSelector, err := labels.Parse(selector)
387 if err != nil {
388 errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
389 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
390 setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
391 return nil, fmt.Errorf(errMsg)
392 }
393
394 hpaKey := selectors.Key{Name: hpa.Name, Namespace: hpa.Namespace}
395 a.hpaSelectorsMux.Lock()
396 if a.hpaSelectors.SelectorExists(hpaKey) {
397
398 a.hpaSelectors.PutSelector(hpaKey, parsedSelector)
399 }
400 a.hpaSelectorsMux.Unlock()
401
402 pods, err := a.podLister.Pods(hpa.Namespace).List(parsedSelector)
403 if err != nil {
404 return nil, err
405 }
406
407 selectingHpas := a.hpasControllingPodsUnderSelector(pods)
408 if len(selectingHpas) > 1 {
409 errMsg := fmt.Sprintf("pods by selector %v are controlled by multiple HPAs: %v", selector, selectingHpas)
410 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "AmbiguousSelector", errMsg)
411 setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "AmbiguousSelector", errMsg)
412 return nil, fmt.Errorf(errMsg)
413 }
414
415 return parsedSelector, nil
416 }
417
418
419
420 func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
421 specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
422 timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
423
424 start := time.Now()
425 defer func() {
426 actionLabel := monitor.ActionLabelNone
427 switch {
428 case replicaCountProposal > hpa.Status.CurrentReplicas:
429 actionLabel = monitor.ActionLabelScaleUp
430 case replicaCountProposal < hpa.Status.CurrentReplicas:
431 actionLabel = monitor.ActionLabelScaleDown
432 }
433
434 errorLabel := monitor.ErrorLabelNone
435 if err != nil {
436
437 errorLabel = monitor.ErrorLabelInternal
438 actionLabel = monitor.ActionLabelNone
439 }
440 if errors.Is(err, errSpec) {
441 errorLabel = monitor.ErrorLabelSpec
442 }
443
444 a.monitor.ObserveMetricComputationResult(actionLabel, errorLabel, time.Since(start), spec.Type)
445 }()
446
447 switch spec.Type {
448 case autoscalingv2.ObjectMetricSourceType:
449 metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
450 if err != nil {
451 condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
452 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
453 }
454 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
455 if err != nil {
456 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
457 }
458 case autoscalingv2.PodsMetricSourceType:
459 metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
460 if err != nil {
461 condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
462 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
463 }
464 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
465 if err != nil {
466 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
467 }
468 case autoscalingv2.ResourceMetricSourceType:
469 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
470 if err != nil {
471 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
472 }
473 case autoscalingv2.ContainerResourceMetricSourceType:
474 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
475 if err != nil {
476 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
477 }
478 case autoscalingv2.ExternalMetricSourceType:
479 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
480 if err != nil {
481 return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
482 }
483 default:
484
485 err = fmt.Errorf("unknown metric source type %q%w", string(spec.Type), errSpec)
486 condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
487 return 0, "", time.Time{}, condition, err
488 }
489 return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
490 }
491
492 func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
493 namespace, name, err := cache.SplitMetaNamespaceKey(key)
494 if err != nil {
495 return true, err
496 }
497
498 logger := klog.FromContext(ctx)
499
500 hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
501 if k8serrors.IsNotFound(err) {
502 logger.Info("Horizontal Pod Autoscaler has been deleted", "HPA", klog.KRef(namespace, name))
503
504 a.recommendationsLock.Lock()
505 delete(a.recommendations, key)
506 a.recommendationsLock.Unlock()
507
508 a.scaleUpEventsLock.Lock()
509 delete(a.scaleUpEvents, key)
510 a.scaleUpEventsLock.Unlock()
511
512 a.scaleDownEventsLock.Lock()
513 delete(a.scaleDownEvents, key)
514 a.scaleDownEventsLock.Unlock()
515
516 return true, nil
517 }
518 if err != nil {
519 return false, err
520 }
521
522 return false, a.reconcileAutoscaler(ctx, hpa, key)
523 }
524
525
526 func (a *HorizontalController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
527 if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType && metricSpec.Object.Target.Value != nil {
528 replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, selector, metricSelector)
529 if err != nil {
530 condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
531 return 0, timestampProposal, "", condition, err
532 }
533 *status = autoscalingv2.MetricStatus{
534 Type: autoscalingv2.ObjectMetricSourceType,
535 Object: &autoscalingv2.ObjectMetricStatus{
536 DescribedObject: metricSpec.Object.DescribedObject,
537 Metric: autoscalingv2.MetricIdentifier{
538 Name: metricSpec.Object.Metric.Name,
539 Selector: metricSpec.Object.Metric.Selector,
540 },
541 Current: autoscalingv2.MetricValueStatus{
542 Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
543 },
544 },
545 }
546 return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
547 } else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType && metricSpec.Object.Target.AverageValue != nil {
548 replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector)
549 if err != nil {
550 condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
551 return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
552 }
553 *status = autoscalingv2.MetricStatus{
554 Type: autoscalingv2.ObjectMetricSourceType,
555 Object: &autoscalingv2.ObjectMetricStatus{
556 Metric: autoscalingv2.MetricIdentifier{
557 Name: metricSpec.Object.Metric.Name,
558 Selector: metricSpec.Object.Metric.Selector,
559 },
560 Current: autoscalingv2.MetricValueStatus{
561 AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
562 },
563 },
564 }
565 return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
566 }
567 errMsg := "invalid object metric source: neither a value target nor an average value target was set"
568 err = fmt.Errorf(errMsg)
569 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
570 return 0, time.Time{}, "", condition, err
571 }
572
573
574 func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
575 replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
576 if err != nil {
577 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
578 return 0, timestampProposal, "", condition, err
579 }
580 *status = autoscalingv2.MetricStatus{
581 Type: autoscalingv2.PodsMetricSourceType,
582 Pods: &autoscalingv2.PodsMetricStatus{
583 Metric: autoscalingv2.MetricIdentifier{
584 Name: metricSpec.Pods.Metric.Name,
585 Selector: metricSpec.Pods.Metric.Selector,
586 },
587 Current: autoscalingv2.MetricValueStatus{
588 AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
589 },
590 },
591 }
592
593 return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
594 }
595
596 func (a *HorizontalController) computeStatusForResourceMetricGeneric(ctx context.Context, currentReplicas int32, target autoscalingv2.MetricTarget,
597 resourceName v1.ResourceName, namespace string, container string, selector labels.Selector, sourceType autoscalingv2.MetricSourceType) (replicaCountProposal int32,
598 metricStatus *autoscalingv2.MetricValueStatus, timestampProposal time.Time, metricNameProposal string,
599 condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
600 if target.AverageValue != nil {
601 var rawProposal int64
602 replicaCountProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetRawResourceReplicas(ctx, currentReplicas, target.AverageValue.MilliValue(), resourceName, namespace, selector, container)
603 if err != nil {
604 return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s usage: %v", resourceName, err)
605 }
606 metricNameProposal = fmt.Sprintf("%s resource", resourceName.String())
607 status := autoscalingv2.MetricValueStatus{
608 AverageValue: resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
609 }
610 return replicaCountProposal, &status, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
611 }
612
613 if target.AverageUtilization == nil {
614 errMsg := "invalid resource metric source: neither an average utilization target nor an average value (usage) target was set"
615 return 0, nil, time.Time{}, "", condition, fmt.Errorf(errMsg)
616 }
617
618 targetUtilization := *target.AverageUtilization
619 replicaCountProposal, percentageProposal, rawProposal, timestampProposal, err := a.replicaCalc.GetResourceReplicas(ctx, currentReplicas, targetUtilization, resourceName, namespace, selector, container)
620 if err != nil {
621 return 0, nil, time.Time{}, "", condition, fmt.Errorf("failed to get %s utilization: %v", resourceName, err)
622 }
623
624 metricNameProposal = fmt.Sprintf("%s resource utilization (percentage of request)", resourceName)
625 if sourceType == autoscalingv2.ContainerResourceMetricSourceType {
626 metricNameProposal = fmt.Sprintf("%s container resource utilization (percentage of request)", resourceName)
627 }
628 status := autoscalingv2.MetricValueStatus{
629 AverageUtilization: &percentageProposal,
630 AverageValue: resource.NewMilliQuantity(rawProposal, resource.DecimalSI),
631 }
632 return replicaCountProposal, &status, timestampProposal, metricNameProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
633 }
634
635
636 func (a *HorizontalController) computeStatusForResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
637 selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
638 metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
639 replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.Resource.Target, metricSpec.Resource.Name, hpa.Namespace, "", selector, autoscalingv2.ResourceMetricSourceType)
640 if err != nil {
641 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetResourceMetric", err)
642 return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
643 }
644 *status = autoscalingv2.MetricStatus{
645 Type: autoscalingv2.ResourceMetricSourceType,
646 Resource: &autoscalingv2.ResourceMetricStatus{
647 Name: metricSpec.Resource.Name,
648 Current: *metricValueStatus,
649 },
650 }
651 return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
652 }
653
654
655 func (a *HorizontalController) computeStatusForContainerResourceMetric(ctx context.Context, currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler,
656 selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time,
657 metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
658 replicaCountProposal, metricValueStatus, timestampProposal, metricNameProposal, condition, err := a.computeStatusForResourceMetricGeneric(ctx, currentReplicas, metricSpec.ContainerResource.Target, metricSpec.ContainerResource.Name, hpa.Namespace, metricSpec.ContainerResource.Container, selector, autoscalingv2.ContainerResourceMetricSourceType)
659 if err != nil {
660 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetContainerResourceMetric", err)
661 return replicaCountProposal, timestampProposal, metricNameProposal, condition, err
662 }
663 *status = autoscalingv2.MetricStatus{
664 Type: autoscalingv2.ContainerResourceMetricSourceType,
665 ContainerResource: &autoscalingv2.ContainerResourceMetricStatus{
666 Name: metricSpec.ContainerResource.Name,
667 Container: metricSpec.ContainerResource.Container,
668 Current: *metricValueStatus,
669 },
670 }
671 return replicaCountProposal, timestampProposal, metricNameProposal, condition, nil
672 }
673
674
675 func (a *HorizontalController) computeStatusForExternalMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
676 if metricSpec.External.Target.AverageValue != nil {
677 replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetExternalPerPodMetricReplicas(statusReplicas, metricSpec.External.Target.AverageValue.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector)
678 if err != nil {
679 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
680 return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s external metric: %v", metricSpec.External.Metric.Name, err)
681 }
682 *status = autoscalingv2.MetricStatus{
683 Type: autoscalingv2.ExternalMetricSourceType,
684 External: &autoscalingv2.ExternalMetricStatus{
685 Metric: autoscalingv2.MetricIdentifier{
686 Name: metricSpec.External.Metric.Name,
687 Selector: metricSpec.External.Metric.Selector,
688 },
689 Current: autoscalingv2.MetricValueStatus{
690 AverageValue: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
691 },
692 },
693 }
694 return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
695 }
696 if metricSpec.External.Target.Value != nil {
697 replicaCountProposal, usageProposal, timestampProposal, err := a.replicaCalc.GetExternalMetricReplicas(specReplicas, metricSpec.External.Target.Value.MilliValue(), metricSpec.External.Metric.Name, hpa.Namespace, metricSpec.External.Metric.Selector, selector)
698 if err != nil {
699 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
700 return 0, time.Time{}, "", condition, fmt.Errorf("failed to get external metric %s: %v", metricSpec.External.Metric.Name, err)
701 }
702 *status = autoscalingv2.MetricStatus{
703 Type: autoscalingv2.ExternalMetricSourceType,
704 External: &autoscalingv2.ExternalMetricStatus{
705 Metric: autoscalingv2.MetricIdentifier{
706 Name: metricSpec.External.Metric.Name,
707 Selector: metricSpec.External.Metric.Selector,
708 },
709 Current: autoscalingv2.MetricValueStatus{
710 Value: resource.NewMilliQuantity(usageProposal, resource.DecimalSI),
711 },
712 },
713 }
714 return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.External.Metric.Name, metricSpec.External.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
715 }
716 errMsg := "invalid external metric source: neither a value target nor an average value target was set"
717 err = fmt.Errorf(errMsg)
718 condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetExternalMetric", err)
719 return 0, time.Time{}, "", condition, fmt.Errorf(errMsg)
720 }
721
722 func (a *HorizontalController) recordInitialRecommendation(currentReplicas int32, key string) {
723 a.recommendationsLock.Lock()
724 defer a.recommendationsLock.Unlock()
725 if a.recommendations[key] == nil {
726 a.recommendations[key] = []timestampedRecommendation{{currentReplicas, time.Now()}}
727 }
728 }
729
730 func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) (retErr error) {
731
732 actionLabel := monitor.ActionLabelNone
733 start := time.Now()
734 defer func() {
735 errorLabel := monitor.ErrorLabelNone
736 if retErr != nil {
737
738 errorLabel = monitor.ErrorLabelInternal
739 }
740 if errors.Is(retErr, errSpec) {
741 errorLabel = monitor.ErrorLabelSpec
742 }
743
744 a.monitor.ObserveReconciliationResult(actionLabel, errorLabel, time.Since(start))
745 }()
746
747
748 hpa := hpaShared.DeepCopy()
749 hpaStatusOriginal := hpa.Status.DeepCopy()
750
751 reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
752
753 targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
754 if err != nil {
755 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
756 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
757 if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
758 utilruntime.HandleError(err)
759 }
760 return fmt.Errorf("invalid API version in scale target reference: %v%w", err, errSpec)
761 }
762
763 targetGK := schema.GroupKind{
764 Group: targetGV.Group,
765 Kind: hpa.Spec.ScaleTargetRef.Kind,
766 }
767
768 mappings, err := a.mapper.RESTMappings(targetGK)
769 if err != nil {
770 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
771 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
772 if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
773 utilruntime.HandleError(err)
774 }
775 return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
776 }
777
778 scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
779 if err != nil {
780 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
781 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
782 if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
783 utilruntime.HandleError(err)
784 }
785 return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
786 }
787 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
788 currentReplicas := scale.Spec.Replicas
789 a.recordInitialRecommendation(currentReplicas, key)
790
791 var (
792 metricStatuses []autoscalingv2.MetricStatus
793 metricDesiredReplicas int32
794 metricName string
795 )
796
797 desiredReplicas := int32(0)
798 rescaleReason := ""
799
800 var minReplicas int32
801
802 if hpa.Spec.MinReplicas != nil {
803 minReplicas = *hpa.Spec.MinReplicas
804 } else {
805
806 minReplicas = 1
807 }
808
809 rescale := true
810 logger := klog.FromContext(ctx)
811
812 if currentReplicas == 0 && minReplicas != 0 {
813
814 desiredReplicas = 0
815 rescale = false
816 setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
817 } else if currentReplicas > hpa.Spec.MaxReplicas {
818 rescaleReason = "Current number of replicas above Spec.MaxReplicas"
819 desiredReplicas = hpa.Spec.MaxReplicas
820 } else if currentReplicas < minReplicas {
821 rescaleReason = "Current number of replicas below Spec.MinReplicas"
822 desiredReplicas = minReplicas
823 } else {
824 var metricTimestamp time.Time
825 metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
826
827
828 if err != nil && metricDesiredReplicas == -1 {
829 a.setCurrentReplicasAndMetricsInStatus(hpa, currentReplicas, metricStatuses)
830 if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
831 utilruntime.HandleError(err)
832 }
833 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
834 return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
835 }
836 if err != nil {
837
838 retErr = err
839 }
840
841 logger.V(4).Info("Proposing desired replicas",
842 "desiredReplicas", metricDesiredReplicas,
843 "metric", metricName,
844 "timestamp", metricTimestamp,
845 "scaleTarget", reference)
846
847 rescaleMetric := ""
848 if metricDesiredReplicas > desiredReplicas {
849 desiredReplicas = metricDesiredReplicas
850 rescaleMetric = metricName
851 }
852 if desiredReplicas > currentReplicas {
853 rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
854 }
855 if desiredReplicas < currentReplicas {
856 rescaleReason = "All metrics below target"
857 }
858 if hpa.Spec.Behavior == nil {
859 desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
860 } else {
861 desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
862 }
863 rescale = desiredReplicas != currentReplicas
864 }
865
866 if rescale {
867 scale.Spec.Replicas = desiredReplicas
868 _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
869 if err != nil {
870 a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
871 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
872 a.setCurrentReplicasAndMetricsInStatus(hpa, currentReplicas, metricStatuses)
873 if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
874 utilruntime.HandleError(err)
875 }
876 return fmt.Errorf("failed to rescale %s: %v", reference, err)
877 }
878 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
879 a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
880 a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
881 logger.Info("Successfully rescaled",
882 "HPA", klog.KObj(hpa),
883 "currentReplicas", currentReplicas,
884 "desiredReplicas", desiredReplicas,
885 "reason", rescaleReason)
886
887 if desiredReplicas > currentReplicas {
888 actionLabel = monitor.ActionLabelScaleUp
889 } else {
890 actionLabel = monitor.ActionLabelScaleDown
891 }
892 } else {
893 logger.V(4).Info("Decided not to scale",
894 "scaleTarget", reference,
895 "desiredReplicas", desiredReplicas,
896 "lastScaleTime", hpa.Status.LastScaleTime)
897 desiredReplicas = currentReplicas
898 }
899
900 a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
901
902 err = a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
903 if err != nil {
904
905 return err
906 }
907
908 return retErr
909 }
910
911
912
913
914 func (a *HorizontalController) stabilizeRecommendation(key string, prenormalizedDesiredReplicas int32) int32 {
915 maxRecommendation := prenormalizedDesiredReplicas
916 foundOldSample := false
917 oldSampleIndex := 0
918 cutoff := time.Now().Add(-a.downscaleStabilisationWindow)
919
920 a.recommendationsLock.Lock()
921 defer a.recommendationsLock.Unlock()
922 for i, rec := range a.recommendations[key] {
923 if rec.timestamp.Before(cutoff) {
924 foundOldSample = true
925 oldSampleIndex = i
926 } else if rec.recommendation > maxRecommendation {
927 maxRecommendation = rec.recommendation
928 }
929 }
930 if foundOldSample {
931 a.recommendations[key][oldSampleIndex] = timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()}
932 } else {
933 a.recommendations[key] = append(a.recommendations[key], timestampedRecommendation{prenormalizedDesiredReplicas, time.Now()})
934 }
935 return maxRecommendation
936 }
937
938
939
940 func (a *HorizontalController) normalizeDesiredReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas int32, prenormalizedDesiredReplicas int32, minReplicas int32) int32 {
941 stabilizedRecommendation := a.stabilizeRecommendation(key, prenormalizedDesiredReplicas)
942 if stabilizedRecommendation != prenormalizedDesiredReplicas {
943 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ScaleDownStabilized", "recent recommendations were higher than current one, applying the highest recent recommendation")
944 } else {
945 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
946 }
947
948 desiredReplicas, condition, reason := convertDesiredReplicasWithRules(currentReplicas, stabilizedRecommendation, minReplicas, hpa.Spec.MaxReplicas)
949
950 if desiredReplicas == stabilizedRecommendation {
951 setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, condition, reason)
952 } else {
953 setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, condition, reason)
954 }
955
956 return desiredReplicas
957 }
958
959
// NormalizationArg bundles the inputs used when normalizing a desired replica
// count against an HPA's scaling behavior (stabilization and rate limiting).
type NormalizationArg struct {
	// Key indexes the controller's per-HPA recommendation and scale-event stores.
	Key string

	// ScaleUpBehavior holds the scale-up rules taken from hpa.Spec.Behavior.
	ScaleUpBehavior *autoscalingv2.HPAScalingRules

	// ScaleDownBehavior holds the scale-down rules taken from hpa.Spec.Behavior.
	ScaleDownBehavior *autoscalingv2.HPAScalingRules

	// MinReplicas is the lower bound for the replica count.
	MinReplicas int32

	// MaxReplicas is the upper bound for the replica count.
	MaxReplicas int32

	// CurrentReplicas is the target's current replica count.
	CurrentReplicas int32

	// DesiredReplicas is the replica count being normalized.
	DesiredReplicas int32
}
969
970
971
972
973
974
975 func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 {
976 a.maybeInitScaleDownStabilizationWindow(hpa)
977 normalizationArg := NormalizationArg{
978 Key: key,
979 ScaleUpBehavior: hpa.Spec.Behavior.ScaleUp,
980 ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown,
981 MinReplicas: minReplicas,
982 MaxReplicas: hpa.Spec.MaxReplicas,
983 CurrentReplicas: currentReplicas,
984 DesiredReplicas: prenormalizedDesiredReplicas}
985 stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg)
986 normalizationArg.DesiredReplicas = stabilizedRecommendation
987 if stabilizedRecommendation != prenormalizedDesiredReplicas {
988
989 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, reason, message)
990 } else {
991 setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "ReadyForNewScale", "recommended size matches current size")
992 }
993 desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg)
994 if desiredReplicas == stabilizedRecommendation {
995 setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionFalse, reason, message)
996 } else {
997 setCondition(hpa, autoscalingv2.ScalingLimited, v1.ConditionTrue, reason, message)
998 }
999
1000 return desiredReplicas
1001 }
1002
1003 func (a *HorizontalController) maybeInitScaleDownStabilizationWindow(hpa *autoscalingv2.HorizontalPodAutoscaler) {
1004 behavior := hpa.Spec.Behavior
1005 if behavior != nil && behavior.ScaleDown != nil && behavior.ScaleDown.StabilizationWindowSeconds == nil {
1006 stabilizationWindowSeconds := (int32)(a.downscaleStabilisationWindow.Seconds())
1007 hpa.Spec.Behavior.ScaleDown.StabilizationWindowSeconds = &stabilizationWindowSeconds
1008 }
1009 }
1010
1011
1012 func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 {
1013 period := time.Second * time.Duration(periodSeconds)
1014 cutoff := time.Now().Add(-period)
1015 var replicas int32
1016 for _, rec := range scaleEvents {
1017 if rec.timestamp.After(cutoff) {
1018 replicas += rec.replicaChange
1019 }
1020 }
1021 return replicas
1022
1023 }
1024
1025 func (a *HorizontalController) getUnableComputeReplicaCountCondition(hpa runtime.Object, reason string, err error) (condition autoscalingv2.HorizontalPodAutoscalerCondition) {
1026 a.eventRecorder.Event(hpa, v1.EventTypeWarning, reason, err.Error())
1027 return autoscalingv2.HorizontalPodAutoscalerCondition{
1028 Type: autoscalingv2.ScalingActive,
1029 Status: v1.ConditionFalse,
1030 Reason: reason,
1031 Message: fmt.Sprintf("the HPA was unable to compute the replica count: %v", err),
1032 }
1033 }
1034
1035
1036
1037 func (a *HorizontalController) storeScaleEvent(behavior *autoscalingv2.HorizontalPodAutoscalerBehavior, key string, prevReplicas, newReplicas int32) {
1038 if behavior == nil {
1039 return
1040 }
1041 var oldSampleIndex int
1042 var longestPolicyPeriod int32
1043 foundOldSample := false
1044 if newReplicas > prevReplicas {
1045 longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleUp)
1046
1047 a.scaleUpEventsLock.Lock()
1048 defer a.scaleUpEventsLock.Unlock()
1049 markScaleEventsOutdated(a.scaleUpEvents[key], longestPolicyPeriod)
1050 replicaChange := newReplicas - prevReplicas
1051 for i, event := range a.scaleUpEvents[key] {
1052 if event.outdated {
1053 foundOldSample = true
1054 oldSampleIndex = i
1055 }
1056 }
1057 newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
1058 if foundOldSample {
1059 a.scaleUpEvents[key][oldSampleIndex] = newEvent
1060 } else {
1061 a.scaleUpEvents[key] = append(a.scaleUpEvents[key], newEvent)
1062 }
1063 } else {
1064 longestPolicyPeriod = getLongestPolicyPeriod(behavior.ScaleDown)
1065
1066 a.scaleDownEventsLock.Lock()
1067 defer a.scaleDownEventsLock.Unlock()
1068 markScaleEventsOutdated(a.scaleDownEvents[key], longestPolicyPeriod)
1069 replicaChange := prevReplicas - newReplicas
1070 for i, event := range a.scaleDownEvents[key] {
1071 if event.outdated {
1072 foundOldSample = true
1073 oldSampleIndex = i
1074 }
1075 }
1076 newEvent := timestampedScaleEvent{replicaChange, time.Now(), false}
1077 if foundOldSample {
1078 a.scaleDownEvents[key][oldSampleIndex] = newEvent
1079 } else {
1080 a.scaleDownEvents[key] = append(a.scaleDownEvents[key], newEvent)
1081 }
1082 }
1083 }
1084
1085
1086
1087
1088 func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) {
1089 now := time.Now()
1090
1091 foundOldSample := false
1092 oldSampleIndex := 0
1093
1094 upRecommendation := args.DesiredReplicas
1095 upDelaySeconds := *args.ScaleUpBehavior.StabilizationWindowSeconds
1096 upCutoff := now.Add(-time.Second * time.Duration(upDelaySeconds))
1097
1098 downRecommendation := args.DesiredReplicas
1099 downDelaySeconds := *args.ScaleDownBehavior.StabilizationWindowSeconds
1100 downCutoff := now.Add(-time.Second * time.Duration(downDelaySeconds))
1101
1102
1103 a.recommendationsLock.Lock()
1104 defer a.recommendationsLock.Unlock()
1105 for i, rec := range a.recommendations[args.Key] {
1106 if rec.timestamp.After(upCutoff) {
1107 upRecommendation = min(rec.recommendation, upRecommendation)
1108 }
1109 if rec.timestamp.After(downCutoff) {
1110 downRecommendation = max(rec.recommendation, downRecommendation)
1111 }
1112 if rec.timestamp.Before(upCutoff) && rec.timestamp.Before(downCutoff) {
1113 foundOldSample = true
1114 oldSampleIndex = i
1115 }
1116 }
1117
1118
1119 recommendation := args.CurrentReplicas
1120 if recommendation < upRecommendation {
1121 recommendation = upRecommendation
1122 }
1123 if recommendation > downRecommendation {
1124 recommendation = downRecommendation
1125 }
1126
1127
1128 if foundOldSample {
1129 a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()}
1130 } else {
1131 a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()})
1132 }
1133
1134
1135 var reason, message string
1136 if args.DesiredReplicas >= args.CurrentReplicas {
1137 reason = "ScaleUpStabilized"
1138 message = "recent recommendations were lower than current one, applying the lowest recent recommendation"
1139 } else {
1140 reason = "ScaleDownStabilized"
1141 message = "recent recommendations were higher than current one, applying the highest recent recommendation"
1142 }
1143 return recommendation, reason, message
1144 }
1145
1146
1147
1148 func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) {
1149 var possibleLimitingReason, possibleLimitingMessage string
1150
1151 if args.DesiredReplicas > args.CurrentReplicas {
1152 a.scaleUpEventsLock.RLock()
1153 defer a.scaleUpEventsLock.RUnlock()
1154 a.scaleDownEventsLock.RLock()
1155 defer a.scaleDownEventsLock.RUnlock()
1156 scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleUpBehavior)
1157
1158 if scaleUpLimit < args.CurrentReplicas {
1159
1160 scaleUpLimit = args.CurrentReplicas
1161 }
1162 maximumAllowedReplicas := args.MaxReplicas
1163 if maximumAllowedReplicas > scaleUpLimit {
1164 maximumAllowedReplicas = scaleUpLimit
1165 possibleLimitingReason = "ScaleUpLimit"
1166 possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate"
1167 } else {
1168 possibleLimitingReason = "TooManyReplicas"
1169 possibleLimitingMessage = "the desired replica count is more than the maximum replica count"
1170 }
1171 if args.DesiredReplicas > maximumAllowedReplicas {
1172 return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
1173 }
1174 } else if args.DesiredReplicas < args.CurrentReplicas {
1175 a.scaleUpEventsLock.RLock()
1176 defer a.scaleUpEventsLock.RUnlock()
1177 a.scaleDownEventsLock.RLock()
1178 defer a.scaleDownEventsLock.RUnlock()
1179 scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleUpEvents[args.Key], a.scaleDownEvents[args.Key], args.ScaleDownBehavior)
1180
1181 if scaleDownLimit > args.CurrentReplicas {
1182
1183 scaleDownLimit = args.CurrentReplicas
1184 }
1185 minimumAllowedReplicas := args.MinReplicas
1186 if minimumAllowedReplicas < scaleDownLimit {
1187 minimumAllowedReplicas = scaleDownLimit
1188 possibleLimitingReason = "ScaleDownLimit"
1189 possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate"
1190 } else {
1191 possibleLimitingMessage = "the desired replica count is less than the minimum replica count"
1192 possibleLimitingReason = "TooFewReplicas"
1193 }
1194 if args.DesiredReplicas < minimumAllowedReplicas {
1195 return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage
1196 }
1197 }
1198 return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
1199 }
1200
1201
1202 func convertDesiredReplicasWithRules(currentReplicas, desiredReplicas, hpaMinReplicas, hpaMaxReplicas int32) (int32, string, string) {
1203
1204 var minimumAllowedReplicas int32
1205 var maximumAllowedReplicas int32
1206
1207 var possibleLimitingCondition string
1208 var possibleLimitingReason string
1209
1210 minimumAllowedReplicas = hpaMinReplicas
1211
1212
1213
1214 scaleUpLimit := calculateScaleUpLimit(currentReplicas)
1215
1216 if hpaMaxReplicas > scaleUpLimit {
1217 maximumAllowedReplicas = scaleUpLimit
1218 possibleLimitingCondition = "ScaleUpLimit"
1219 possibleLimitingReason = "the desired replica count is increasing faster than the maximum scale rate"
1220 } else {
1221 maximumAllowedReplicas = hpaMaxReplicas
1222 possibleLimitingCondition = "TooManyReplicas"
1223 possibleLimitingReason = "the desired replica count is more than the maximum replica count"
1224 }
1225
1226 if desiredReplicas < minimumAllowedReplicas {
1227 possibleLimitingCondition = "TooFewReplicas"
1228 possibleLimitingReason = "the desired replica count is less than the minimum replica count"
1229
1230 return minimumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
1231 } else if desiredReplicas > maximumAllowedReplicas {
1232 return maximumAllowedReplicas, possibleLimitingCondition, possibleLimitingReason
1233 }
1234
1235 return desiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range"
1236 }
1237
1238 func calculateScaleUpLimit(currentReplicas int32) int32 {
1239 return int32(math.Max(scaleUpLimitFactor*float64(currentReplicas), scaleUpLimitMinimum))
1240 }
1241
1242
1243 func markScaleEventsOutdated(scaleEvents []timestampedScaleEvent, longestPolicyPeriod int32) {
1244 period := time.Second * time.Duration(longestPolicyPeriod)
1245 cutoff := time.Now().Add(-period)
1246 for i, event := range scaleEvents {
1247 if event.timestamp.Before(cutoff) {
1248
1249 scaleEvents[i].outdated = true
1250 }
1251 }
1252 }
1253
1254 func getLongestPolicyPeriod(scalingRules *autoscalingv2.HPAScalingRules) int32 {
1255 var longestPolicyPeriod int32
1256 for _, policy := range scalingRules.Policies {
1257 if policy.PeriodSeconds > longestPolicyPeriod {
1258 longestPolicyPeriod = policy.PeriodSeconds
1259 }
1260 }
1261 return longestPolicyPeriod
1262 }
1263
1264
1265 func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleUpEvents, scaleDownEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
1266 var result int32
1267 var proposed int32
1268 var selectPolicyFn func(int32, int32) int32
1269 if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
1270 return currentReplicas
1271 } else if *scalingRules.SelectPolicy == autoscalingv2.MinChangePolicySelect {
1272 result = math.MaxInt32
1273 selectPolicyFn = min
1274 } else {
1275 result = math.MinInt32
1276 selectPolicyFn = max
1277 }
1278 for _, policy := range scalingRules.Policies {
1279 replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleUpEvents)
1280 replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleDownEvents)
1281 periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod + replicasDeletedInCurrentPeriod
1282 if policy.Type == autoscalingv2.PodsScalingPolicy {
1283 proposed = periodStartReplicas + policy.Value
1284 } else if policy.Type == autoscalingv2.PercentScalingPolicy {
1285
1286 proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100)))
1287 }
1288 result = selectPolicyFn(result, proposed)
1289 }
1290 return result
1291 }
1292
1293
1294 func calculateScaleDownLimitWithBehaviors(currentReplicas int32, scaleUpEvents, scaleDownEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 {
1295 var result int32
1296 var proposed int32
1297 var selectPolicyFn func(int32, int32) int32
1298 if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect {
1299 return currentReplicas
1300 } else if *scalingRules.SelectPolicy == autoscalingv2.MinChangePolicySelect {
1301 result = math.MinInt32
1302 selectPolicyFn = max
1303 } else {
1304 result = math.MaxInt32
1305 selectPolicyFn = min
1306 }
1307 for _, policy := range scalingRules.Policies {
1308 replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleUpEvents)
1309 replicasDeletedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleDownEvents)
1310 periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod + replicasDeletedInCurrentPeriod
1311 if policy.Type == autoscalingv2.PodsScalingPolicy {
1312 proposed = periodStartReplicas - policy.Value
1313 } else if policy.Type == autoscalingv2.PercentScalingPolicy {
1314 proposed = int32(float64(periodStartReplicas) * (1 - float64(policy.Value)/100))
1315 }
1316 result = selectPolicyFn(result, proposed)
1317 }
1318 return result
1319 }
1320
1321
1322
1323
1324
1325
1326 func (a *HorizontalController) scaleForResourceMappings(ctx context.Context, namespace, name string, mappings []*apimeta.RESTMapping) (*autoscalingv1.Scale, schema.GroupResource, error) {
1327 var firstErr error
1328 for i, mapping := range mappings {
1329 targetGR := mapping.Resource.GroupResource()
1330 scale, err := a.scaleNamespacer.Scales(namespace).Get(ctx, targetGR, name, metav1.GetOptions{})
1331 if err == nil {
1332 return scale, targetGR, nil
1333 }
1334
1335
1336
1337 if i == 0 {
1338 firstErr = err
1339 }
1340 }
1341
1342
1343 if firstErr == nil {
1344 firstErr = fmt.Errorf("unrecognized resource")
1345 }
1346
1347 return nil, schema.GroupResource{}, firstErr
1348 }
1349
1350
1351 func (a *HorizontalController) setCurrentReplicasAndMetricsInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32, metricStatuses []autoscalingv2.MetricStatus) {
1352 a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, metricStatuses, false)
1353 }
1354
1355
1356
1357 func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
1358 hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
1359 CurrentReplicas: currentReplicas,
1360 DesiredReplicas: desiredReplicas,
1361 LastScaleTime: hpa.Status.LastScaleTime,
1362 CurrentMetrics: metricStatuses,
1363 Conditions: hpa.Status.Conditions,
1364 }
1365
1366 if rescale {
1367 now := metav1.NewTime(time.Now())
1368 hpa.Status.LastScaleTime = &now
1369 }
1370 }
1371
1372
1373 func (a *HorizontalController) updateStatusIfNeeded(ctx context.Context, oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
1374
1375 if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
1376 return nil
1377 }
1378 return a.updateStatus(ctx, newHPA)
1379 }
1380
1381
1382 func (a *HorizontalController) updateStatus(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler) error {
1383 _, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(ctx, hpa, metav1.UpdateOptions{})
1384 if err != nil {
1385 a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedUpdateStatus", err.Error())
1386 return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
1387 }
1388 logger := klog.FromContext(ctx)
1389 logger.V(2).Info("Successfully updated status", "HPA", klog.KObj(hpa))
1390 return nil
1391 }
1392
1393
1394
1395
1396 func setCondition(hpa *autoscalingv2.HorizontalPodAutoscaler, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) {
1397 hpa.Status.Conditions = setConditionInList(hpa.Status.Conditions, conditionType, status, reason, message, args...)
1398 }
1399
1400
1401
1402
1403 func setConditionInList(inputList []autoscalingv2.HorizontalPodAutoscalerCondition, conditionType autoscalingv2.HorizontalPodAutoscalerConditionType, status v1.ConditionStatus, reason, message string, args ...interface{}) []autoscalingv2.HorizontalPodAutoscalerCondition {
1404 resList := inputList
1405 var existingCond *autoscalingv2.HorizontalPodAutoscalerCondition
1406 for i, condition := range resList {
1407 if condition.Type == conditionType {
1408
1409 existingCond = &resList[i]
1410 break
1411 }
1412 }
1413
1414 if existingCond == nil {
1415 resList = append(resList, autoscalingv2.HorizontalPodAutoscalerCondition{
1416 Type: conditionType,
1417 })
1418 existingCond = &resList[len(resList)-1]
1419 }
1420
1421 if existingCond.Status != status {
1422 existingCond.LastTransitionTime = metav1.Now()
1423 }
1424
1425 existingCond.Status = status
1426 existingCond.Reason = reason
1427 existingCond.Message = fmt.Sprintf(message, args...)
1428
1429 return resList
1430 }
1431
// max returns the larger of a and b.
//
// This package-level function shadows the Go 1.21 builtin. It is also used as
// a func(int32, int32) int32 value in this package, which a builtin cannot be.
func max(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}
1438
// min returns the smaller of a and b.
//
// This package-level function shadows the Go 1.21 builtin. It is also used as
// a func(int32, int32) int32 value in this package, which a builtin cannot be.
func min(a, b int32) int32 {
	if b < a {
		return b
	}
	return a
}
1445
View as plain text