1
2
3
4 package taskrunner
5
6 import (
7 "context"
8 "fmt"
9
10 "k8s.io/klog/v2"
11 "sigs.k8s.io/cli-utils/pkg/apply/cache"
12 "sigs.k8s.io/cli-utils/pkg/apply/event"
13 pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
14 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
15 "sigs.k8s.io/cli-utils/pkg/object"
16 )
17
18
19 func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusWatcher watcher.StatusWatcher) *TaskStatusRunner {
20 return &TaskStatusRunner{
21 Identifiers: identifiers,
22 StatusWatcher: statusWatcher,
23 }
24 }
25
26
27
28
// TaskStatusRunner executes a queue of tasks while consuming status events
// for a fixed set of objects from a StatusWatcher (see Run).
type TaskStatusRunner struct {
	// Identifiers is the set of objects passed to StatusWatcher.Watch when
	// Run starts.
	Identifiers object.ObjMetadataSet
	// StatusWatcher supplies the channel of status events that Run consumes.
	StatusWatcher watcher.StatusWatcher
}
33
34
35
// Options configures the behavior of TaskStatusRunner.Run.
type Options struct {
	// EmitStatusEvents, when true, makes Run send an event of type
	// event.StatusType through the TaskContext for every resource status
	// event it receives from the watcher.
	EmitStatusEvents bool
}
39
40
41
42
43
44
45
46
47
48
49
50 func (tsr *TaskStatusRunner) Run(
51 ctx context.Context,
52 taskContext *TaskContext,
53 taskQueue chan Task,
54 opts Options,
55 ) error {
56
57
58
59 statusCtx, cancelFunc := context.WithCancel(context.Background())
60 statusChannel := tsr.StatusWatcher.Watch(statusCtx, tsr.Identifiers, watcher.Options{})
61
62
63
64
65
66
67 complete := func(err error) error {
68 klog.V(7).Info("Runner cancelled status watcher")
69 cancelFunc()
70 for statusEvent := range statusChannel {
71 klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
72 }
73 return err
74 }
75
76
77 var currentTask Task
78 done := false
79
80
81
82
83
84
85 abort := false
86 var abortReason error
87
88
89
90 doneCh := ctx.Done()
91
92 for {
93 select {
94
95
96
97
98
99 case statusEvent, ok := <-statusChannel:
100
101
102
103
104
105 if !ok {
106 continue
107 }
108
109 if abort {
110 klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
111 continue
112 }
113 klog.V(7).Infof("Runner received status event: %v", statusEvent)
114
115
116
117
118 if statusEvent.Type == pollevent.ErrorEvent {
119 abort = true
120 abortReason = fmt.Errorf("polling for status failed: %v",
121 statusEvent.Error)
122 if currentTask != nil {
123 currentTask.Cancel(taskContext)
124 } else {
125
126 return complete(abortReason)
127 }
128 continue
129 }
130
131
132
133 if statusEvent.Type == pollevent.SyncEvent {
134
135 currentTask, done = nextTask(taskQueue, taskContext)
136 if done {
137 return complete(nil)
138 }
139 continue
140 }
141
142 if opts.EmitStatusEvents {
143
144 taskContext.SendEvent(event.Event{
145 Type: event.StatusType,
146 StatusEvent: event.StatusEvent{
147 Identifier: statusEvent.Resource.Identifier,
148 PollResourceInfo: statusEvent.Resource,
149 Resource: statusEvent.Resource.Resource,
150 Error: statusEvent.Error,
151 },
152 })
153 }
154
155 id := statusEvent.Resource.Identifier
156
157
158
159
160 taskContext.ResourceCache().Put(id, cache.ResourceStatus{
161 Resource: statusEvent.Resource.Resource,
162 Status: statusEvent.Resource.Status,
163 StatusMessage: statusEvent.Resource.Message,
164 })
165
166
167
168 if currentTask != nil {
169 if currentTask.Identifiers().Contains(id) {
170 currentTask.StatusUpdate(taskContext, id)
171 }
172 }
173
174
175
176
177
178
179
180 case msg := <-taskContext.TaskChannel():
181 taskContext.SendEvent(event.Event{
182 Type: event.ActionGroupType,
183 ActionGroupEvent: event.ActionGroupEvent{
184 GroupName: currentTask.Name(),
185 Action: currentTask.Action(),
186 Status: event.Finished,
187 },
188 })
189 if msg.Err != nil {
190 return complete(
191 fmt.Errorf("task failed (action: %q, name: %q): %w",
192 currentTask.Action(), currentTask.Name(), msg.Err))
193 }
194 if abort {
195 return complete(abortReason)
196 }
197 currentTask, done = nextTask(taskQueue, taskContext)
198
199
200 if done {
201 return complete(nil)
202 }
203
204
205
206 case <-doneCh:
207 doneCh = nil
208 abort = true
209 abortReason = ctx.Err()
210 klog.V(7).Infof("Runner aborting: %v", abortReason)
211 if currentTask != nil {
212 currentTask.Cancel(taskContext)
213 } else {
214
215 return complete(abortReason)
216 }
217 }
218 }
219 }
220
221
222
223
224 func nextTask(taskQueue chan Task, taskContext *TaskContext) (Task, bool) {
225 var tsk Task
226 select {
227
228
229 case t := <-taskQueue:
230 tsk = t
231 default:
232
233 return nil, true
234 }
235
236 taskContext.SendEvent(event.Event{
237 Type: event.ActionGroupType,
238 ActionGroupEvent: event.ActionGroupEvent{
239 GroupName: tsk.Name(),
240 Action: tsk.Action(),
241 Status: event.Started,
242 },
243 })
244
245 tsk.Start(taskContext)
246
247 return tsk, false
248 }
249
250
251
252
// TaskResult carries the outcome of a task.
// NOTE(review): presumably this is the message type delivered on
// TaskContext.TaskChannel and read by Run — confirm against TaskContext.
type TaskResult struct {
	// Err is the error reported for the task, if any.
	Err error
}
256
View as plain text