...
1
16
17
22 package goroutinemap
23
24 import (
25 "fmt"
26 "sync"
27
28 k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/klog/v2"
30 "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
31 )
32
33
34
35
36
37 type GoRoutineMap interface {
38
39
40
41
42
43
44
45 Run(operationName string, operationFunc func() error) error
46
47
48
49
50 Wait()
51
52
53
54
55 WaitForCompletion()
56
57
58
59 IsOperationPending(operationName string) bool
60 }
61
62
63 func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
64 g := &goRoutineMap{
65 operations: make(map[string]operation),
66 exponentialBackOffOnError: exponentialBackOffOnError,
67 }
68
69 g.cond = sync.NewCond(&g.lock)
70 return g
71 }
72
73 type goRoutineMap struct {
74 operations map[string]operation
75 exponentialBackOffOnError bool
76 cond *sync.Cond
77 lock sync.RWMutex
78 }
79
80
81 type operation struct {
82 operationPending bool
83 expBackoff exponentialbackoff.ExponentialBackoff
84 }
85
86 func (grm *goRoutineMap) Run(
87 operationName string,
88 operationFunc func() error) error {
89 grm.lock.Lock()
90 defer grm.lock.Unlock()
91
92 existingOp, exists := grm.operations[operationName]
93 if exists {
94
95 if existingOp.operationPending {
96 return NewAlreadyExistsError(operationName)
97 }
98
99 if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
100 return err
101 }
102 }
103
104 grm.operations[operationName] = operation{
105 operationPending: true,
106 expBackoff: existingOp.expBackoff,
107 }
108 go func() (err error) {
109
110 defer k8sRuntime.HandleCrash()
111
112 defer grm.operationComplete(operationName, &err)
113
114 defer k8sRuntime.RecoverFromPanic(&err)
115 return operationFunc()
116 }()
117
118 return nil
119 }
120
121
122
123 func (grm *goRoutineMap) operationComplete(
124 operationName string, err *error) {
125
126
127
128
129 defer grm.cond.Signal()
130 grm.lock.Lock()
131 defer grm.lock.Unlock()
132
133 if *err == nil || !grm.exponentialBackOffOnError {
134
135 delete(grm.operations, operationName)
136 if *err != nil {
137
138 klog.Errorf("operation for %q failed with: %v",
139 operationName,
140 *err)
141 }
142 } else {
143
144 existingOp := grm.operations[operationName]
145 existingOp.expBackoff.Update(err)
146 existingOp.operationPending = false
147 grm.operations[operationName] = existingOp
148
149
150 klog.Errorf("%v",
151 existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
152 }
153 }
154
155 func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
156 grm.lock.RLock()
157 defer grm.lock.RUnlock()
158 existingOp, exists := grm.operations[operationName]
159 if exists && existingOp.operationPending {
160 return true
161 }
162 return false
163 }
164
165 func (grm *goRoutineMap) Wait() {
166 grm.lock.Lock()
167 defer grm.lock.Unlock()
168
169 for len(grm.operations) > 0 {
170 grm.cond.Wait()
171 }
172 }
173
174 func (grm *goRoutineMap) WaitForCompletion() {
175 grm.lock.Lock()
176 defer grm.lock.Unlock()
177
178 for {
179 if len(grm.operations) == 0 || grm.nothingPending() {
180 break
181 } else {
182 grm.cond.Wait()
183 }
184 }
185 }
186
187
188
189 func (grm *goRoutineMap) nothingPending() bool {
190 nothingIsPending := true
191 for _, operation := range grm.operations {
192 if operation.operationPending {
193 nothingIsPending = false
194 break
195 }
196 }
197 return nothingIsPending
198 }
199
200
201 func NewAlreadyExistsError(operationName string) error {
202 return alreadyExistsError{operationName}
203 }
204
205
206
207
208 func IsAlreadyExists(err error) bool {
209 switch err.(type) {
210 case alreadyExistsError:
211 return true
212 default:
213 return false
214 }
215 }
216
217
218
219
220 type alreadyExistsError struct {
221 operationName string
222 }
223
224 var _ error = alreadyExistsError{}
225
226 func (err alreadyExistsError) Error() string {
227 return fmt.Sprintf(
228 "Failed to create operation with name %q. An operation with that name is already executing.",
229 err.operationName)
230 }
231
View as plain text