1
16
17
25 package nestedpendingoperations
26
27 import (
28 "fmt"
29 "sync"
30
31 v1 "k8s.io/api/core/v1"
32 "k8s.io/apimachinery/pkg/types"
33 k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
34 "k8s.io/klog/v2"
35 "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
36 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
37 )
38
39 const (
40
41 EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("")
42
43
44 EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
45
46
47 EmptyNodeName types.NodeName = types.NodeName("")
48 )
49
50
51 type NestedPendingOperations interface {
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 Run(
93 volumeName v1.UniqueVolumeName,
94 podName volumetypes.UniquePodName,
95 nodeName types.NodeName,
96 generatedOperations volumetypes.GeneratedOperations) error
97
98
99
100
101 Wait()
102
103
104
105 IsOperationPending(
106 volumeName v1.UniqueVolumeName,
107 podName volumetypes.UniquePodName,
108 nodeName types.NodeName) bool
109
110
111
112 IsOperationSafeToRetry(
113 volumeName v1.UniqueVolumeName,
114 podName volumetypes.UniquePodName,
115 nodeName types.NodeName, operationName string) bool
116 }
117
118
119 func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations {
120 g := &nestedPendingOperations{
121 operations: []operation{},
122 exponentialBackOffOnError: exponentialBackOffOnError,
123 }
124 g.cond = sync.NewCond(&g.lock)
125 return g
126 }
127
128 type nestedPendingOperations struct {
129 operations []operation
130 exponentialBackOffOnError bool
131 cond *sync.Cond
132 lock sync.RWMutex
133 }
134
135 type operation struct {
136 key operationKey
137 operationName string
138 operationPending bool
139 expBackoff exponentialbackoff.ExponentialBackoff
140 }
141
142 func (grm *nestedPendingOperations) Run(
143 volumeName v1.UniqueVolumeName,
144 podName volumetypes.UniquePodName,
145 nodeName types.NodeName,
146 generatedOperations volumetypes.GeneratedOperations) error {
147 grm.lock.Lock()
148 defer grm.lock.Unlock()
149
150 opKey := operationKey{volumeName, podName, nodeName}
151
152 opExists, previousOpIndex := grm.isOperationExists(opKey)
153 if opExists {
154 previousOp := grm.operations[previousOpIndex]
155
156 if previousOp.operationPending {
157
158 return NewAlreadyExistsError(opKey)
159 }
160
161 backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
162 if backOffErr != nil {
163 if previousOp.operationName == generatedOperations.OperationName {
164 return backOffErr
165 }
166
167 grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
168 grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
169 }
170
171
172 grm.operations[previousOpIndex].operationPending = true
173 grm.operations[previousOpIndex].key = opKey
174 } else {
175
176 grm.operations = append(grm.operations,
177 operation{
178 key: opKey,
179 operationPending: true,
180 operationName: generatedOperations.OperationName,
181 expBackoff: exponentialbackoff.ExponentialBackoff{},
182 })
183 }
184
185 go func() (eventErr, detailedErr error) {
186
187 defer k8sRuntime.HandleCrash()
188
189 defer grm.operationComplete(opKey, &detailedErr)
190 return generatedOperations.Run()
191 }()
192
193 return nil
194 }
195 func (grm *nestedPendingOperations) IsOperationSafeToRetry(
196 volumeName v1.UniqueVolumeName,
197 podName volumetypes.UniquePodName,
198 nodeName types.NodeName,
199 operationName string) bool {
200
201 grm.lock.RLock()
202 defer grm.lock.RUnlock()
203
204 opKey := operationKey{volumeName, podName, nodeName}
205 exist, previousOpIndex := grm.isOperationExists(opKey)
206 if !exist {
207 return true
208 }
209 previousOp := grm.operations[previousOpIndex]
210 if previousOp.operationPending {
211 return false
212 }
213 backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
214 if backOffErr != nil {
215 if previousOp.operationName == operationName {
216 return false
217 }
218 }
219
220 return true
221 }
222
223 func (grm *nestedPendingOperations) IsOperationPending(
224 volumeName v1.UniqueVolumeName,
225 podName volumetypes.UniquePodName,
226 nodeName types.NodeName) bool {
227
228 grm.lock.RLock()
229 defer grm.lock.RUnlock()
230
231 opKey := operationKey{volumeName, podName, nodeName}
232 exist, previousOpIndex := grm.isOperationExists(opKey)
233 if exist && grm.operations[previousOpIndex].operationPending {
234 return true
235 }
236 return false
237 }
238
239
240 func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
241
242
243 if key.volumeName == EmptyUniqueVolumeName {
244 return false, -1
245 }
246
247 opIndex := -1
248 for previousOpIndex, previousOp := range grm.operations {
249 volumeNameMatch := previousOp.key.volumeName == key.volumeName
250
251 podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
252 key.podName == EmptyUniquePodName ||
253 previousOp.key.podName == key.podName
254
255 podNameExactMatch := previousOp.key.podName == key.podName
256
257 nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
258 key.nodeName == EmptyNodeName ||
259 previousOp.key.nodeName == key.nodeName
260
261 nodeNameExactMatch := previousOp.key.nodeName == key.nodeName
262
263 if volumeNameMatch && podNameMatch && nodeNameMatch {
264
265 if previousOp.operationPending {
266 return true, previousOpIndex
267 }
268
269
270 if opIndex == -1 || (podNameExactMatch && nodeNameExactMatch) {
271 opIndex = previousOpIndex
272 }
273 }
274 }
275 return opIndex != -1, opIndex
276
277 }
278
279 func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
280
281
282 for i, op := range grm.operations {
283 if op.key.volumeName == key.volumeName &&
284 op.key.podName == key.podName &&
285 op.key.nodeName == key.nodeName {
286 return uint(i), nil
287 }
288 }
289
290 return 0, fmt.Errorf("operation %+v not found", key)
291 }
292
293 func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
294
295
296 opIndex := -1
297 for i, op := range grm.operations {
298 if op.key.volumeName == key.volumeName &&
299 op.key.podName == key.podName &&
300 op.key.nodeName == key.nodeName {
301 opIndex = i
302 break
303 }
304 }
305
306 if opIndex < 0 {
307 return
308 }
309
310
311 grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
312 grm.operations = grm.operations[:len(grm.operations)-1]
313 }
314
315 func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
316
317
318
319
320 defer grm.cond.Signal()
321 grm.lock.Lock()
322 defer grm.lock.Unlock()
323
324 if *err == nil || !grm.exponentialBackOffOnError {
325
326 grm.deleteOperation(key)
327 if *err != nil {
328
329 klog.Errorf("operation %+v failed with: %v", key, *err)
330 }
331 return
332 }
333
334
335 existingOpIndex, getOpErr := grm.getOperation(key)
336 if getOpErr != nil {
337
338 klog.Errorf("Operation %+v completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
339 key,
340 *err)
341 return
342 }
343
344 grm.operations[existingOpIndex].expBackoff.Update(err)
345 grm.operations[existingOpIndex].operationPending = false
346
347
348 klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
349 GenerateNoRetriesPermittedMsg(fmt.Sprintf("%+v", key)))
350 }
351
352 func (grm *nestedPendingOperations) Wait() {
353 grm.lock.Lock()
354 defer grm.lock.Unlock()
355
356 for len(grm.operations) > 0 {
357 grm.cond.Wait()
358 }
359 }
360
// operationKey identifies the target of an operation. Code in this file builds
// it with positional composite literals
// (operationKey{volumeName, podName, nodeName}), so field order matters.
type operationKey struct {
	// volumeName must be non-empty for isOperationExists to report a match.
	volumeName v1.UniqueVolumeName
	// podName may be EmptyUniquePodName, which matches any pod.
	podName volumetypes.UniquePodName
	// nodeName may be EmptyNodeName, which matches any node.
	nodeName types.NodeName
}
366
367
368 func NewAlreadyExistsError(key operationKey) error {
369 return alreadyExistsError{key}
370 }
371
372
373
374
375 func IsAlreadyExists(err error) bool {
376 switch err.(type) {
377 case alreadyExistsError:
378 return true
379 default:
380 return false
381 }
382 }
383
384
385
386
387 type alreadyExistsError struct {
388 operationKey operationKey
389 }
390
391 var _ error = alreadyExistsError{}
392
393 func (err alreadyExistsError) Error() string {
394 return fmt.Sprintf(
395 "Failed to create operation with name %+v. An operation with that name is already executing.",
396 err.operationKey)
397 }
398
View as plain text