1
16
17 package podautoscaler
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "time"
24
25 autoscaling "k8s.io/api/autoscaling/v2"
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/labels"
29 "k8s.io/apimachinery/pkg/util/sets"
30 corelisters "k8s.io/client-go/listers/core/v1"
31 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
32 metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
33 )
34
// Default calculator settings. Judging by the "Testing" in their names these
// are intended for tests (TODO confirm); nothing in this file reads them.
const (
	// defaultTestingTolerance is a default for the tolerance argument of
	// NewReplicaCalculator: the allowed deviation of the usage ratio from 1.0
	// before the replica count is changed.
	defaultTestingTolerance = 0.1
	// defaultTestingCPUInitializationPeriod is a default for the
	// cpuInitializationPeriod argument of NewReplicaCalculator.
	defaultTestingCPUInitializationPeriod = 2 * time.Minute
	// defaultTestingDelayOfInitialReadinessStatus is a default for the
	// delayOfInitialReadinessStatus argument of NewReplicaCalculator.
	defaultTestingDelayOfInitialReadinessStatus = 10 * time.Second
)
42
43
44 type ReplicaCalculator struct {
45 metricsClient metricsclient.MetricsClient
46 podLister corelisters.PodLister
47 tolerance float64
48 cpuInitializationPeriod time.Duration
49 delayOfInitialReadinessStatus time.Duration
50 }
51
52
53 func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister corelisters.PodLister, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator {
54 return &ReplicaCalculator{
55 metricsClient: metricsClient,
56 podLister: podLister,
57 tolerance: tolerance,
58 cpuInitializationPeriod: cpuInitializationPeriod,
59 delayOfInitialReadinessStatus: delayOfInitialReadinessStatus,
60 }
61 }
62
63
64
65 func (c *ReplicaCalculator) GetResourceReplicas(ctx context.Context, currentReplicas int32, targetUtilization int32, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, utilization int32, rawUtilization int64, timestamp time.Time, err error) {
66 metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
67 if err != nil {
68 return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
69 }
70 podList, err := c.podLister.Pods(namespace).List(selector)
71 if err != nil {
72 return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
73 }
74 if len(podList) == 0 {
75 return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count")
76 }
77
78 readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
79 removeMetricsForPods(metrics, ignoredPods)
80 removeMetricsForPods(metrics, unreadyPods)
81 if len(metrics) == 0 {
82 return 0, 0, 0, time.Time{}, fmt.Errorf("did not receive metrics for targeted pods (pods might be unready)")
83 }
84
85 requests, err := calculatePodRequests(podList, container, resource)
86 if err != nil {
87 return 0, 0, 0, time.Time{}, err
88 }
89
90 usageRatio, utilization, rawUtilization, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
91 if err != nil {
92 return 0, 0, 0, time.Time{}, err
93 }
94
95 scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0
96 if !scaleUpWithUnready && len(missingPods) == 0 {
97 if math.Abs(1.0-usageRatio) <= c.tolerance {
98
99 return currentReplicas, utilization, rawUtilization, timestamp, nil
100 }
101
102
103 return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, rawUtilization, timestamp, nil
104 }
105
106 if len(missingPods) > 0 {
107 if usageRatio < 1.0 {
108
109
110 fallbackUtilization := int64(max(100, targetUtilization))
111 for podName := range missingPods {
112 metrics[podName] = metricsclient.PodMetric{Value: requests[podName] * fallbackUtilization / 100}
113 }
114 } else if usageRatio > 1.0 {
115
116 for podName := range missingPods {
117 metrics[podName] = metricsclient.PodMetric{Value: 0}
118 }
119 }
120 }
121
122 if scaleUpWithUnready {
123
124 for podName := range unreadyPods {
125 metrics[podName] = metricsclient.PodMetric{Value: 0}
126 }
127 }
128
129
130 newUsageRatio, _, _, err := metricsclient.GetResourceUtilizationRatio(metrics, requests, targetUtilization)
131 if err != nil {
132 return 0, utilization, rawUtilization, time.Time{}, err
133 }
134
135 if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
136
137
138 return currentReplicas, utilization, rawUtilization, timestamp, nil
139 }
140
141 newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics))))
142 if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
143
144 return currentReplicas, utilization, rawUtilization, timestamp, nil
145 }
146
147
148
149 return newReplicas, utilization, rawUtilization, timestamp, nil
150 }
151
152
153
154 func (c *ReplicaCalculator) GetRawResourceReplicas(ctx context.Context, currentReplicas int32, targetUsage int64, resource v1.ResourceName, namespace string, selector labels.Selector, container string) (replicaCount int32, usage int64, timestamp time.Time, err error) {
155 metrics, timestamp, err := c.metricsClient.GetResourceMetric(ctx, resource, namespace, selector, container)
156 if err != nil {
157 return 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err)
158 }
159
160 replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, namespace, selector, resource)
161 return replicaCount, usage, timestamp, err
162 }
163
164
165
166
167 func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
168 metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
169 if err != nil {
170 return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
171 }
172
173 replicaCount, usage, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUsage, namespace, selector, v1.ResourceName(""))
174 return replicaCount, usage, timestamp, err
175 }
176
177
178 func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUsage int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, usage int64, err error) {
179
180 podList, err := c.podLister.Pods(namespace).List(selector)
181 if err != nil {
182 return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
183 }
184
185 if len(podList) == 0 {
186 return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count")
187 }
188
189 readyPodCount, unreadyPods, missingPods, ignoredPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
190 removeMetricsForPods(metrics, ignoredPods)
191 removeMetricsForPods(metrics, unreadyPods)
192
193 if len(metrics) == 0 {
194 return 0, 0, fmt.Errorf("did not receive metrics for targeted pods (pods might be unready)")
195 }
196
197 usageRatio, usage := metricsclient.GetMetricUsageRatio(metrics, targetUsage)
198
199 scaleUpWithUnready := len(unreadyPods) > 0 && usageRatio > 1.0
200
201 if !scaleUpWithUnready && len(missingPods) == 0 {
202 if math.Abs(1.0-usageRatio) <= c.tolerance {
203
204 return currentReplicas, usage, nil
205 }
206
207
208 return int32(math.Ceil(usageRatio * float64(readyPodCount))), usage, nil
209 }
210
211 if len(missingPods) > 0 {
212 if usageRatio < 1.0 {
213
214 for podName := range missingPods {
215 metrics[podName] = metricsclient.PodMetric{Value: targetUsage}
216 }
217 } else if usageRatio > 1.0 {
218
219 for podName := range missingPods {
220 metrics[podName] = metricsclient.PodMetric{Value: 0}
221 }
222 }
223 }
224
225 if scaleUpWithUnready {
226
227 for podName := range unreadyPods {
228 metrics[podName] = metricsclient.PodMetric{Value: 0}
229 }
230 }
231
232
233 newUsageRatio, _ := metricsclient.GetMetricUsageRatio(metrics, targetUsage)
234
235 if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) {
236
237
238 return currentReplicas, usage, nil
239 }
240
241 newReplicas := int32(math.Ceil(newUsageRatio * float64(len(metrics))))
242 if (newUsageRatio < 1.0 && newReplicas > currentReplicas) || (newUsageRatio > 1.0 && newReplicas < currentReplicas) {
243
244 return currentReplicas, usage, nil
245 }
246
247
248
249 return newReplicas, usage, nil
250 }
251
252
253
254 func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUsage int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
255 usage, _, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
256 if err != nil {
257 return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
258 }
259
260 usageRatio := float64(usage) / float64(targetUsage)
261 replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, selector)
262 return replicaCount, usage, timestamp, err
263 }
264
265
266
267 func (c *ReplicaCalculator) getUsageRatioReplicaCount(currentReplicas int32, usageRatio float64, namespace string, selector labels.Selector) (replicaCount int32, timestamp time.Time, err error) {
268 if currentReplicas != 0 {
269 if math.Abs(1.0-usageRatio) <= c.tolerance {
270
271 return currentReplicas, timestamp, nil
272 }
273 readyPodCount := int64(0)
274 readyPodCount, err = c.getReadyPodsCount(namespace, selector)
275 if err != nil {
276 return 0, time.Time{}, fmt.Errorf("unable to calculate ready pods: %s", err)
277 }
278 replicaCount = int32(math.Ceil(usageRatio * float64(readyPodCount)))
279 } else {
280
281 replicaCount = int32(math.Ceil(usageRatio))
282 }
283
284 return replicaCount, timestamp, err
285 }
286
287
288
289 func (c *ReplicaCalculator) GetObjectPerPodMetricReplicas(statusReplicas int32, targetAverageUsage int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
290 usage, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
291 if err != nil {
292 return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
293 }
294
295 replicaCount = statusReplicas
296 usageRatio := float64(usage) / (float64(targetAverageUsage) * float64(replicaCount))
297 if math.Abs(1.0-usageRatio) > c.tolerance {
298
299 replicaCount = int32(math.Ceil(float64(usage) / float64(targetAverageUsage)))
300 }
301 usage = int64(math.Ceil(float64(usage) / float64(statusReplicas)))
302 return replicaCount, usage, timestamp, nil
303 }
304
305
306
307
308 func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) {
309 podList, err := c.podLister.Pods(namespace).List(selector)
310 if err != nil {
311 return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err)
312 }
313
314 if len(podList) == 0 {
315 return 0, fmt.Errorf("no pods returned by selector while calculating replica count")
316 }
317
318 readyPodCount := 0
319
320 for _, pod := range podList {
321 if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) {
322 readyPodCount++
323 }
324 }
325
326 return int64(readyPodCount), nil
327 }
328
329
330
331
332 func (c *ReplicaCalculator) GetExternalMetricReplicas(currentReplicas int32, targetUsage int64, metricName, namespace string, metricSelector *metav1.LabelSelector, podSelector labels.Selector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
333 metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector)
334 if err != nil {
335 return 0, 0, time.Time{}, err
336 }
337 metrics, _, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector)
338 if err != nil {
339 return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err)
340 }
341 usage = 0
342 for _, val := range metrics {
343 usage = usage + val
344 }
345
346 usageRatio := float64(usage) / float64(targetUsage)
347 replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, podSelector)
348 return replicaCount, usage, timestamp, err
349 }
350
351
352
353
354 func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(statusReplicas int32, targetUsagePerPod int64, metricName, namespace string, metricSelector *metav1.LabelSelector) (replicaCount int32, usage int64, timestamp time.Time, err error) {
355 metricLabelSelector, err := metav1.LabelSelectorAsSelector(metricSelector)
356 if err != nil {
357 return 0, 0, time.Time{}, err
358 }
359 metrics, timestamp, err := c.metricsClient.GetExternalMetric(metricName, namespace, metricLabelSelector)
360 if err != nil {
361 return 0, 0, time.Time{}, fmt.Errorf("unable to get external metric %s/%s/%+v: %s", namespace, metricName, metricSelector, err)
362 }
363 usage = 0
364 for _, val := range metrics {
365 usage = usage + val
366 }
367
368 replicaCount = statusReplicas
369 usageRatio := float64(usage) / (float64(targetUsagePerPod) * float64(replicaCount))
370 if math.Abs(1.0-usageRatio) > c.tolerance {
371
372 replicaCount = int32(math.Ceil(float64(usage) / float64(targetUsagePerPod)))
373 }
374 usage = int64(math.Ceil(float64(usage) / float64(statusReplicas)))
375 return replicaCount, usage, timestamp, nil
376 }
377
378 func groupPods(pods []*v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, unreadyPods, missingPods, ignoredPods sets.String) {
379 missingPods = sets.NewString()
380 unreadyPods = sets.NewString()
381 ignoredPods = sets.NewString()
382 for _, pod := range pods {
383 if pod.DeletionTimestamp != nil || pod.Status.Phase == v1.PodFailed {
384 ignoredPods.Insert(pod.Name)
385 continue
386 }
387
388 if pod.Status.Phase == v1.PodPending {
389 unreadyPods.Insert(pod.Name)
390 continue
391 }
392
393 metric, found := metrics[pod.Name]
394 if !found {
395 missingPods.Insert(pod.Name)
396 continue
397 }
398
399 if resource == v1.ResourceCPU {
400 var unready bool
401 _, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady)
402 if condition == nil || pod.Status.StartTime == nil {
403 unready = true
404 } else {
405
406 if pod.Status.StartTime.Add(cpuInitializationPeriod).After(time.Now()) {
407
408 unready = condition.Status == v1.ConditionFalse || metric.Timestamp.Before(condition.LastTransitionTime.Time.Add(metric.Window))
409 } else {
410
411 unready = condition.Status == v1.ConditionFalse && pod.Status.StartTime.Add(delayOfInitialReadinessStatus).After(condition.LastTransitionTime.Time)
412 }
413 }
414 if unready {
415 unreadyPods.Insert(pod.Name)
416 continue
417 }
418 }
419 readyPodCount++
420 }
421 return
422 }
423
424 func calculatePodRequests(pods []*v1.Pod, container string, resource v1.ResourceName) (map[string]int64, error) {
425 requests := make(map[string]int64, len(pods))
426 for _, pod := range pods {
427 podSum := int64(0)
428
429 containers := append([]v1.Container{}, pod.Spec.Containers...)
430 for _, c := range pod.Spec.InitContainers {
431 if c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways {
432 containers = append(containers, c)
433 }
434 }
435 for _, c := range containers {
436 if container == "" || container == c.Name {
437 if containerRequest, ok := c.Resources.Requests[resource]; ok {
438 podSum += containerRequest.MilliValue()
439 } else {
440 return nil, fmt.Errorf("missing request for %s in container %s of Pod %s", resource, c.Name, pod.ObjectMeta.Name)
441 }
442 }
443 }
444 requests[pod.Name] = podSum
445 }
446 return requests, nil
447 }
448
449 func removeMetricsForPods(metrics metricsclient.PodMetricsInfo, pods sets.String) {
450 for _, pod := range pods.UnsortedList() {
451 delete(metrics, pod)
452 }
453 }
454
View as plain text