1
2
3 package jobcontainers
4
5 import (
6 "context"
7 "fmt"
8 "os"
9 "path/filepath"
10 "regexp"
11 "strings"
12 "sync"
13 "time"
14 "unsafe"
15
16 "github.com/Microsoft/go-winio/pkg/guid"
17 "github.com/Microsoft/hcsshim/internal/conpty"
18 "github.com/Microsoft/hcsshim/internal/cow"
19 "github.com/Microsoft/hcsshim/internal/exec"
20 "github.com/Microsoft/hcsshim/internal/hcs"
21 "github.com/Microsoft/hcsshim/internal/hcs/schema1"
22 hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
23 "github.com/Microsoft/hcsshim/internal/jobobject"
24 "github.com/Microsoft/hcsshim/internal/log"
25 "github.com/Microsoft/hcsshim/internal/queue"
26 "github.com/Microsoft/hcsshim/internal/resources"
27 "github.com/Microsoft/hcsshim/internal/winapi"
28 specs "github.com/opencontainers/runtime-spec/specs-go"
29 "github.com/pkg/errors"
30 "golang.org/x/sys/windows"
31 )
32
var (
	// fileBindingSupport reports whether bindfltapi.dll exists on the host. When true,
	// Create uses bindSetup (silo + file binding); otherwise it uses fallbackSetup.
	// It is computed at most once per process, guarded by checkBindSupportOnce.
	fileBindingSupport   bool
	checkBindSupportOnce sync.Once
)
37
const (
	// jobContainerNameFmt is the format of the job object name; %s is the container ID.
	jobContainerNameFmt = "JobContainer_%s"

	// sandboxMountPointEnvVar is exported to every container process with the
	// container's rootfs location. References to it (as %VAR% or $env:VAR) in command
	// lines, working directories and environment values are expanded by
	// replaceWithMountPoint.
	sandboxMountPointEnvVar = "CONTAINER_SANDBOX_MOUNT_POINT"
)
45
46
47
48
49
50
// splitArgsRegex matches either a run of characters that are neither whitespace nor
// double quotes, or a complete double-quoted section (quotes included). It is
// compiled once here rather than on every splitArgs call.
var splitArgsRegex = regexp.MustCompile(`[^\s"]+|"([^"]*)"`)

