1
2
3 package main
4
5 import (
6 "context"
7 "sync"
8 "time"
9
10 eventstypes "github.com/containerd/containerd/api/events"
11 containerd_v1_types "github.com/containerd/containerd/api/types/task"
12 "github.com/containerd/containerd/errdefs"
13 "github.com/containerd/containerd/runtime"
14 "github.com/containerd/containerd/runtime/v2/task"
15 "github.com/opencontainers/runtime-spec/specs-go"
16 "github.com/pkg/errors"
17 "github.com/sirupsen/logrus"
18 "go.opencensus.io/trace"
19
20 "github.com/Microsoft/hcsshim/internal/cmd"
21 "github.com/Microsoft/hcsshim/internal/cow"
22 "github.com/Microsoft/hcsshim/internal/hcs"
23 "github.com/Microsoft/hcsshim/internal/log"
24 "github.com/Microsoft/hcsshim/internal/oc"
25 "github.com/Microsoft/hcsshim/internal/protocol/guestresource"
26 "github.com/Microsoft/hcsshim/internal/signals"
27 "github.com/Microsoft/hcsshim/internal/uvm"
28 "github.com/Microsoft/hcsshim/osversion"
29 )
30
31
32
33
34
35 func newHcsExec(
36 ctx context.Context,
37 events publisher,
38 tid string,
39 host *uvm.UtilityVM,
40 c cow.Container,
41 id, bundle string,
42 isWCOW bool,
43 spec *specs.Process,
44 io cmd.UpstreamIO) shimExec {
45 log.G(ctx).WithFields(logrus.Fields{
46 "tid": tid,
47 "eid": id,
48 "bundle": bundle,
49 "wcow": isWCOW,
50 }).Debug("newHcsExec")
51
52 he := &hcsExec{
53 events: events,
54 tid: tid,
55 host: host,
56 c: c,
57 id: id,
58 bundle: bundle,
59 isWCOW: isWCOW,
60 spec: spec,
61 io: io,
62 processDone: make(chan struct{}),
63 state: shimExecStateCreated,
64 exitStatus: 255,
65 exited: make(chan struct{}),
66 }
67 go he.waitForContainerExit()
68 return he
69 }
70
71 var _ = (shimExec)(&hcsExec{})
72
73 type hcsExec struct {
74 events publisher
75
76
77
78 tid string
79
80
81
82
83 host *uvm.UtilityVM
84
85
86
87 c cow.Container
88
89
90
91 id string
92
93
94
95
96
97 bundle string
98
99
100
101 isWCOW bool
102
103
104
105
106
107 spec *specs.Process
108
109
110
111
112
113 io cmd.UpstreamIO
114 processDone chan struct{}
115 processDoneOnce sync.Once
116
117
118
119 sl sync.Mutex
120 state shimExecState
121 pid int
122 exitStatus uint32
123 exitedAt time.Time
124 p *cmd.Cmd
125
126
127 exited chan struct{}
128 exitedOnce sync.Once
129 }
130
131 func (he *hcsExec) ID() string {
132 return he.id
133 }
134
135 func (he *hcsExec) Pid() int {
136 he.sl.Lock()
137 defer he.sl.Unlock()
138 return he.pid
139 }
140
141 func (he *hcsExec) State() shimExecState {
142 he.sl.Lock()
143 defer he.sl.Unlock()
144 return he.state
145 }
146
147 func (he *hcsExec) Status() *task.StateResponse {
148 he.sl.Lock()
149 defer he.sl.Unlock()
150
151 var s containerd_v1_types.Status
152 switch he.state {
153 case shimExecStateCreated:
154 s = containerd_v1_types.StatusCreated
155 case shimExecStateRunning:
156 s = containerd_v1_types.StatusRunning
157 case shimExecStateExited:
158 s = containerd_v1_types.StatusStopped
159 }
160
161 return &task.StateResponse{
162 ID: he.tid,
163 ExecID: he.id,
164 Bundle: he.bundle,
165 Pid: uint32(he.pid),
166 Status: s,
167 Stdin: he.io.StdinPath(),
168 Stdout: he.io.StdoutPath(),
169 Stderr: he.io.StderrPath(),
170 Terminal: he.io.Terminal(),
171 ExitStatus: he.exitStatus,
172 ExitedAt: he.exitedAt,
173 }
174 }
175
176 func (he *hcsExec) startInternal(ctx context.Context, initializeContainer bool) (err error) {
177 he.sl.Lock()
178 defer he.sl.Unlock()
179 if he.state != shimExecStateCreated {
180 return newExecInvalidStateError(he.tid, he.id, he.state, "start")
181 }
182 defer func() {
183 if err != nil {
184 he.exitFromCreatedL(ctx, 1)
185 }
186 }()
187 if initializeContainer {
188 err = he.c.Start(ctx)
189 if err != nil {
190 return err
191 }
192 defer func() {
193 if err != nil {
194 _ = he.c.Terminate(ctx)
195 he.c.Close()
196 }
197 }()
198 }
199 cmd := &cmd.Cmd{
200 Host: he.c,
201 Stdin: he.io.Stdin(),
202 Stdout: he.io.Stdout(),
203 Stderr: he.io.Stderr(),
204 Log: log.G(ctx).WithFields(logrus.Fields{
205 "tid": he.tid,
206 "eid": he.id,
207 }),
208 CopyAfterExitTimeout: time.Second * 1,
209 }
210 if he.isWCOW || he.id != he.tid {
211
212
213 cmd.Spec = he.spec
214 }
215 err = cmd.Start()
216 if err != nil {
217 return err
218 }
219 he.p = cmd
220
221
222 he.pid = he.p.Process.Pid()
223 he.state = shimExecStateRunning
224
225
226
227 if he.id != he.tid {
228 if err := he.events.publishEvent(
229 ctx,
230 runtime.TaskExecStartedEventTopic,
231 &eventstypes.TaskExecStarted{
232 ContainerID: he.tid,
233 ExecID: he.id,
234 Pid: uint32(he.pid),
235 }); err != nil {
236 return err
237 }
238 } else {
239 if err := he.events.publishEvent(
240 ctx,
241 runtime.TaskStartEventTopic,
242 &eventstypes.TaskStart{
243 ContainerID: he.tid,
244 Pid: uint32(he.pid),
245 }); err != nil {
246 return err
247 }
248 }
249
250
251 go he.waitForExit()
252 return nil
253 }
254
255 func (he *hcsExec) Start(ctx context.Context) (err error) {
256
257
258 return he.startInternal(ctx, he.id == he.tid)
259 }
260
261 func (he *hcsExec) Kill(ctx context.Context, signal uint32) error {
262 he.sl.Lock()
263 defer he.sl.Unlock()
264 switch he.state {
265 case shimExecStateCreated:
266 he.exitFromCreatedL(ctx, 1)
267 return nil
268 case shimExecStateRunning:
269 supported := false
270 if osversion.Build() >= osversion.RS5 {
271 supported = he.host == nil || he.host.SignalProcessSupported()
272 }
273 var options interface{}
274 var err error
275 if he.isWCOW {
276 var opt *guestresource.SignalProcessOptionsWCOW
277 opt, err = signals.ValidateWCOW(int(signal), supported)
278 if opt != nil {
279 options = opt
280 }
281 } else {
282 var opt *guestresource.SignalProcessOptionsLCOW
283 opt, err = signals.ValidateLCOW(int(signal), supported)
284 if opt != nil {
285 options = opt
286 }
287 }
288 if err != nil {
289 return errors.Wrapf(errdefs.ErrFailedPrecondition, "signal %d: %v", signal, err)
290 }
291 var delivered bool
292 if supported && options != nil {
293 if he.isWCOW {
294
295
296
297
298
299
300
301
302 go func() {
303 signalDelivered, deliveryErr := he.p.Process.Signal(ctx, options)
304
305 if deliveryErr != nil {
306 if !hcs.IsAlreadyStopped(deliveryErr) {
307
308 log.G(ctx).WithField("err", deliveryErr).Errorf("Error in delivering signal %d, to pid: %d", signal, he.pid)
309 }
310 }
311 if !signalDelivered {
312 log.G(ctx).Errorf("Error: NotFound; exec: '%s' in task: '%s' not found", he.id, he.tid)
313 }
314 }()
315 delivered, err = true, nil
316 } else {
317 delivered, err = he.p.Process.Signal(ctx, options)
318 }
319 } else {
320
321
322 delivered, err = he.p.Process.Kill(ctx)
323 }
324 if err != nil {
325 if hcs.IsAlreadyStopped(err) {
326
327
328 return nil
329 }
330 return err
331 }
332 if !delivered {
333 return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
334 }
335 return nil
336 case shimExecStateExited:
337 return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
338 default:
339 return newExecInvalidStateError(he.tid, he.id, he.state, "kill")
340 }
341 }
342
343 func (he *hcsExec) ResizePty(ctx context.Context, width, height uint32) error {
344 he.sl.Lock()
345 defer he.sl.Unlock()
346 if !he.io.Terminal() {
347 return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '%s' in task: '%s' is not a tty", he.id, he.tid)
348 }
349
350 if he.state == shimExecStateRunning {
351 return he.p.Process.ResizeConsole(ctx, uint16(width), uint16(height))
352 }
353 return nil
354 }
355
356 func (he *hcsExec) CloseIO(ctx context.Context, stdin bool) error {
357
358
359
360
361 he.io.CloseStdin(ctx)
362 return nil
363 }
364
365 func (he *hcsExec) Wait() *task.StateResponse {
366 <-he.exited
367 return he.Status()
368 }
369
370 func (he *hcsExec) ForceExit(ctx context.Context, status int) {
371 he.sl.Lock()
372 defer he.sl.Unlock()
373 if he.state != shimExecStateExited {
374 switch he.state {
375 case shimExecStateCreated:
376 he.exitFromCreatedL(ctx, status)
377 case shimExecStateRunning:
378
379 _, _ = he.p.Process.Kill(ctx)
380 }
381 }
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405 func (he *hcsExec) exitFromCreatedL(ctx context.Context, status int) {
406 if he.state != shimExecStateExited {
407
408 log.G(ctx).WithField("status", status).Debug("hcsExec::exitFromCreatedL")
409
410
411 he.processDoneOnce.Do(func() { close(he.processDone) })
412
413 he.state = shimExecStateExited
414 he.exitStatus = uint32(status)
415 he.exitedAt = time.Now()
416
417 he.io.Close(ctx)
418
419 he.exitedOnce.Do(func() {
420 close(he.exited)
421 })
422 }
423 }
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448 func (he *hcsExec) waitForExit() {
449 var err error
450 ctx, span := oc.StartSpan(context.Background(), "hcsExec::waitForExit")
451 defer span.End()
452 defer func() { oc.SetSpanStatus(span, err) }()
453 span.AddAttributes(
454 trace.StringAttribute("tid", he.tid),
455 trace.StringAttribute("eid", he.id))
456
457 err = he.p.Process.Wait()
458 if err != nil {
459 log.G(ctx).WithError(err).Error("failed process Wait")
460 }
461
462
463
464 he.processDoneOnce.Do(func() { close(he.processDone) })
465
466 code, err := he.p.Process.ExitCode()
467 if err != nil {
468 log.G(ctx).WithError(err).Error("failed to get ExitCode")
469 } else {
470 log.G(ctx).WithField("exitCode", code).Debug("exited")
471 }
472
473 he.sl.Lock()
474 he.state = shimExecStateExited
475 he.exitStatus = uint32(code)
476 he.exitedAt = time.Now()
477 he.sl.Unlock()
478
479
480 _ = he.p.Wait()
481 he.io.Close(ctx)
482
483
484
485 if he.tid != he.id {
486
487 if err := he.events.publishEvent(
488 ctx,
489 runtime.TaskExitEventTopic,
490 &eventstypes.TaskExit{
491 ContainerID: he.tid,
492 ID: he.id,
493 Pid: uint32(he.pid),
494 ExitStatus: he.exitStatus,
495 ExitedAt: he.exitedAt,
496 }); err != nil {
497 log.G(ctx).WithError(err).Error("failed to publish TaskExitEvent")
498 }
499 }
500
501
502 he.exitedOnce.Do(func() {
503 close(he.exited)
504 })
505 }
506
507
508
509
510
511
512 func (he *hcsExec) waitForContainerExit() {
513 ctx, span := oc.StartSpan(context.Background(), "hcsExec::waitForContainerExit")
514 defer span.End()
515 span.AddAttributes(
516 trace.StringAttribute("tid", he.tid),
517 trace.StringAttribute("eid", he.id))
518
519
520 select {
521 case <-he.c.WaitChannel():
522
523
524 he.sl.Lock()
525 switch he.state {
526 case shimExecStateCreated:
527 he.exitFromCreatedL(ctx, 1)
528 case shimExecStateRunning:
529
530 _, _ = he.p.Process.Kill(ctx)
531 }
532 he.sl.Unlock()
533 case <-he.processDone:
534
535
536 }
537 }
538
View as plain text