1
2
3 package jobobject
4
5 import (
6 "context"
7 "errors"
8 "fmt"
9 "os"
10 "path/filepath"
11 "sync"
12 "sync/atomic"
13 "unsafe"
14
15 "github.com/Microsoft/hcsshim/internal/queue"
16 "github.com/Microsoft/hcsshim/internal/winapi"
17 "golang.org/x/sys/windows"
18 )
19
20
21
22
// JobObject wraps a handle to a Windows job object together with the state
// this package keeps for it.
type JobObject struct {
	// handle is the job object handle. Close resets it to 0, and the other
	// methods treat 0 as "already closed".
	handle windows.Handle
	// silo is 1 once the job is known to be a silo. It is read by isSilo and
	// written by PromoteToSilo through sync/atomic.
	silo uint32
	// mq is the queue notifications are read from. It stays nil unless
	// notifications were requested when the job was created or opened.
	mq *queue.MessageQueue
	// handleLock guards handle: methods that use the handle hold it for
	// reading, Close holds it for writing.
	handleLock sync.RWMutex
}
31
32
// JobLimits groups the resource limits that can be configured for a job
// object.
//
// NOTE(review): the units of the CPU and IO fields are not visible in this
// part of the file; confirm against the code that applies them.
type JobLimits struct {
	// CPU rate limit and CPU weight.
	CPULimit, CPUWeight uint32
	// Memory limit, in bytes.
	MemoryLimitInBytes uint64
	// Storage limits: IO operations per second and bandwidth.
	MaxIOPS, MaxBandwidth int64
}
40
// CPURateControlType selects which kind of CPU rate control is meant.
type CPURateControlType uint32

const (
	// WeightBased selects weight based CPU rate control.
	WeightBased CPURateControlType = 0
	// RateBased selects rate based CPU rate control.
	RateBased CPURateControlType = 1
)
47
48
// Inclusive bounds for the CPU limit and CPU weight settings.
//
// NOTE(review): these are not referenced in this part of the file; presumably
// they validate JobLimits.CPULimit and JobLimits.CPUWeight — confirm at the
// use site.
const (
	cpuLimitMin, cpuLimitMax   = 1, 10000
	cpuWeightMin, cpuWeightMax = 1, 9
)
55
// ErrAlreadyClosed is returned by JobObject methods once the job's handle has
// been closed.
var ErrAlreadyClosed = errors.New("the handle has already been closed")

// ErrNotRegistered is returned by PollNotification when the job was not set
// up to receive notifications.
var ErrNotRegistered = errors.New("job is not registered to receive notifications")

