1
16
17 package scale
18
19 import (
20 "context"
21 "fmt"
22 "strconv"
23 "time"
24
25 autoscalingv1 "k8s.io/api/autoscaling/v1"
26 "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apimachinery/pkg/util/json"
31 "k8s.io/apimachinery/pkg/util/wait"
32 scaleclient "k8s.io/client-go/scale"
33 )
34
35
36 type Scaler interface {
37
38
39
40
41 Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, wait *RetryParams, gvr schema.GroupVersionResource, dryRun bool) error
42
43
44 ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gvr schema.GroupVersionResource, dryRun bool) (updatedResourceVersion string, err error)
45 }
46
47
48 func NewScaler(scalesGetter scaleclient.ScalesGetter) Scaler {
49 return &genericScaler{scalesGetter}
50 }
51
52
53
54
55
56 type ScalePrecondition struct {
57 Size int
58 ResourceVersion string
59 }
60
61
62
63 type PreconditionError struct {
64 Precondition string
65 ExpectedValue string
66 ActualValue string
67 }
68
69 func (pe PreconditionError) Error() string {
70 return fmt.Sprintf("Expected %s to be %s, was %s", pe.Precondition, pe.ExpectedValue, pe.ActualValue)
71 }
72
73
74 type RetryParams struct {
75 Interval, Timeout time.Duration
76 }
77
78 func NewRetryParams(interval, timeout time.Duration) *RetryParams {
79 return &RetryParams{interval, timeout}
80 }
81
82
83 func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name string, count uint, updatedResourceVersion *string, gvr schema.GroupVersionResource, dryRun bool) wait.ConditionFunc {
84 return func() (bool, error) {
85 rv, err := r.ScaleSimple(namespace, name, precondition, count, gvr, dryRun)
86 if updatedResourceVersion != nil {
87 *updatedResourceVersion = rv
88 }
89
90 if errors.IsConflict(err) {
91 return false, nil
92 }
93 if err != nil {
94 return false, err
95 }
96 return true, nil
97 }
98 }
99
100
101 func (precondition *ScalePrecondition) validate(scale *autoscalingv1.Scale) error {
102 if precondition.Size != -1 && int(scale.Spec.Replicas) != precondition.Size {
103 return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(int(scale.Spec.Replicas))}
104 }
105 if len(precondition.ResourceVersion) > 0 && scale.ResourceVersion != precondition.ResourceVersion {
106 return PreconditionError{"resource version", precondition.ResourceVersion, scale.ResourceVersion}
107 }
108 return nil
109 }
110
111
112 type genericScaler struct {
113 scaleNamespacer scaleclient.ScalesGetter
114 }
115
116 var _ Scaler = &genericScaler{}
117
118
119 func (s *genericScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint, gvr schema.GroupVersionResource, dryRun bool) (updatedResourceVersion string, err error) {
120 if preconditions != nil {
121 scale, err := s.scaleNamespacer.Scales(namespace).Get(context.TODO(), gvr.GroupResource(), name, metav1.GetOptions{})
122 if err != nil {
123 return "", err
124 }
125 if err = preconditions.validate(scale); err != nil {
126 return "", err
127 }
128 scale.Spec.Replicas = int32(newSize)
129 updateOptions := metav1.UpdateOptions{}
130 if dryRun {
131 updateOptions.DryRun = []string{metav1.DryRunAll}
132 }
133 updatedScale, err := s.scaleNamespacer.Scales(namespace).Update(context.TODO(), gvr.GroupResource(), scale, updateOptions)
134 if err != nil {
135 return "", err
136 }
137 return updatedScale.ResourceVersion, nil
138 }
139
140
141 type objectForReplicas struct {
142 Replicas uint `json:"replicas"`
143 }
144
145 type objectForSpec struct {
146 Spec objectForReplicas `json:"spec"`
147 }
148 spec := objectForSpec{
149 Spec: objectForReplicas{Replicas: newSize},
150 }
151 patch, err := json.Marshal(&spec)
152 if err != nil {
153 return "", err
154 }
155 patchOptions := metav1.PatchOptions{}
156 if dryRun {
157 patchOptions.DryRun = []string{metav1.DryRunAll}
158 }
159 updatedScale, err := s.scaleNamespacer.Scales(namespace).Patch(context.TODO(), gvr, name, types.MergePatchType, patch, patchOptions)
160 if err != nil {
161 return "", err
162 }
163 return updatedScale.ResourceVersion, nil
164 }
165
166
167
168 func (s *genericScaler) Scale(namespace, resourceName string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams, gvr schema.GroupVersionResource, dryRun bool) error {
169 if retry == nil {
170
171 retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
172 }
173 cond := ScaleCondition(s, preconditions, namespace, resourceName, newSize, nil, gvr, dryRun)
174 if err := wait.PollImmediate(retry.Interval, retry.Timeout, cond); err != nil {
175 return err
176 }
177 if waitForReplicas != nil {
178 return WaitForScaleHasDesiredReplicas(s.scaleNamespacer, gvr.GroupResource(), resourceName, namespace, newSize, waitForReplicas)
179 }
180 return nil
181 }
182
183
184
185 func scaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, desiredReplicas int32) wait.ConditionFunc {
186 return func() (bool, error) {
187 actualScale, err := sClient.Scales(namespace).Get(context.TODO(), gr, resourceName, metav1.GetOptions{})
188 if err != nil {
189 return false, err
190 }
191
192 if actualScale.Spec.Replicas != desiredReplicas {
193 return true, nil
194 }
195 return actualScale.Spec.Replicas == actualScale.Status.Replicas &&
196 desiredReplicas == actualScale.Status.Replicas, nil
197 }
198 }
199
200
201
202 func WaitForScaleHasDesiredReplicas(sClient scaleclient.ScalesGetter, gr schema.GroupResource, resourceName string, namespace string, newSize uint, waitForReplicas *RetryParams) error {
203 if waitForReplicas == nil {
204 return fmt.Errorf("waitForReplicas parameter cannot be nil")
205 }
206 err := wait.PollImmediate(
207 waitForReplicas.Interval,
208 waitForReplicas.Timeout,
209 scaleHasDesiredReplicas(sClient, gr, resourceName, namespace, int32(newSize)))
210 if err == wait.ErrWaitTimeout {
211 return fmt.Errorf("timed out waiting for %q to be synced", resourceName)
212 }
213 return err
214 }
215
View as plain text