1
2
3 package main
4
5 import (
6 "context"
7 "os"
8 "runtime"
9 "strings"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/Microsoft/hcsshim/internal/extendedtask"
15 "github.com/Microsoft/hcsshim/internal/oc"
16 "github.com/Microsoft/hcsshim/internal/shimdiag"
17 "github.com/containerd/containerd/errdefs"
18 "github.com/containerd/containerd/runtime/v2/task"
19 google_protobuf1 "github.com/gogo/protobuf/types"
20 "go.opencensus.io/trace"
21 )
22
23 type ServiceOptions struct {
24 Events publisher
25 TID string
26 IsSandbox bool
27 }
28
29 type ServiceOption func(*ServiceOptions)
30
31 func WithEventPublisher(e publisher) ServiceOption {
32 return func(o *ServiceOptions) {
33 o.Events = e
34 }
35 }
36 func WithTID(tid string) ServiceOption {
37 return func(o *ServiceOptions) {
38 o.TID = tid
39 }
40 }
41 func WithIsSandbox(s bool) ServiceOption {
42 return func(o *ServiceOptions) {
43 o.IsSandbox = s
44 }
45 }
46
47 type service struct {
48 events publisher
49
50
51
52
53
54 tid string
55
56
57
58
59
60
61 isSandbox bool
62
63
64
65
66 taskOrPod atomic.Value
67
68
69
70
71
72 cl sync.Mutex
73
74
75 shutdown chan struct{}
76
77 shutdownOnce sync.Once
78
79
80 gracefulShutdown bool
81 }
82
83 var _ = (task.TaskService)(&service{})
84
85 func NewService(o ...ServiceOption) (svc *service, err error) {
86 var opts ServiceOptions
87 for _, op := range o {
88 op(&opts)
89 }
90
91 svc = &service{
92 events: opts.Events,
93 tid: opts.TID,
94 isSandbox: opts.IsSandbox,
95 shutdown: make(chan struct{}),
96 }
97 return svc, nil
98 }
99
100 func (s *service) State(ctx context.Context, req *task.StateRequest) (resp *task.StateResponse, err error) {
101 ctx, span := oc.StartSpan(ctx, "State")
102 defer span.End()
103 defer func() {
104 if resp != nil {
105 span.AddAttributes(
106 trace.StringAttribute("status", resp.Status.String()),
107 trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
108 trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
109 }
110 oc.SetSpanStatus(span, err)
111 }()
112
113 span.AddAttributes(
114 trace.StringAttribute("tid", req.ID),
115 trace.StringAttribute("eid", req.ExecID))
116
117 if s.isSandbox {
118 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
119 }
120
121 r, e := s.stateInternal(ctx, req)
122 return r, errdefs.ToGRPC(e)
123 }
124
125 func (s *service) Create(ctx context.Context, req *task.CreateTaskRequest) (resp *task.CreateTaskResponse, err error) {
126 ctx, span := oc.StartSpan(ctx, "Create")
127 defer span.End()
128 defer func() {
129 if resp != nil {
130 span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
131 }
132 oc.SetSpanStatus(span, err)
133 }()
134
135 span.AddAttributes(
136 trace.StringAttribute("tid", req.ID),
137 trace.StringAttribute("bundle", req.Bundle),
138
139
140 trace.BoolAttribute("terminal", req.Terminal),
141 trace.StringAttribute("stdin", req.Stdin),
142 trace.StringAttribute("stdout", req.Stdout),
143 trace.StringAttribute("stderr", req.Stderr),
144 trace.StringAttribute("checkpoint", req.Checkpoint),
145 trace.StringAttribute("parentcheckpoint", req.ParentCheckpoint))
146
147 if s.isSandbox {
148 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
149 }
150
151 r, e := s.createInternal(ctx, req)
152 return r, errdefs.ToGRPC(e)
153 }
154
155 func (s *service) Start(ctx context.Context, req *task.StartRequest) (resp *task.StartResponse, err error) {
156 ctx, span := oc.StartSpan(ctx, "Start")
157 defer span.End()
158 defer func() {
159 if resp != nil {
160 span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
161 }
162 oc.SetSpanStatus(span, err)
163 }()
164
165 span.AddAttributes(
166 trace.StringAttribute("tid", req.ID),
167 trace.StringAttribute("eid", req.ExecID))
168
169 if s.isSandbox {
170 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
171 }
172
173 r, e := s.startInternal(ctx, req)
174 return r, errdefs.ToGRPC(e)
175 }
176
177 func (s *service) Delete(ctx context.Context, req *task.DeleteRequest) (resp *task.DeleteResponse, err error) {
178 ctx, span := oc.StartSpan(ctx, "Delete")
179 defer span.End()
180 defer func() {
181 if resp != nil {
182 span.AddAttributes(
183 trace.Int64Attribute("pid", int64(resp.Pid)),
184 trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
185 trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
186 }
187 oc.SetSpanStatus(span, err)
188 }()
189
190 span.AddAttributes(
191 trace.StringAttribute("tid", req.ID),
192 trace.StringAttribute("eid", req.ExecID))
193
194 if s.isSandbox {
195 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
196 }
197
198 r, e := s.deleteInternal(ctx, req)
199 return r, errdefs.ToGRPC(e)
200 }
201
202 func (s *service) Pids(ctx context.Context, req *task.PidsRequest) (_ *task.PidsResponse, err error) {
203 ctx, span := oc.StartSpan(ctx, "Pids")
204 defer span.End()
205 defer func() { oc.SetSpanStatus(span, err) }()
206
207 span.AddAttributes(trace.StringAttribute("tid", req.ID))
208
209 if s.isSandbox {
210 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
211 }
212
213 r, e := s.pidsInternal(ctx, req)
214 return r, errdefs.ToGRPC(e)
215 }
216
217 func (s *service) Pause(ctx context.Context, req *task.PauseRequest) (_ *google_protobuf1.Empty, err error) {
218 ctx, span := oc.StartSpan(ctx, "Pause")
219 defer span.End()
220 defer func() { oc.SetSpanStatus(span, err) }()
221
222 span.AddAttributes(trace.StringAttribute("tid", req.ID))
223
224 if s.isSandbox {
225 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
226 }
227
228 r, e := s.pauseInternal(ctx, req)
229 return r, errdefs.ToGRPC(e)
230 }
231
232 func (s *service) Resume(ctx context.Context, req *task.ResumeRequest) (_ *google_protobuf1.Empty, err error) {
233 ctx, span := oc.StartSpan(ctx, "Resume")
234 defer span.End()
235 defer func() { oc.SetSpanStatus(span, err) }()
236
237 span.AddAttributes(trace.StringAttribute("tid", req.ID))
238
239 if s.isSandbox {
240 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
241 }
242
243 r, e := s.resumeInternal(ctx, req)
244 return r, errdefs.ToGRPC(e)
245 }
246
247 func (s *service) Checkpoint(ctx context.Context, req *task.CheckpointTaskRequest) (_ *google_protobuf1.Empty, err error) {
248 ctx, span := oc.StartSpan(ctx, "Checkpoint")
249 defer span.End()
250 defer func() { oc.SetSpanStatus(span, err) }()
251
252 span.AddAttributes(
253 trace.StringAttribute("tid", req.ID),
254 trace.StringAttribute("path", req.Path))
255
256 if s.isSandbox {
257 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
258 }
259
260 r, e := s.checkpointInternal(ctx, req)
261 return r, errdefs.ToGRPC(e)
262 }
263
264 func (s *service) Kill(ctx context.Context, req *task.KillRequest) (_ *google_protobuf1.Empty, err error) {
265 ctx, span := oc.StartSpan(ctx, "Kill")
266 defer span.End()
267 defer func() { oc.SetSpanStatus(span, err) }()
268
269 span.AddAttributes(
270 trace.StringAttribute("tid", req.ID),
271 trace.StringAttribute("eid", req.ExecID),
272 trace.Int64Attribute("signal", int64(req.Signal)),
273 trace.BoolAttribute("all", req.All))
274
275 if s.isSandbox {
276 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
277 }
278
279 r, e := s.killInternal(ctx, req)
280 return r, errdefs.ToGRPC(e)
281 }
282
283 func (s *service) Exec(ctx context.Context, req *task.ExecProcessRequest) (_ *google_protobuf1.Empty, err error) {
284 ctx, span := oc.StartSpan(ctx, "Exec")
285 defer span.End()
286 defer func() { oc.SetSpanStatus(span, err) }()
287
288 span.AddAttributes(
289 trace.StringAttribute("tid", req.ID),
290 trace.StringAttribute("eid", req.ExecID),
291 trace.BoolAttribute("terminal", req.Terminal),
292 trace.StringAttribute("stdin", req.Stdin),
293 trace.StringAttribute("stdout", req.Stdout),
294 trace.StringAttribute("stderr", req.Stderr))
295
296 if s.isSandbox {
297 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
298 }
299
300 r, e := s.execInternal(ctx, req)
301 return r, errdefs.ToGRPC(e)
302 }
303
304 func (s *service) DiagExecInHost(ctx context.Context, req *shimdiag.ExecProcessRequest) (_ *shimdiag.ExecProcessResponse, err error) {
305 ctx, span := oc.StartSpan(ctx, "DiagExecInHost")
306 defer span.End()
307 defer func() { oc.SetSpanStatus(span, err) }()
308
309 span.AddAttributes(
310 trace.StringAttribute("args", strings.Join(req.Args, " ")),
311 trace.StringAttribute("workdir", req.Workdir),
312 trace.BoolAttribute("terminal", req.Terminal),
313 trace.StringAttribute("stdin", req.Stdin),
314 trace.StringAttribute("stdout", req.Stdout),
315 trace.StringAttribute("stderr", req.Stderr))
316
317 if s.isSandbox {
318 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
319 }
320
321 r, e := s.diagExecInHostInternal(ctx, req)
322 return r, errdefs.ToGRPC(e)
323 }
324
325 func (s *service) DiagShare(ctx context.Context, req *shimdiag.ShareRequest) (_ *shimdiag.ShareResponse, err error) {
326 ctx, span := oc.StartSpan(ctx, "DiagShare")
327 defer span.End()
328 defer func() { oc.SetSpanStatus(span, err) }()
329
330 span.AddAttributes(
331 trace.StringAttribute("hostpath", req.HostPath),
332 trace.StringAttribute("uvmpath", req.UvmPath),
333 trace.BoolAttribute("readonly", req.ReadOnly))
334
335 if s.isSandbox {
336 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
337 }
338
339 r, e := s.diagShareInternal(ctx, req)
340 return r, errdefs.ToGRPC(e)
341 }
342
343 func (s *service) DiagTasks(ctx context.Context, req *shimdiag.TasksRequest) (_ *shimdiag.TasksResponse, err error) {
344 ctx, span := oc.StartSpan(ctx, "DiagTasks")
345 defer span.End()
346 defer func() { oc.SetSpanStatus(span, err) }()
347
348 span.AddAttributes(
349 trace.BoolAttribute("execs", req.Execs))
350
351 if s.isSandbox {
352 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
353 }
354
355 r, e := s.diagTasksInternal(ctx, req)
356 return r, errdefs.ToGRPC(e)
357 }
358
359 func (s *service) ResizePty(ctx context.Context, req *task.ResizePtyRequest) (_ *google_protobuf1.Empty, err error) {
360 ctx, span := oc.StartSpan(ctx, "ResizePty")
361 defer span.End()
362 defer func() { oc.SetSpanStatus(span, err) }()
363
364 span.AddAttributes(
365 trace.StringAttribute("tid", req.ID),
366 trace.StringAttribute("eid", req.ExecID),
367 trace.Int64Attribute("width", int64(req.Width)),
368 trace.Int64Attribute("height", int64(req.Height)))
369
370 if s.isSandbox {
371 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
372 }
373
374 r, e := s.resizePtyInternal(ctx, req)
375 return r, errdefs.ToGRPC(e)
376 }
377
378 func (s *service) CloseIO(ctx context.Context, req *task.CloseIORequest) (_ *google_protobuf1.Empty, err error) {
379 ctx, span := oc.StartSpan(ctx, "CloseIO")
380 defer span.End()
381 defer func() { oc.SetSpanStatus(span, err) }()
382
383 span.AddAttributes(
384 trace.StringAttribute("tid", req.ID),
385 trace.StringAttribute("eid", req.ExecID),
386 trace.BoolAttribute("stdin", req.Stdin))
387
388 if s.isSandbox {
389 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
390 }
391
392 r, e := s.closeIOInternal(ctx, req)
393 return r, errdefs.ToGRPC(e)
394 }
395
396 func (s *service) Update(ctx context.Context, req *task.UpdateTaskRequest) (_ *google_protobuf1.Empty, err error) {
397 ctx, span := oc.StartSpan(ctx, "Update")
398 defer span.End()
399 defer func() { oc.SetSpanStatus(span, err) }()
400
401 span.AddAttributes(trace.StringAttribute("tid", req.ID))
402
403 if s.isSandbox {
404 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
405 }
406
407 r, e := s.updateInternal(ctx, req)
408 return r, errdefs.ToGRPC(e)
409 }
410
411 func (s *service) Wait(ctx context.Context, req *task.WaitRequest) (resp *task.WaitResponse, err error) {
412 ctx, span := oc.StartSpan(ctx, "Wait")
413 defer span.End()
414 defer func() {
415 if resp != nil {
416 span.AddAttributes(
417 trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
418 trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
419 }
420 oc.SetSpanStatus(span, err)
421 }()
422
423 span.AddAttributes(
424 trace.StringAttribute("tid", req.ID),
425 trace.StringAttribute("eid", req.ExecID))
426
427 if s.isSandbox {
428 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
429 }
430
431 r, e := s.waitInternal(ctx, req)
432 return r, errdefs.ToGRPC(e)
433 }
434
435 func (s *service) Stats(ctx context.Context, req *task.StatsRequest) (_ *task.StatsResponse, err error) {
436 ctx, span := oc.StartSpan(ctx, "Stats")
437 defer span.End()
438 defer func() { oc.SetSpanStatus(span, err) }()
439
440 span.AddAttributes(trace.StringAttribute("tid", req.ID))
441
442 if s.isSandbox {
443 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
444 }
445
446 r, e := s.statsInternal(ctx, req)
447 return r, errdefs.ToGRPC(e)
448 }
449
450 func (s *service) Connect(ctx context.Context, req *task.ConnectRequest) (resp *task.ConnectResponse, err error) {
451 ctx, span := oc.StartSpan(ctx, "Connect")
452 defer span.End()
453 defer func() {
454 if resp != nil {
455 span.AddAttributes(
456 trace.Int64Attribute("shimPid", int64(resp.ShimPid)),
457 trace.Int64Attribute("taskPid", int64(resp.TaskPid)),
458 trace.StringAttribute("version", resp.Version))
459 }
460 oc.SetSpanStatus(span, err)
461 }()
462
463 span.AddAttributes(trace.StringAttribute("tid", req.ID))
464
465 if s.isSandbox {
466 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
467 }
468
469 r, e := s.connectInternal(ctx, req)
470 return r, errdefs.ToGRPC(e)
471 }
472
473 func (s *service) Shutdown(ctx context.Context, req *task.ShutdownRequest) (_ *google_protobuf1.Empty, err error) {
474 ctx, span := oc.StartSpan(ctx, "Shutdown")
475 defer span.End()
476 defer func() { oc.SetSpanStatus(span, err) }()
477
478 span.AddAttributes(trace.StringAttribute("tid", req.ID))
479
480 if s.isSandbox {
481 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
482 }
483
484 r, e := s.shutdownInternal(ctx, req)
485 return r, errdefs.ToGRPC(e)
486 }
487
488 func (s *service) DiagStacks(ctx context.Context, req *shimdiag.StacksRequest) (*shimdiag.StacksResponse, error) {
489 if s == nil {
490 return nil, nil
491 }
492 ctx, span := oc.StartSpan(ctx, "DiagStacks")
493 defer span.End()
494
495 span.AddAttributes(trace.StringAttribute("tid", s.tid))
496
497 if s.isSandbox {
498 span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
499 }
500
501 buf := make([]byte, 4096)
502 for {
503 buf = buf[:runtime.Stack(buf, true)]
504 if len(buf) < cap(buf) {
505 break
506 }
507 buf = make([]byte, 2*len(buf))
508 }
509 resp := &shimdiag.StacksResponse{Stacks: string(buf)}
510
511 t, _ := s.getTask(s.tid)
512 if t != nil {
513 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
514 defer cancel()
515 resp.GuestStacks = t.DumpGuestStacks(ctx)
516 }
517 return resp, nil
518 }
519
520 func (s *service) DiagPid(ctx context.Context, req *shimdiag.PidRequest) (*shimdiag.PidResponse, error) {
521 if s == nil {
522 return nil, nil
523 }
524 ctx, span := oc.StartSpan(ctx, "DiagPid")
525 defer span.End()
526
527 span.AddAttributes(trace.StringAttribute("tid", s.tid))
528
529 return &shimdiag.PidResponse{
530 Pid: int32(os.Getpid()),
531 }, nil
532 }
533
534 func (s *service) ComputeProcessorInfo(ctx context.Context, req *extendedtask.ComputeProcessorInfoRequest) (*extendedtask.ComputeProcessorInfoResponse, error) {
535 ctx, span := oc.StartSpan(ctx, "ComputeProcessorInfo")
536 defer span.End()
537
538 span.AddAttributes(trace.StringAttribute("tid", s.tid))
539
540 r, e := s.computeProcessorInfoInternal(ctx, req)
541 return r, errdefs.ToGRPC(e)
542 }
543
544 func (s *service) Done() <-chan struct{} {
545 return s.shutdown
546 }
547
548 func (s *service) IsShutdown() bool {
549 select {
550 case <-s.shutdown:
551 return true
552 default:
553 return false
554 }
555 }
556
View as plain text