// ErrNotSilo is returned by operations that require the job to be a silo.
var ErrNotSilo = errors.New("job is not a silo")
61
62
// Options configures Create and Open.
type Options struct {
	// Name is the name of the job object. Create accepts an empty name; Open
	// requires one.
	Name string

	// Notifications registers the job to receive notifications, which are
	// then read with PollNotification.
	//
	// UseNTVariant selects the Nt* calls (NtCreateJobObject/NtOpenJobObject)
	// instead of CreateJobObject/OpenJobObject.
	//
	// Silo makes Create promote the new job to a silo. Open does not consult
	// it.
	//
	// EnableIOTracking makes Create turn on IO tracking for the new job. Open
	// does not consult it.
	Notifications, UseNTVariant, Silo, EnableIOTracking bool
}
79
80
81
82
83
84
85
86
87
88
89 func Create(ctx context.Context, options *Options) (_ *JobObject, err error) {
90 if options == nil {
91 options = &Options{}
92 }
93
94 var jobName *winapi.UnicodeString
95 if options.Name != "" {
96 jobName, err = winapi.NewUnicodeString(options.Name)
97 if err != nil {
98 return nil, err
99 }
100 }
101
102 var jobHandle windows.Handle
103 if options.UseNTVariant {
104 oa := winapi.ObjectAttributes{
105 Length: unsafe.Sizeof(winapi.ObjectAttributes{}),
106 ObjectName: jobName,
107 Attributes: 0,
108 }
109 status := winapi.NtCreateJobObject(&jobHandle, winapi.JOB_OBJECT_ALL_ACCESS, &oa)
110 if status != 0 {
111 return nil, winapi.RtlNtStatusToDosError(status)
112 }
113 } else {
114 var jobNameBuf *uint16
115 if jobName != nil && jobName.Buffer != nil {
116 jobNameBuf = jobName.Buffer
117 }
118 jobHandle, err = windows.CreateJobObject(nil, jobNameBuf)
119 if err != nil {
120 return nil, err
121 }
122 }
123
124 defer func() {
125 if err != nil {
126 windows.Close(jobHandle)
127 }
128 }()
129
130 job := &JobObject{
131 handle: jobHandle,
132 }
133
134
135
136 if options.Notifications {
137 mq, err := setupNotifications(ctx, job)
138 if err != nil {
139 return nil, err
140 }
141 job.mq = mq
142 }
143
144 if options.EnableIOTracking {
145 if err := enableIOTracking(jobHandle); err != nil {
146 return nil, err
147 }
148 }
149
150 if options.Silo {
151
152 if err := job.SetTerminateOnLastHandleClose(); err != nil {
153 return nil, err
154 }
155 if err := job.PromoteToSilo(); err != nil {
156 return nil, err
157 }
158 }
159
160 return job, nil
161 }
162
163
164
165
166
167
168
169 func Open(ctx context.Context, options *Options) (_ *JobObject, err error) {
170 if options == nil || (options != nil && options.Name == "") {
171 return nil, errors.New("no job object name specified to open")
172 }
173
174 unicodeJobName, err := winapi.NewUnicodeString(options.Name)
175 if err != nil {
176 return nil, err
177 }
178
179 var jobHandle windows.Handle
180 if options.UseNTVariant {
181 oa := winapi.ObjectAttributes{
182 Length: unsafe.Sizeof(winapi.ObjectAttributes{}),
183 ObjectName: unicodeJobName,
184 Attributes: 0,
185 }
186 status := winapi.NtOpenJobObject(&jobHandle, winapi.JOB_OBJECT_ALL_ACCESS, &oa)
187 if status != 0 {
188 return nil, winapi.RtlNtStatusToDosError(status)
189 }
190 } else {
191 jobHandle, err = winapi.OpenJobObject(winapi.JOB_OBJECT_ALL_ACCESS, 0, unicodeJobName.Buffer)
192 if err != nil {
193 return nil, err
194 }
195 }
196
197 defer func() {
198 if err != nil {
199 windows.Close(jobHandle)
200 }
201 }()
202
203 job := &JobObject{
204 handle: jobHandle,
205 }
206
207 if isJobSilo(jobHandle) {
208 job.silo = 1
209 }
210
211
212
213 if options.Notifications {
214 mq, err := setupNotifications(ctx, job)
215 if err != nil {
216 return nil, err
217 }
218 job.mq = mq
219 }
220
221 return job, nil
222 }
223
224
225 func setupNotifications(ctx context.Context, job *JobObject) (*queue.MessageQueue, error) {
226 job.handleLock.RLock()
227 defer job.handleLock.RUnlock()
228
229 if job.handle == 0 {
230 return nil, ErrAlreadyClosed
231 }
232
233 ioInitOnce.Do(func() {
234 h, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0xffffffff)
235 if err != nil {
236 initIOErr = err
237 return
238 }
239 ioCompletionPort = h
240 go pollIOCP(ctx, h)
241 })
242
243 if initIOErr != nil {
244 return nil, initIOErr
245 }
246
247 mq := queue.NewMessageQueue()
248 jobMap.Store(uintptr(job.handle), mq)
249 if err := attachIOCP(job.handle, ioCompletionPort); err != nil {
250 jobMap.Delete(uintptr(job.handle))
251 return nil, fmt.Errorf("failed to attach job to IO completion port: %w", err)
252 }
253 return mq, nil
254 }
255
256
257
258
259
260
261 func (job *JobObject) PollNotification() (interface{}, error) {
262 if job.mq == nil {
263 return nil, ErrNotRegistered
264 }
265 return job.mq.Dequeue()
266 }
267
268
269
270
271 func (job *JobObject) UpdateProcThreadAttribute(attrList *windows.ProcThreadAttributeListContainer) error {
272 job.handleLock.RLock()
273 defer job.handleLock.RUnlock()
274
275 if job.handle == 0 {
276 return ErrAlreadyClosed
277 }
278
279 if err := attrList.Update(
280 winapi.PROC_THREAD_ATTRIBUTE_JOB_LIST,
281 unsafe.Pointer(&job.handle),
282 unsafe.Sizeof(job.handle),
283 ); err != nil {
284 return fmt.Errorf("failed to update proc thread attributes for job object: %w", err)
285 }
286
287 return nil
288 }
289
290
291 func (job *JobObject) Close() error {
292 job.handleLock.Lock()
293 defer job.handleLock.Unlock()
294
295 if job.handle == 0 {
296 return ErrAlreadyClosed
297 }
298
299 if err := windows.Close(job.handle); err != nil {
300 return err
301 }
302
303 if job.mq != nil {
304 job.mq.Close()
305 }
306
307
308 if _, ok := jobMap.Load(uintptr(job.handle)); ok {
309 jobMap.Delete(uintptr(job.handle))
310 }
311
312 job.handle = 0
313 return nil
314 }
315
316
317 func (job *JobObject) Assign(pid uint32) error {
318 job.handleLock.RLock()
319 defer job.handleLock.RUnlock()
320
321 if job.handle == 0 {
322 return ErrAlreadyClosed
323 }
324
325 if pid == 0 {
326 return errors.New("invalid pid: 0")
327 }
328 hProc, err := windows.OpenProcess(winapi.PROCESS_ALL_ACCESS, true, pid)
329 if err != nil {
330 return err
331 }
332 defer windows.Close(hProc)
333 return windows.AssignProcessToJobObject(job.handle, hProc)
334 }
335
336
337
338 func (job *JobObject) Terminate(exitCode uint32) error {
339 job.handleLock.RLock()
340 defer job.handleLock.RUnlock()
341 if job.handle == 0 {
342 return ErrAlreadyClosed
343 }
344 return windows.TerminateJobObject(job.handle, exitCode)
345 }
346
347
348 func (job *JobObject) Pids() ([]uint32, error) {
349 job.handleLock.RLock()
350 defer job.handleLock.RUnlock()
351
352 if job.handle == 0 {
353 return nil, ErrAlreadyClosed
354 }
355
356 info := winapi.JOBOBJECT_BASIC_PROCESS_ID_LIST{}
357 err := winapi.QueryInformationJobObject(
358 job.handle,
359 winapi.JobObjectBasicProcessIdList,
360 unsafe.Pointer(&info),
361 uint32(unsafe.Sizeof(info)),
362 nil,
363 )
364
365
366
367
368 if err == nil {
369 if info.NumberOfProcessIdsInList == 1 {
370 return []uint32{uint32(info.ProcessIdList[0])}, nil
371 }
372
373
374 return []uint32{}, nil
375 }
376
377 if err != winapi.ERROR_MORE_DATA {
378 return nil, fmt.Errorf("failed initial query for PIDs in job object: %w", err)
379 }
380
381 jobBasicProcessIDListSize := unsafe.Sizeof(info) + (unsafe.Sizeof(info.ProcessIdList[0]) * uintptr(info.NumberOfAssignedProcesses-1))
382 buf := make([]byte, jobBasicProcessIDListSize)
383 if err = winapi.QueryInformationJobObject(
384 job.handle,
385 winapi.JobObjectBasicProcessIdList,
386 unsafe.Pointer(&buf[0]),
387 uint32(len(buf)),
388 nil,
389 ); err != nil {
390 return nil, fmt.Errorf("failed to query for PIDs in job object: %w", err)
391 }
392
393 bufInfo := (*winapi.JOBOBJECT_BASIC_PROCESS_ID_LIST)(unsafe.Pointer(&buf[0]))
394 pids := make([]uint32, bufInfo.NumberOfProcessIdsInList)
395 for i, bufPid := range bufInfo.AllPids() {
396 pids[i] = uint32(bufPid)
397 }
398 return pids, nil
399 }
400
401
402 func (job *JobObject) QueryMemoryStats() (*winapi.JOBOBJECT_MEMORY_USAGE_INFORMATION, error) {
403 job.handleLock.RLock()
404 defer job.handleLock.RUnlock()
405
406 if job.handle == 0 {
407 return nil, ErrAlreadyClosed
408 }
409
410 info := winapi.JOBOBJECT_MEMORY_USAGE_INFORMATION{}
411 if err := winapi.QueryInformationJobObject(
412 job.handle,
413 winapi.JobObjectMemoryUsageInformation,
414 unsafe.Pointer(&info),
415 uint32(unsafe.Sizeof(info)),
416 nil,
417 ); err != nil {
418 return nil, fmt.Errorf("failed to query for job object memory stats: %w", err)
419 }
420 return &info, nil
421 }
422
423
424 func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_INFORMATION, error) {
425 job.handleLock.RLock()
426 defer job.handleLock.RUnlock()
427
428 if job.handle == 0 {
429 return nil, ErrAlreadyClosed
430 }
431
432 info := winapi.JOBOBJECT_BASIC_ACCOUNTING_INFORMATION{}
433 if err := winapi.QueryInformationJobObject(
434 job.handle,
435 winapi.JobObjectBasicAccountingInformation,
436 unsafe.Pointer(&info),
437 uint32(unsafe.Sizeof(info)),
438 nil,
439 ); err != nil {
440 return nil, fmt.Errorf("failed to query for job object process stats: %w", err)
441 }
442 return &info, nil
443 }
444
445
446
447
448 func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION, error) {
449 job.handleLock.RLock()
450 defer job.handleLock.RUnlock()
451
452 if job.handle == 0 {
453 return nil, ErrAlreadyClosed
454 }
455
456 info := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
457 ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
458 }
459 if err := winapi.QueryInformationJobObject(
460 job.handle,
461 winapi.JobObjectIoAttribution,
462 unsafe.Pointer(&info),
463 uint32(unsafe.Sizeof(info)),
464 nil,
465 ); err != nil {
466 return nil, fmt.Errorf("failed to query for job object storage stats: %w", err)
467 }
468 return &info, nil
469 }
470
471
472
473
474 func (job *JobObject) ApplyFileBinding(root, target string, readOnly bool) error {
475 job.handleLock.RLock()
476 defer job.handleLock.RUnlock()
477
478 if job.handle == 0 {
479 return ErrAlreadyClosed
480 }
481
482 if !job.isSilo() {
483 return ErrNotSilo
484 }
485
486
487
488
489 if err := os.MkdirAll(filepath.Dir(root), 0); err != nil {
490 return err
491 }
492
493 rootPtr, err := windows.UTF16PtrFromString(root)
494 if err != nil {
495 return err
496 }
497
498 targetPtr, err := windows.UTF16PtrFromString(target)
499 if err != nil {
500 return err
501 }
502
503 flags := winapi.BINDFLT_FLAG_USE_CURRENT_SILO_MAPPING
504 if readOnly {
505 flags |= winapi.BINDFLT_FLAG_READ_ONLY_MAPPING
506 }
507
508 if err := winapi.BfSetupFilter(
509 job.handle,
510 flags,
511 rootPtr,
512 targetPtr,
513 nil,
514 0,
515 ); err != nil {
516 return fmt.Errorf("failed to bind target %q to root %q for job object: %w", target, root, err)
517 }
518 return nil
519 }
520
521
522
523 func isJobSilo(h windows.Handle) bool {
524
525
526
527
528 type isSiloObj struct {
529 _ [16]byte
530 }
531 var siloInfo isSiloObj
532 err := winapi.QueryInformationJobObject(
533 h,
534 winapi.JobObjectSiloBasicInformation,
535 unsafe.Pointer(&siloInfo),
536 uint32(unsafe.Sizeof(siloInfo)),
537 nil,
538 )
539 return err == nil
540 }
541
542
543
544 func (job *JobObject) PromoteToSilo() error {
545 job.handleLock.RLock()
546 defer job.handleLock.RUnlock()
547
548 if job.handle == 0 {
549 return ErrAlreadyClosed
550 }
551
552 if job.isSilo() {
553 return nil
554 }
555
556 pids, err := job.Pids()
557 if err != nil {
558 return err
559 }
560
561 if len(pids) != 0 {
562 return fmt.Errorf("job cannot have running processes to be promoted to a silo, found %d running processes", len(pids))
563 }
564
565 _, err = windows.SetInformationJobObject(
566 job.handle,
567 winapi.JobObjectCreateSilo,
568 0,
569 0,
570 )
571 if err != nil {
572 return fmt.Errorf("failed to promote job to silo: %w", err)
573 }
574
575 atomic.StoreUint32(&job.silo, 1)
576 return nil
577 }
578
579
580 func (job *JobObject) isSilo() bool {
581 return atomic.LoadUint32(&job.silo) == 1
582 }
583
584
585
586 func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
587 pids, err := job.Pids()
588 if err != nil {
589 return 0, err
590 }
591
592 openAndQueryWorkingSet := func(pid uint32) (uint64, error) {
593 h, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, pid)
594 if err != nil {
595
596
597 return 0, nil
598 }
599 defer func() {
600 _ = windows.Close(h)
601 }()
602
603
604
605 var inJob int32
606 if err := winapi.IsProcessInJob(h, job.handle, &inJob); err != nil {
607
608
609 return 0, err
610 }
611
612
613 if inJob == 0 {
614 return 0, nil
615 }
616
617 var vmCounters winapi.VM_COUNTERS_EX2
618 status := winapi.NtQueryInformationProcess(
619 h,
620 winapi.ProcessVmCounters,
621 unsafe.Pointer(&vmCounters),
622 uint32(unsafe.Sizeof(vmCounters)),
623 nil,
624 )
625 if !winapi.NTSuccess(status) {
626 return 0, fmt.Errorf("failed to query information for process: %w", winapi.RtlNtStatusToDosError(status))
627 }
628 return uint64(vmCounters.PrivateWorkingSetSize), nil
629 }
630
631 var jobWorkingSetSize uint64
632 for _, pid := range pids {
633 workingSet, err := openAndQueryWorkingSet(pid)
634 if err != nil {
635 return 0, err
636 }
637 jobWorkingSetSize += workingSet
638 }
639
640 return jobWorkingSetSize, nil
641 }
642
643
644
645 func (job *JobObject) SetIOTracking() error {
646 job.handleLock.RLock()
647 defer job.handleLock.RUnlock()
648
649 if job.handle == 0 {
650 return ErrAlreadyClosed
651 }
652
653 return enableIOTracking(job.handle)
654 }
655
656 func enableIOTracking(job windows.Handle) error {
657 info := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
658 ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
659 }
660 if _, err := windows.SetInformationJobObject(
661 job,
662 winapi.JobObjectIoAttribution,
663 uintptr(unsafe.Pointer(&info)),
664 uint32(unsafe.Sizeof(info)),
665 ); err != nil {
666 return fmt.Errorf("failed to enable IO tracking on job object: %w", err)
667 }
668 return nil
669 }
670
View as plain text