1
16
17 package statefulset
18
import (
	"context"
	"fmt"
	"strings"
	"unicode/utf8"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	errorutils "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
)
38
39
40
41 type StatefulPodControlObjectManager interface {
42 CreatePod(ctx context.Context, pod *v1.Pod) error
43 GetPod(namespace, podName string) (*v1.Pod, error)
44 UpdatePod(pod *v1.Pod) error
45 DeletePod(pod *v1.Pod) error
46 CreateClaim(claim *v1.PersistentVolumeClaim) error
47 GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
48 UpdateClaim(claim *v1.PersistentVolumeClaim) error
49 }
50
51
52
53
54
55 type StatefulPodControl struct {
56 objectMgr StatefulPodControlObjectManager
57 recorder record.EventRecorder
58 }
59
60
61
62 func NewStatefulPodControl(
63 client clientset.Interface,
64 podLister corelisters.PodLister,
65 claimLister corelisters.PersistentVolumeClaimLister,
66 recorder record.EventRecorder,
67 ) *StatefulPodControl {
68 return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder}
69 }
70
71
72 func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl {
73 return &StatefulPodControl{om, recorder}
74 }
75
76
77 type realStatefulPodControlObjectManager struct {
78 client clientset.Interface
79 podLister corelisters.PodLister
80 claimLister corelisters.PersistentVolumeClaimLister
81 }
82
83 func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
84 _, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
85 return err
86 }
87
88 func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
89 return om.podLister.Pods(namespace).Get(podName)
90 }
91
92 func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error {
93 _, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
94 return err
95 }
96
97 func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error {
98 return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
99 }
100
101 func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
102 _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
103 return err
104 }
105
106 func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
107 return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
108 }
109
110 func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
111 _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
112 return err
113 }
114
115 func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
116
117 if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
118 spc.recordPodEvent("create", set, pod, err)
119 return err
120 }
121
122 err := spc.objectMgr.CreatePod(ctx, pod)
123
124 if apierrors.IsAlreadyExists(err) {
125 return err
126 }
127 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
128
129 if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
130 spc.recordPodEvent("update", set, pod, err)
131 return err
132 }
133 }
134 spc.recordPodEvent("create", set, pod, err)
135 return err
136 }
137
138 func (spc *StatefulPodControl) UpdateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
139 attemptedUpdate := false
140 err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
141
142 consistent := true
143
144 if !identityMatches(set, pod) {
145 updateIdentity(set, pod)
146 consistent = false
147 }
148
149
150 if !storageMatches(set, pod) {
151 updateStorage(set, pod)
152 consistent = false
153 if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
154 spc.recordPodEvent("update", set, pod, err)
155 return err
156 }
157 }
158 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
159
160
161 if match, err := spc.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
162 spc.recordPodEvent("update", set, pod, err)
163 return err
164 } else if !match {
165 if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
166 spc.recordPodEvent("update", set, pod, err)
167 return err
168 }
169 consistent = false
170 }
171 }
172
173
174 if consistent {
175 return nil
176 }
177
178 attemptedUpdate = true
179
180
181 updateErr := spc.objectMgr.UpdatePod(pod)
182 if updateErr == nil {
183 return nil
184 }
185
186 if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil {
187
188 pod = updated.DeepCopy()
189 } else {
190 utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err))
191 }
192
193 return updateErr
194 })
195 if attemptedUpdate {
196 spc.recordPodEvent("update", set, pod, err)
197 }
198 return err
199 }
200
201 func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
202 err := spc.objectMgr.DeletePod(pod)
203 spc.recordPodEvent("delete", set, pod, err)
204 return err
205 }
206
207
208
209
210 func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
211 logger := klog.FromContext(ctx)
212 ordinal := getOrdinal(pod)
213 templates := set.Spec.VolumeClaimTemplates
214 for i := range templates {
215 claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
216 claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
217 switch {
218 case apierrors.IsNotFound(err):
219 klog.FromContext(ctx).V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
220 case err != nil:
221 return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
222 default:
223 if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) {
224 return false, nil
225 }
226 }
227 }
228 return true, nil
229 }
230
231
232 func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
233 logger := klog.FromContext(ctx)
234 ordinal := getOrdinal(pod)
235 templates := set.Spec.VolumeClaimTemplates
236 for i := range templates {
237 claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
238 claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
239 switch {
240 case apierrors.IsNotFound(err):
241 logger.V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
242 case err != nil:
243 return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)
244 default:
245 if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) {
246 claim = claim.DeepCopy()
247 needsUpdate := updateClaimOwnerRefForSetAndPod(logger, claim, set, pod)
248 if needsUpdate {
249 err := spc.objectMgr.UpdateClaim(claim)
250 if err != nil {
251 return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err)
252 }
253 }
254 }
255 }
256 }
257 return nil
258 }
259
260
261
262
263 func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
264 policy := getPersistentVolumeClaimRetentionPolicy(set)
265 if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType {
266
267 return false, nil
268 }
269 for _, claim := range getPersistentVolumeClaims(set, pod) {
270 pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
271 switch {
272 case apierrors.IsNotFound(err):
273
274 continue
275 case err != nil:
276 return false, err
277 case err == nil:
278
279 if hasStaleOwnerRef(pvc, pod) {
280 return true, nil
281 }
282 }
283 }
284 return false, nil
285 }
286
287
288
289 func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
290 if err == nil {
291 reason := fmt.Sprintf("Successful%s", strings.Title(verb))
292 message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
293 strings.ToLower(verb), pod.Name, set.Name)
294 spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
295 } else {
296 reason := fmt.Sprintf("Failed%s", strings.Title(verb))
297 message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
298 strings.ToLower(verb), pod.Name, set.Name, err)
299 spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
300 }
301 }
302
303
304
305
306 func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
307 if err == nil {
308 reason := fmt.Sprintf("Successful%s", strings.Title(verb))
309 message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
310 strings.ToLower(verb), claim.Name, pod.Name, set.Name)
311 spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
312 } else {
313 reason := fmt.Sprintf("Failed%s", strings.Title(verb))
314 message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
315 strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
316 spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
317 }
318 }
319
320
321 func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
322 if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
323 return err
324 }
325
326 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
327
328 if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
329 spc.recordPodEvent("update", set, pod, err)
330 return err
331 }
332 }
333 return nil
334 }
335
336
337
338
339
340 func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
341 var errs []error
342 for _, claim := range getPersistentVolumeClaims(set, pod) {
343 pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
344 switch {
345 case apierrors.IsNotFound(err):
346 err := spc.objectMgr.CreateClaim(&claim)
347 if err != nil {
348 errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
349 }
350 if err == nil || !apierrors.IsAlreadyExists(err) {
351 spc.recordClaimEvent("create", set, pod, &claim, err)
352 }
353 case err != nil:
354 errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err))
355 spc.recordClaimEvent("create", set, pod, &claim, err)
356 default:
357 if pvc.DeletionTimestamp != nil {
358 errs = append(errs, fmt.Errorf("pvc %s is being deleted", claim.Name))
359 }
360 }
361
362 }
363 return errorutils.NewAggregate(errs)
364 }
365
View as plain text