// splitArgs splits cmdLine into arguments, in order. Unquoted arguments are separated
// by whitespace; a double-quoted section is returned as a single token that keeps its
// surrounding quotes and may contain whitespace. A quote directly adjacent to other
// characters starts a new token, and an unmatched quote is dropped. An empty or
// all-whitespace cmdLine yields an empty (nil) slice.
func splitArgs(cmdLine string) []string {
	return splitArgsRegex.FindAllString(cmdLine, -1)
}
55
// initProc tracks the container's init process: the first process that
// CreateProcess successfully starts.
type initProc struct {
	// initDoOnce ensures proc is assigned and initBlock is closed only once.
	initDoOnce sync.Once
	// proc is the init process; only valid after initBlock has been closed.
	proc *JobProcess
	// initBlock is closed once proc has been set; waitBackground blocks on it.
	initBlock chan struct{}
}
61
62
// JobContainer is a container whose processes run directly on the host inside a
// Windows job object. When file binding is supported, the job object is promoted to
// a silo (see bindSetup).
type JobContainer struct {
	id string
	// spec is the OCI spec the container was created from.
	spec *specs.Spec
	// job is the job object every container process is started in.
	job *jobobject.JobObject
	// rootfsLocation is where the container's root filesystem is reachable for its
	// processes; exported to them as CONTAINER_SANDBOX_MOUNT_POINT.
	rootfsLocation string

	// closedWaitOnce guards the single close of waitBlock, done by either Close or
	// waitBackground, whichever comes first.
	closedWaitOnce sync.Once
	init           initProc
	// token is the user token shared by all container processes. It is zero until
	// the first CreateProcess call acquires it.
	token windows.Token
	// localUserAccount, if non-empty, names a local account that Close deletes.
	// NOTE(review): presumably created by processToken — not visible in this file.
	localUserAccount string
	// startTimestamp is recorded by Start and used for uptime in PropertiesV2.
	startTimestamp time.Time
	// exited is closed when the job object reports that all its processes exited.
	exited chan struct{}
	// waitBlock is closed when the container has exited or been closed; waitError
	// then holds the value returned by WaitError.
	waitBlock chan struct{}
	waitError error
}
83
84
// Compile-time checks that *JobContainer implements cow.ProcessHost and cow.Container.
var (
	_ cow.ProcessHost = &JobContainer{}
	_ cow.Container   = &JobContainer{}
)
89
90 func newJobContainer(id string, s *specs.Spec) *JobContainer {
91 return &JobContainer{
92 id: id,
93 spec: s,
94 waitBlock: make(chan struct{}),
95 exited: make(chan struct{}),
96 init: initProc{initBlock: make(chan struct{})},
97 }
98 }
99
100
101 func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, _ *resources.Resources, err error) {
102 log.G(ctx).WithField("id", id).Debug("Creating job container")
103
104 if s == nil {
105 return nil, nil, errors.New("Spec must be supplied")
106 }
107
108 if id == "" {
109 g, err := guid.NewV4()
110 if err != nil {
111 return nil, nil, err
112 }
113 id = g.String()
114 }
115
116 container := newJobContainer(id, s)
117
118
119 options := &jobobject.Options{
120 Name: fmt.Sprintf(jobContainerNameFmt, id),
121 Notifications: true,
122 EnableIOTracking: true,
123 }
124 container.job, err = jobobject.Create(ctx, options)
125 if err != nil {
126 return nil, nil, fmt.Errorf("failed to create job object: %w", err)
127 }
128
129
130
131 if err := container.job.SetTerminateOnLastHandleClose(); err != nil {
132 return nil, nil, fmt.Errorf("failed to set terminate on last handle close on job container: %w", err)
133 }
134
135 r := resources.NewContainerResources(id)
136 defer func() {
137 if err != nil {
138 container.Close()
139 _ = resources.ReleaseResources(ctx, r, nil, true)
140 }
141 }()
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 checkBindSupportOnce.Do(func() {
185 bindDLL := `C:\windows\system32\bindfltapi.dll`
186 if _, err := os.Stat(bindDLL); err == nil {
187 fileBindingSupport = true
188 }
189 })
190
191 var closer resources.ResourceCloser
192 if fileBindingSupport {
193 closer, err = container.bindSetup(ctx, s)
194 } else {
195 closer, err = container.fallbackSetup(ctx, s)
196 }
197 if err != nil {
198 return nil, nil, err
199 }
200 r.SetLayers(closer)
201
202 volumeGUIDRegex := `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`
203 if matched, err := regexp.MatchString(volumeGUIDRegex, s.Root.Path); !matched || err != nil {
204 return nil, nil, fmt.Errorf(`invalid container spec - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, s.Root.Path)
205 }
206
207 limits, err := specToLimits(ctx, id, s)
208 if err != nil {
209 return nil, nil, fmt.Errorf("failed to convert OCI spec to job object limits: %w", err)
210 }
211
212
213 if err := container.job.SetResourceLimits(limits); err != nil {
214 return nil, nil, fmt.Errorf("failed to set resource limits: %w", err)
215 }
216
217 go container.waitBackground(ctx)
218 return container, r, nil
219 }
220
221
222
223 func (c *JobContainer) CreateProcess(ctx context.Context, config interface{}) (_ cow.Process, err error) {
224 conf, ok := config.(*hcsschema.ProcessParameters)
225 if !ok {
226 return nil, errors.New("unsupported process config passed in")
227 }
228
229 removeDriveLetter := func(name string) string {
230
231
232 if len(name) == 2 && name[1] == ':' {
233 name = "\\"
234 } else if len(name) > 2 && name[1] == ':' {
235 name = name[2:]
236 }
237 return name
238 }
239
240 workDir := c.rootfsLocation
241 if conf.WorkingDirectory != "" {
242 var changed bool
243
244
245
246 workDir, changed = c.replaceWithMountPoint(conf.WorkingDirectory)
247
248
249
250
251
252
253
254
255
256
257 if !changed && !fileBindingSupport {
258 workDir = filepath.Join(c.rootfsLocation, removeDriveLetter(workDir))
259 }
260 }
261
262
263 if _, err := os.Stat(workDir); os.IsNotExist(err) {
264 if err := os.MkdirAll(workDir, 0700); err != nil {
265 return nil, err
266 }
267 }
268
269
270
271 if c.token == 0 {
272 if inheritUserTokenIsSet(c.spec.Annotations) {
273 c.token, err = openCurrentProcessToken()
274 if err != nil {
275 return nil, err
276 }
277 } else {
278 c.token, err = c.processToken(ctx, conf.User)
279 if err != nil {
280 return nil, fmt.Errorf("failed to create user process token: %w", err)
281 }
282 }
283 }
284
285 env, err := defaultEnvBlock(c.token)
286 if err != nil {
287 return nil, errors.Wrap(err, "failed to get default environment block")
288 }
289
290
291 var envs []string
292 for k, v := range conf.Environment {
293 expanded, _ := c.replaceWithMountPoint(v)
294 envs = append(envs, k+"="+expanded)
295 }
296 env = append(env, envs...)
297 env = append(env, sandboxMountPointEnvVar+"="+c.rootfsLocation)
298
299 var path string
300 for idx, envVar := range env {
301 ev := strings.TrimSpace(envVar)
302 if strings.HasPrefix(strings.ToLower(ev), "path=") {
303
304 rootfsLoc := c.rootfsLocation
305 if rune(ev[len(ev)-1]) != ';' {
306 rootfsLoc = ";" + rootfsLoc
307 }
308
309
310
311 extraPaths := `;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\Wbem`
312 path = ev + rootfsLoc + extraPaths
313 env[idx] = path
314 }
315 }
316
317
318
319 commandLine, _ := c.replaceWithMountPoint(conf.CommandLine)
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 if fileBindingSupport {
339 commandLine = "cmd /c " + commandLine
340 }
341
342
343
344 absPath, commandLine, err := getApplicationName(commandLine, workDir, strings.Trim(path, "PATH="))
345 if err != nil {
346 return nil, errors.Wrapf(err, "failed to get application name from commandline %q", conf.CommandLine)
347 }
348
349
350
351
352
353
354
355 if err := os.Setenv("PATHEXT", ".COM;.EXE;.BAT;.CMD; "); err != nil {
356 return nil, errors.Wrap(err, "failed to set PATHEXT")
357 }
358
359 var cpty *conpty.Pty
360 if conf.EmulateConsole {
361 height := int16(25)
362 width := int16(80)
363
364
365 if len(conf.ConsoleSize) == 2 {
366 if conf.ConsoleSize[0] != 0 {
367 height = int16(conf.ConsoleSize[0])
368 }
369 if conf.ConsoleSize[1] != 0 {
370 width = int16(conf.ConsoleSize[1])
371 }
372 }
373
374 cpty, err = conpty.Create(width, height, 0)
375 if err != nil {
376 return nil, err
377 }
378 }
379
380 cmd, err := exec.New(
381 absPath,
382 commandLine,
383 exec.WithDir(workDir),
384 exec.WithEnv(env),
385 exec.WithToken(c.token),
386 exec.WithJobObject(c.job),
387 exec.WithConPty(cpty),
388 exec.WithProcessFlags(windows.CREATE_BREAKAWAY_FROM_JOB),
389 exec.WithStdio(conf.CreateStdOutPipe, conf.CreateStdErrPipe, conf.CreateStdInPipe),
390 )
391 if err != nil {
392 return nil, err
393 }
394 process := newProcess(cmd, cpty)
395
396
397 if conf.CreateStdInPipe {
398 process.stdin = process.cmd.Stdin()
399 }
400
401 if conf.CreateStdOutPipe {
402 process.stdout = process.cmd.Stdout()
403 }
404
405 if conf.CreateStdErrPipe {
406 process.stderr = process.cmd.Stderr()
407 }
408
409 defer func() {
410 if err != nil {
411 process.Close()
412 }
413 }()
414
415 if err = process.Start(); err != nil {
416 return nil, errors.Wrap(err, "failed to start host process")
417 }
418
419
420 c.init.initDoOnce.Do(func() {
421 c.init.proc = process
422 close(c.init.initBlock)
423 })
424
425
426 go c.pollJobMsgs(ctx)
427 go process.waitBackground(ctx)
428 return process, nil
429 }
430
431 func (c *JobContainer) Modify(ctx context.Context, config interface{}) (err error) {
432 return errors.New("modify not supported for job containers")
433 }
434
435
436
437 func (c *JobContainer) Start(ctx context.Context) error {
438 c.startTimestamp = time.Now()
439 return nil
440 }
441
442
443 func (c *JobContainer) Close() error {
444
445
446 var closeErr bool
447 if err := c.job.Close(); err != nil {
448 log.G(context.Background()).WithError(err).WithField("cid", c.id).Warning("failed to close job object")
449 closeErr = true
450 }
451
452 if err := c.token.Close(); err != nil {
453 log.G(context.Background()).WithError(err).WithField("cid", c.id).Warning("failed to close token")
454 closeErr = true
455 }
456
457
458 if c.localUserAccount != "" {
459 if err := winapi.NetUserDel("", c.localUserAccount); err != nil {
460 log.G(context.Background()).WithError(err).WithField("cid", c.id).Warning("failed to delete local account")
461 closeErr = true
462 }
463 }
464
465 c.closedWaitOnce.Do(func() {
466 c.waitError = hcs.ErrAlreadyClosed
467 close(c.waitBlock)
468 })
469 if closeErr {
470 return errors.New("failed to close one or more job container resources")
471 }
472 return nil
473 }
474
475
476 func (c *JobContainer) ID() string {
477 return c.id
478 }
479
480
481 func (c *JobContainer) Shutdown(ctx context.Context) error {
482 log.G(ctx).WithField("id", c.id).Debug("shutting down job container")
483
484 ctx, cancel := context.WithTimeout(ctx, time.Second*5)
485 defer cancel()
486 return c.shutdown(ctx)
487 }
488
489
490
491
492
493 func (c *JobContainer) shutdown(ctx context.Context) error {
494 pids, err := c.job.Pids()
495 if err != nil {
496 return errors.Wrap(err, "failed to get pids in container")
497 }
498
499 if len(pids) == 0 {
500 return nil
501 }
502
503 for _, pid := range pids {
504
505 if err := signalProcess(pid, windows.CTRL_SHUTDOWN_EVENT); err != nil {
506 log.G(ctx).WithField("pid", pid).Error("failed to signal process in job container")
507 }
508 }
509
510 select {
511 case <-c.exited:
512 case <-ctx.Done():
513 return c.Terminate(ctx)
514 }
515 return nil
516 }
517
518
519
520
521 func (c *JobContainer) PropertiesV2(ctx context.Context, types ...hcsschema.PropertyType) (*hcsschema.Properties, error) {
522 if len(types) == 0 {
523 return nil, errors.New("no property types supplied for PropertiesV2 call")
524 }
525 if types[0] != hcsschema.PTStatistics {
526 return nil, errors.New("PTStatistics is the only supported property type for job containers")
527 }
528
529
530 timestamp := time.Now()
531
532 memInfo, err := c.job.QueryMemoryStats()
533 if err != nil {
534 return nil, errors.Wrap(err, "failed to query for job containers memory information")
535 }
536
537 processorInfo, err := c.job.QueryProcessorStats()
538 if err != nil {
539 return nil, errors.Wrap(err, "failed to query for job containers processor information")
540 }
541
542 storageInfo, err := c.job.QueryStorageStats()
543 if err != nil {
544 return nil, errors.Wrap(err, "failed to query for job containers storage information")
545 }
546
547 privateWorkingSet, err := c.job.QueryPrivateWorkingSet()
548 if err != nil {
549 return nil, fmt.Errorf("failed to get private working set for container: %w", err)
550 }
551
552 return &hcsschema.Properties{
553 Statistics: &hcsschema.Statistics{
554 Timestamp: timestamp,
555 Uptime100ns: uint64(time.Since(c.startTimestamp).Nanoseconds()) / 100,
556 ContainerStartTime: c.startTimestamp,
557 Memory: &hcsschema.MemoryStats{
558 MemoryUsageCommitBytes: memInfo.JobMemory,
559 MemoryUsageCommitPeakBytes: memInfo.PeakJobMemoryUsed,
560 MemoryUsagePrivateWorkingSetBytes: privateWorkingSet,
561 },
562 Processor: &hcsschema.ProcessorStats{
563 RuntimeKernel100ns: uint64(processorInfo.TotalKernelTime),
564 RuntimeUser100ns: uint64(processorInfo.TotalUserTime),
565 TotalRuntime100ns: uint64(processorInfo.TotalKernelTime + processorInfo.TotalUserTime),
566 },
567 Storage: &hcsschema.StorageStats{
568 ReadCountNormalized: uint64(storageInfo.ReadStats.IoCount),
569 ReadSizeBytes: storageInfo.ReadStats.TotalSize,
570 WriteCountNormalized: uint64(storageInfo.WriteStats.IoCount),
571 WriteSizeBytes: storageInfo.WriteStats.TotalSize,
572 },
573 },
574 }, nil
575 }
576
577
578
579
580 func (c *JobContainer) Properties(ctx context.Context, types ...schema1.PropertyType) (*schema1.ContainerProperties, error) {
581 if len(types) == 0 {
582 return nil, errors.New("no property types supplied for Properties call")
583 }
584 if types[0] != schema1.PropertyTypeProcessList {
585 return nil, errors.New("ProcessList is the only supported property type for job containers")
586 }
587
588 var processList []schema1.ProcessListItem
589 err := forEachProcessInfo(c.job, func(procInfo *winapi.SYSTEM_PROCESS_INFORMATION) {
590 proc := schema1.ProcessListItem{
591 CreateTimestamp: time.Unix(0, procInfo.CreateTime),
592 ProcessId: uint32(procInfo.UniqueProcessID),
593 ImageName: procInfo.ImageName.String(),
594 UserTime100ns: uint64(procInfo.UserTime),
595 KernelTime100ns: uint64(procInfo.KernelTime),
596 MemoryCommitBytes: uint64(procInfo.PrivatePageCount),
597 MemoryWorkingSetPrivateBytes: uint64(procInfo.WorkingSetPrivateSize),
598 MemoryWorkingSetSharedBytes: uint64(procInfo.WorkingSetSize) - uint64(procInfo.WorkingSetPrivateSize),
599 }
600 processList = append(processList, proc)
601 })
602 if err != nil {
603 return nil, errors.Wrap(err, "failed to get process ")
604 }
605
606 return &schema1.ContainerProperties{ProcessList: processList}, nil
607 }
608
609
610 func (c *JobContainer) Terminate(ctx context.Context) error {
611 log.G(ctx).WithField("id", c.id).Debug("terminating job container")
612
613 if err := c.job.Terminate(1); err != nil {
614 return errors.Wrap(err, "failed to terminate job container")
615 }
616 return nil
617 }
618
619 func (c *JobContainer) WaitChannel() <-chan struct{} {
620 return c.waitBlock
621 }
622
623 func (c *JobContainer) WaitError() error {
624 return c.waitError
625 }
626
627
628
629 func (c *JobContainer) Wait() error {
630 <-c.WaitChannel()
631 return c.WaitError()
632 }
633
634 func (c *JobContainer) waitBackground(ctx context.Context) {
635
636 <-c.init.initBlock
637
638
639
640 <-c.init.proc.waitBlock
641
642 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
643 defer cancel()
644 if err := c.Shutdown(ctx); err != nil {
645 _ = c.Terminate(ctx)
646 }
647
648 c.closedWaitOnce.Do(func() {
649 c.waitError = c.init.proc.waitError
650 close(c.waitBlock)
651 })
652 }
653
654
655 func (c *JobContainer) pollJobMsgs(ctx context.Context) {
656 for {
657 notif, err := c.job.PollNotification()
658 if err != nil {
659
660
661 if err == queue.ErrQueueClosed || err == jobobject.ErrNotRegistered {
662 return
663 }
664 log.G(ctx).WithError(err).Warn("error while polling for job container notification")
665 }
666
667 switch msg := notif.(type) {
668
669 case jobobject.MsgAllProcessesExited:
670 close(c.exited)
671 return
672 case jobobject.MsgUnimplemented:
673 default:
674 log.G(ctx).WithField("message", msg).Warn("unknown job object notification encountered")
675 }
676 }
677 }
678
679
680 func (c *JobContainer) IsOCI() bool {
681 return false
682 }
683
684
685 func (c *JobContainer) OS() string {
686 return "windows"
687 }
688
689
690
691 func forEachProcessInfo(job *jobobject.JobObject, work func(*winapi.SYSTEM_PROCESS_INFORMATION)) error {
692 procInfos, err := systemProcessInformation()
693 if err != nil {
694 return err
695 }
696
697 pids, err := job.Pids()
698 if err != nil {
699 return err
700 }
701
702 pidsMap := make(map[uint32]struct{})
703 for _, pid := range pids {
704 pidsMap[pid] = struct{}{}
705 }
706
707 for _, procInfo := range procInfos {
708 if _, ok := pidsMap[uint32(procInfo.UniqueProcessID)]; ok {
709 work(procInfo)
710 }
711 }
712 return nil
713 }
714
715
// systemProcessInformation returns one SYSTEM_PROCESS_INFORMATION entry per process
// on the system, as reported by NtQuerySystemInformation(SystemProcessInformation).
// All returned pointers point into a single byte buffer allocated here.
func systemProcessInformation() ([]*winapi.SYSTEM_PROCESS_INFORMATION, error) {
	var (
		systemProcInfo *winapi.SYSTEM_PROCESS_INFORMATION
		procInfos      []*winapi.SYSTEM_PROCESS_INFORMATION
		// Initial buffer size guess (512 KiB). The call reports the length it needs
		// through &size; on STATUS_INFO_LENGTH_MISMATCH the loop retries with it.
		size   = uint32(1024 * 512)
		bounds uintptr
	)
	for {
		b := make([]byte, size)
		systemProcInfo = (*winapi.SYSTEM_PROCESS_INFORMATION)(unsafe.Pointer(&b[0]))
		status := winapi.NtQuerySystemInformation(
			winapi.SystemProcessInformation,
			unsafe.Pointer(systemProcInfo),
			size,
			&size,
		)
		if winapi.NTSuccess(status) {
			// Address of the buffer's last byte, used below to reject an entry whose
			// NextEntryOffset would point past the end of b.
			bounds = uintptr(unsafe.Pointer(&b[len(b)-1]))
			break
		} else if status != winapi.STATUS_INFO_LENGTH_MISMATCH {
			return nil, winapi.RtlNtStatusToDosError(status)
		}
	}

	// Walk the chain of variable-length entries: each entry's NextEntryOffset is the
	// byte offset from that entry to the next one, and 0 marks the last entry.
	// NOTE(review): only the start of the next entry is checked against bounds, not
	// the full size of the entry itself.
	for {
		if uintptr(unsafe.Pointer(systemProcInfo))+uintptr(systemProcInfo.NextEntryOffset) >= bounds {
			// The next entry would start outside the buffer; treat the data as corrupt.
			return nil, errors.New("system process info entry exceeds allocated buffer")
		}
		procInfos = append(procInfos, systemProcInfo)
		if systemProcInfo.NextEntryOffset == 0 {
			break
		}
		systemProcInfo = (*winapi.SYSTEM_PROCESS_INFORMATION)(unsafe.Pointer(uintptr(unsafe.Pointer(systemProcInfo)) + uintptr(systemProcInfo.NextEntryOffset)))
	}

	return procInfos, nil
}
758
759
760
761 func (c *JobContainer) replaceWithMountPoint(str string) (string, bool) {
762 mountPoint := c.rootfsLocation
763 newStr := strings.ReplaceAll(str, "%"+sandboxMountPointEnvVar+"%", mountPoint[:len(mountPoint)-1])
764 newStr = strings.ReplaceAll(newStr, "$env:"+sandboxMountPointEnvVar, mountPoint[:len(mountPoint)-1])
765 return newStr, str != newStr
766 }
767
768 func (c *JobContainer) bindSetup(ctx context.Context, s *specs.Spec) (_ resources.ResourceCloser, err error) {
769
770 if err := c.job.PromoteToSilo(); err != nil {
771 return nil, err
772 }
773
774 closer, err := c.mountLayers(ctx, c.id, s, "")
775 if err != nil {
776 return nil, fmt.Errorf("failed to mount container layers: %w", err)
777 }
778 defer func() {
779 if err != nil {
780 _ = closer.Release(ctx)
781 }
782 }()
783
784 rootfsLocation := defaultSiloRootfsLocation
785 if loc := customRootfsLocation(s.Annotations); loc != "" {
786 rootfsLocation = loc
787 }
788
789 if err := c.setupRootfsBinding(rootfsLocation, s.Root.Path); err != nil {
790 return nil, err
791 }
792 c.rootfsLocation = rootfsLocation
793 if err := c.setupMounts(ctx, s); err != nil {
794 return nil, err
795 }
796 return closer, nil
797 }
798
799
800
801 func (c *JobContainer) fallbackSetup(ctx context.Context, s *specs.Spec) (_ resources.ResourceCloser, err error) {
802 rootfsLocation := fmt.Sprintf(fallbackRootfsFormat, c.id)
803 if loc := customRootfsLocation(s.Annotations); loc != "" {
804 rootfsLocation = filepath.Join(loc, c.id)
805 }
806 closer, err := c.mountLayers(ctx, c.id, s, rootfsLocation)
807 if err != nil {
808 return nil, fmt.Errorf("failed to mount container layers: %w", err)
809 }
810 defer func() {
811 if err != nil {
812 _ = closer.Release(ctx)
813 }
814 }()
815 c.rootfsLocation = rootfsLocation
816 if err := fallbackMountSetup(s, c.rootfsLocation); err != nil {
817 return nil, err
818 }
819 return closer, nil
820 }
821
View as plain text