aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/Microsoft/hcsshim/internal/jobobject
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/Microsoft/hcsshim/internal/jobobject')
-rw-r--r--vendor/github.com/Microsoft/hcsshim/internal/jobobject/iocp.go111
-rw-r--r--vendor/github.com/Microsoft/hcsshim/internal/jobobject/jobobject.go499
-rw-r--r--vendor/github.com/Microsoft/hcsshim/internal/jobobject/limits.go315
3 files changed, 925 insertions, 0 deletions
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/jobobject/iocp.go b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/iocp.go
new file mode 100644
index 000000000..3d640ac7b
--- /dev/null
+++ b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/iocp.go
@@ -0,0 +1,111 @@
+package jobobject
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "unsafe"
+
+ "github.com/Microsoft/hcsshim/internal/log"
+ "github.com/Microsoft/hcsshim/internal/queue"
+ "github.com/Microsoft/hcsshim/internal/winapi"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/sys/windows"
+)
+
+var (
+ ioInitOnce sync.Once
+ initIOErr error
+ // Global iocp handle that will be re-used for every job object
+ ioCompletionPort windows.Handle
+ // Mapping of job handle to queue to place notifications in.
+ jobMap sync.Map
+)
+
+// MsgAllProcessesExited is a type representing a message that every process in a job has exited.
+type MsgAllProcessesExited struct{}
+
+// MsgUnimplemented represents a message that we are aware of, but that isn't implemented currently.
+// This should not be treated as an error.
+type MsgUnimplemented struct{}
+
+// pollIOCP polls the io completion port forever.
+func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
+ var (
+ overlapped uintptr
+ code uint32
+ key uintptr
+ )
+
+ for {
+ err := windows.GetQueuedCompletionStatus(iocpHandle, &code, &key, (**windows.Overlapped)(unsafe.Pointer(&overlapped)), windows.INFINITE)
+ if err != nil {
+ log.G(ctx).WithError(err).Error("failed to poll for job object message")
+ continue
+ }
+ if val, ok := jobMap.Load(key); ok {
+ msq, ok := val.(*queue.MessageQueue)
+ if !ok {
+ log.G(ctx).WithField("value", msq).Warn("encountered non queue type in job map")
+ continue
+ }
+ notification, err := parseMessage(code, overlapped)
+ if err != nil {
+ log.G(ctx).WithFields(logrus.Fields{
+ "code": code,
+ "overlapped": overlapped,
+ }).Warn("failed to parse job object message")
+ continue
+ }
+ if err := msq.Write(notification); err == queue.ErrQueueClosed {
+ // Write will only return an error when the queue is closed.
+ // The only time a queue would ever be closed is when we call `Close` on
+ // the job it belongs to which also removes it from the jobMap, so something
+ // went wrong here. We can't return as this is reading messages for all jobs
+ // so just log it and move on.
+ log.G(ctx).WithFields(logrus.Fields{
+ "code": code,
+ "overlapped": overlapped,
+ }).Warn("tried to write to a closed queue")
+ continue
+ }
+ } else {
+ log.G(ctx).Warn("received a message for a job not present in the mapping")
+ }
+ }
+}
+
+func parseMessage(code uint32, overlapped uintptr) (interface{}, error) {
+ // Check code and parse out relevant information related to that notification
+ // that we care about. For now all we handle is the message that all processes
+ // in the job have exited.
+ switch code {
+ case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
+ return MsgAllProcessesExited{}, nil
+ // Other messages for completeness and a check to make sure that if we fall
+ // into the default case that this is a code we don't know how to handle.
+ case winapi.JOB_OBJECT_MSG_END_OF_JOB_TIME:
+ case winapi.JOB_OBJECT_MSG_END_OF_PROCESS_TIME:
+ case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT:
+ case winapi.JOB_OBJECT_MSG_NEW_PROCESS:
+ case winapi.JOB_OBJECT_MSG_EXIT_PROCESS:
+ case winapi.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
+ case winapi.JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT:
+ case winapi.JOB_OBJECT_MSG_JOB_MEMORY_LIMIT:
+ case winapi.JOB_OBJECT_MSG_NOTIFICATION_LIMIT:
+ default:
+ return nil, fmt.Errorf("unknown job notification type: %d", code)
+ }
+ return MsgUnimplemented{}, nil
+}
+
+// Assigns an IO completion port to get notified of events for the registered job
+// object.
+func attachIOCP(job windows.Handle, iocp windows.Handle) error {
+ info := winapi.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{
+ CompletionKey: job,
+ CompletionPort: iocp,
+ }
+ _, err := windows.SetInformationJobObject(job, windows.JobObjectAssociateCompletionPortInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)))
+ return err
+}
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/jobobject/jobobject.go b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/jobobject.go
new file mode 100644
index 000000000..9c2726416
--- /dev/null
+++ b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/jobobject.go
@@ -0,0 +1,499 @@
+package jobobject
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "unsafe"
+
+ "github.com/Microsoft/hcsshim/internal/queue"
+ "github.com/Microsoft/hcsshim/internal/winapi"
+ "golang.org/x/sys/windows"
+)
+
+// This file provides higher level constructs for the win32 job object API.
+// Most of the core creation and management functions are already present in "golang.org/x/sys/windows"
+// (CreateJobObject, AssignProcessToJobObject, etc.) as well as most of the limit information
+// structs and associated limit flags. Whatever is not present from the job object API
+// in golang.org/x/sys/windows is located in /internal/winapi.
+//
+// https://docs.microsoft.com/en-us/windows/win32/procthread/job-objects
+
+// JobObject is a high level wrapper around a Windows job object. Holds a handle to
+// the job, a queue to receive iocp notifications about the lifecycle
+// of the job and a mutex for synchronized handle access.
+type JobObject struct {
+ handle windows.Handle
+ mq *queue.MessageQueue
+ handleLock sync.RWMutex
+}
+
+// JobLimits represents the resource constraints that can be applied to a job object.
+type JobLimits struct {
+ CPULimit uint32
+ CPUWeight uint32
+ MemoryLimitInBytes uint64
+ MaxIOPS int64
+ MaxBandwidth int64
+}
+
+type CPURateControlType uint32
+
+const (
+ WeightBased CPURateControlType = iota
+ RateBased
+)
+
+// Processor resource controls
+const (
+ cpuLimitMin = 1
+ cpuLimitMax = 10000
+ cpuWeightMin = 1
+ cpuWeightMax = 9
+)
+
+var (
+ ErrAlreadyClosed = errors.New("the handle has already been closed")
+ ErrNotRegistered = errors.New("job is not registered to receive notifications")
+)
+
+// Options represents the set of configurable options when making or opening a job object.
+type Options struct {
+ // `Name` specifies the name of the job object if a named job object is desired.
+ Name string
+ // `Notifications` specifies if the job will be registered to receive notifications.
+ // Defaults to false.
+ Notifications bool
+ // `UseNTVariant` specifies if we should use the `Nt` variant of Open/CreateJobObject.
+ // Defaults to false.
+ UseNTVariant bool
+}
+
+// Create creates a job object.
+//
+// If options.Name is an empty string, the job will not be assigned a name.
+//
+// If options.Notifications are not enabled `PollNotifications` will return immediately with error `errNotRegistered`.
+//
+// If `options` is nil, use default option values.
+//
+// Returns a JobObject structure and an error if there is one.
+func Create(ctx context.Context, options *Options) (_ *JobObject, err error) {
+ if options == nil {
+ options = &Options{}
+ }
+
+ var jobName *winapi.UnicodeString
+ if options.Name != "" {
+ jobName, err = winapi.NewUnicodeString(options.Name)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ var jobHandle windows.Handle
+ if options.UseNTVariant {
+ oa := winapi.ObjectAttributes{
+ Length: unsafe.Sizeof(winapi.ObjectAttributes{}),
+ ObjectName: jobName,
+ Attributes: 0,
+ }
+ status := winapi.NtCreateJobObject(&jobHandle, winapi.JOB_OBJECT_ALL_ACCESS, &oa)
+ if status != 0 {
+ return nil, winapi.RtlNtStatusToDosError(status)
+ }
+ } else {
+ var jobNameBuf *uint16
+ if jobName != nil && jobName.Buffer != nil {
+ jobNameBuf = jobName.Buffer
+ }
+ jobHandle, err = windows.CreateJobObject(nil, jobNameBuf)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ defer func() {
+ if err != nil {
+ windows.Close(jobHandle)
+ }
+ }()
+
+ job := &JobObject{
+ handle: jobHandle,
+ }
+
+ // If the IOCP we'll be using to receive messages for all jobs hasn't been
+ // created, create it and start polling.
+ if options.Notifications {
+ mq, err := setupNotifications(ctx, job)
+ if err != nil {
+ return nil, err
+ }
+ job.mq = mq
+ }
+
+ return job, nil
+}
+
+// Open opens an existing job object with name provided in `options`. If no name is provided
+// return an error since we need to know what job object to open.
+//
+// If options.Notifications is false `PollNotifications` will return immediately with error `errNotRegistered`.
+//
+// Returns a JobObject structure and an error if there is one.
+func Open(ctx context.Context, options *Options) (_ *JobObject, err error) {
+ if options == nil || (options != nil && options.Name == "") {
+ return nil, errors.New("no job object name specified to open")
+ }
+
+ unicodeJobName, err := winapi.NewUnicodeString(options.Name)
+ if err != nil {
+ return nil, err
+ }
+
+ var jobHandle windows.Handle
+ if options != nil && options.UseNTVariant {
+ oa := winapi.ObjectAttributes{
+ Length: unsafe.Sizeof(winapi.ObjectAttributes{}),
+ ObjectName: unicodeJobName,
+ Attributes: 0,
+ }
+ status := winapi.NtOpenJobObject(&jobHandle, winapi.JOB_OBJECT_ALL_ACCESS, &oa)
+ if status != 0 {
+ return nil, winapi.RtlNtStatusToDosError(status)
+ }
+ } else {
+ jobHandle, err = winapi.OpenJobObject(winapi.JOB_OBJECT_ALL_ACCESS, false, unicodeJobName.Buffer)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ defer func() {
+ if err != nil {
+ windows.Close(jobHandle)
+ }
+ }()
+
+ job := &JobObject{
+ handle: jobHandle,
+ }
+
+ // If the IOCP we'll be using to receive messages for all jobs hasn't been
+ // created, create it and start polling.
+ if options != nil && options.Notifications {
+ mq, err := setupNotifications(ctx, job)
+ if err != nil {
+ return nil, err
+ }
+ job.mq = mq
+ }
+
+ return job, nil
+}
+
+// helper function to setup notifications for creating/opening a job object
+func setupNotifications(ctx context.Context, job *JobObject) (*queue.MessageQueue, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ ioInitOnce.Do(func() {
+ h, err := windows.CreateIoCompletionPort(windows.InvalidHandle, 0, 0, 0xffffffff)
+ if err != nil {
+ initIOErr = err
+ return
+ }
+ ioCompletionPort = h
+ go pollIOCP(ctx, h)
+ })
+
+ if initIOErr != nil {
+ return nil, initIOErr
+ }
+
+ mq := queue.NewMessageQueue()
+ jobMap.Store(uintptr(job.handle), mq)
+ if err := attachIOCP(job.handle, ioCompletionPort); err != nil {
+ jobMap.Delete(uintptr(job.handle))
+ return nil, fmt.Errorf("failed to attach job to IO completion port: %w", err)
+ }
+ return mq, nil
+}
+
+// PollNotification will poll for a job object notification. This call should only be called once
+// per job (ideally in a goroutine loop) and will block if there is not a notification ready.
+// This call will return immediately with error `ErrNotRegistered` if the job was not registered
+// to receive notifications during `Create`. Internally, messages will be queued and there
+// is no worry of messages being dropped.
+func (job *JobObject) PollNotification() (interface{}, error) {
+ if job.mq == nil {
+ return nil, ErrNotRegistered
+ }
+ return job.mq.ReadOrWait()
+}
+
+// UpdateProcThreadAttribute updates the passed in ProcThreadAttributeList to contain what is necessary to
+// launch a process in a job at creation time. This can be used to avoid having to call Assign() after a process
+// has already started running.
+func (job *JobObject) UpdateProcThreadAttribute(attrList *windows.ProcThreadAttributeListContainer) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+
+ if err := attrList.Update(
+ winapi.PROC_THREAD_ATTRIBUTE_JOB_LIST,
+ unsafe.Pointer(&job.handle),
+ unsafe.Sizeof(job.handle),
+ ); err != nil {
+ return fmt.Errorf("failed to update proc thread attributes for job object: %w", err)
+ }
+
+ return nil
+}
+
+// Close closes the job object handle.
+func (job *JobObject) Close() error {
+ job.handleLock.Lock()
+ defer job.handleLock.Unlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+
+ if err := windows.Close(job.handle); err != nil {
+ return err
+ }
+
+ if job.mq != nil {
+ job.mq.Close()
+ }
+ // Handles now invalid so if the map entry to receive notifications for this job still
+ // exists remove it so we can stop receiving notifications.
+ if _, ok := jobMap.Load(uintptr(job.handle)); ok {
+ jobMap.Delete(uintptr(job.handle))
+ }
+
+ job.handle = 0
+ return nil
+}
+
+// Assign assigns a process to the job object.
+func (job *JobObject) Assign(pid uint32) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+
+ if pid == 0 {
+ return errors.New("invalid pid: 0")
+ }
+ hProc, err := windows.OpenProcess(winapi.PROCESS_ALL_ACCESS, true, pid)
+ if err != nil {
+ return err
+ }
+ defer windows.Close(hProc)
+ return windows.AssignProcessToJobObject(job.handle, hProc)
+}
+
+// Terminate terminates the job, essentially calls TerminateProcess on every process in the
+// job.
+func (job *JobObject) Terminate(exitCode uint32) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+ return windows.TerminateJobObject(job.handle, exitCode)
+}
+
+// Pids returns all of the process IDs in the job object.
+func (job *JobObject) Pids() ([]uint32, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := winapi.JOBOBJECT_BASIC_PROCESS_ID_LIST{}
+ err := winapi.QueryInformationJobObject(
+ job.handle,
+ winapi.JobObjectBasicProcessIdList,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ )
+
+ // This is either the case where there is only one process or no processes in
+ // the job. Any other case will result in ERROR_MORE_DATA. Check if info.NumberOfProcessIdsInList
+ // is 1 and just return this, otherwise return an empty slice.
+ if err == nil {
+ if info.NumberOfProcessIdsInList == 1 {
+ return []uint32{uint32(info.ProcessIdList[0])}, nil
+ }
+ // Return empty slice instead of nil to play well with the caller of this.
+ // Do not return an error if no processes are running inside the job
+ return []uint32{}, nil
+ }
+
+ if err != winapi.ERROR_MORE_DATA {
+ return nil, fmt.Errorf("failed initial query for PIDs in job object: %w", err)
+ }
+
+ jobBasicProcessIDListSize := unsafe.Sizeof(info) + (unsafe.Sizeof(info.ProcessIdList[0]) * uintptr(info.NumberOfAssignedProcesses-1))
+ buf := make([]byte, jobBasicProcessIDListSize)
+ if err = winapi.QueryInformationJobObject(
+ job.handle,
+ winapi.JobObjectBasicProcessIdList,
+ uintptr(unsafe.Pointer(&buf[0])),
+ uint32(len(buf)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("failed to query for PIDs in job object: %w", err)
+ }
+
+ bufInfo := (*winapi.JOBOBJECT_BASIC_PROCESS_ID_LIST)(unsafe.Pointer(&buf[0]))
+ pids := make([]uint32, bufInfo.NumberOfProcessIdsInList)
+ for i, bufPid := range bufInfo.AllPids() {
+ pids[i] = uint32(bufPid)
+ }
+ return pids, nil
+}
+
+// QueryMemoryStats gets the memory stats for the job object.
+func (job *JobObject) QueryMemoryStats() (*winapi.JOBOBJECT_MEMORY_USAGE_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := winapi.JOBOBJECT_MEMORY_USAGE_INFORMATION{}
+ if err := winapi.QueryInformationJobObject(
+ job.handle,
+ winapi.JobObjectMemoryUsageInformation,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("failed to query for job object memory stats: %w", err)
+ }
+ return &info, nil
+}
+
+// QueryProcessorStats gets the processor stats for the job object.
+func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := winapi.JOBOBJECT_BASIC_ACCOUNTING_INFORMATION{}
+ if err := winapi.QueryInformationJobObject(
+ job.handle,
+ winapi.JobObjectBasicAccountingInformation,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("failed to query for job object process stats: %w", err)
+ }
+ return &info, nil
+}
+
+// QueryStorageStats gets the storage (I/O) stats for the job object.
+func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
+ ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
+ }
+ if err := winapi.QueryInformationJobObject(
+ job.handle,
+ winapi.JobObjectIoAttribution,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("failed to query for job object storage stats: %w", err)
+ }
+ return &info, nil
+}
+
+// QueryPrivateWorkingSet returns the private working set size for the job. This is calculated by adding up the
+// private working set for every process running in the job.
+func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
+ pids, err := job.Pids()
+ if err != nil {
+ return 0, err
+ }
+
+ openAndQueryWorkingSet := func(pid uint32) (uint64, error) {
+ h, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, pid)
+ if err != nil {
+ // Continue to the next if OpenProcess doesn't return a valid handle (fails). Handles a
+ // case where one of the pids in the job exited before we open.
+ return 0, nil
+ }
+ defer func() {
+ _ = windows.Close(h)
+ }()
+ // Check if the process is actually running in the job still. There's a small chance
+ // that the process could have exited and had its pid re-used between grabbing the pids
+ // in the job and opening the handle to it above.
+ var inJob int32
+ if err := winapi.IsProcessInJob(h, job.handle, &inJob); err != nil {
+ // This shouldn't fail unless we have incorrect access rights which we control
+ // here so probably best to error out if this failed.
+ return 0, err
+ }
+ // Don't report stats for this process as it's not running in the job. This shouldn't be
+ // an error condition though.
+ if inJob == 0 {
+ return 0, nil
+ }
+
+ var vmCounters winapi.VM_COUNTERS_EX2
+ status := winapi.NtQueryInformationProcess(
+ h,
+ winapi.ProcessVmCounters,
+ uintptr(unsafe.Pointer(&vmCounters)),
+ uint32(unsafe.Sizeof(vmCounters)),
+ nil,
+ )
+ if !winapi.NTSuccess(status) {
+ return 0, fmt.Errorf("failed to query information for process: %w", winapi.RtlNtStatusToDosError(status))
+ }
+ return uint64(vmCounters.PrivateWorkingSetSize), nil
+ }
+
+ var jobWorkingSetSize uint64
+ for _, pid := range pids {
+ workingSet, err := openAndQueryWorkingSet(pid)
+ if err != nil {
+ return 0, err
+ }
+ jobWorkingSetSize += workingSet
+ }
+
+ return jobWorkingSetSize, nil
+}
diff --git a/vendor/github.com/Microsoft/hcsshim/internal/jobobject/limits.go b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/limits.go
new file mode 100644
index 000000000..4be297788
--- /dev/null
+++ b/vendor/github.com/Microsoft/hcsshim/internal/jobobject/limits.go
@@ -0,0 +1,315 @@
+package jobobject
+
+import (
+ "errors"
+ "fmt"
+ "unsafe"
+
+ "github.com/Microsoft/hcsshim/internal/winapi"
+ "golang.org/x/sys/windows"
+)
+
+const (
+ memoryLimitMax uint64 = 0xffffffffffffffff
+)
+
+func isFlagSet(flag, controlFlags uint32) bool {
+ return (flag & controlFlags) == flag
+}
+
+// SetResourceLimits sets resource limits on the job object (cpu, memory, storage).
+func (job *JobObject) SetResourceLimits(limits *JobLimits) error {
+ // Go through and check what limits were specified and apply them to the job.
+ if limits.MemoryLimitInBytes != 0 {
+ if err := job.SetMemoryLimit(limits.MemoryLimitInBytes); err != nil {
+ return fmt.Errorf("failed to set job object memory limit: %w", err)
+ }
+ }
+
+ if limits.CPULimit != 0 {
+ if err := job.SetCPULimit(RateBased, limits.CPULimit); err != nil {
+ return fmt.Errorf("failed to set job object cpu limit: %w", err)
+ }
+ } else if limits.CPUWeight != 0 {
+ if err := job.SetCPULimit(WeightBased, limits.CPUWeight); err != nil {
+ return fmt.Errorf("failed to set job object cpu limit: %w", err)
+ }
+ }
+
+ if limits.MaxBandwidth != 0 || limits.MaxIOPS != 0 {
+ if err := job.SetIOLimit(limits.MaxBandwidth, limits.MaxIOPS); err != nil {
+ return fmt.Errorf("failed to set io limit on job object: %w", err)
+ }
+ }
+ return nil
+}
+
+// SetTerminateOnLastHandleClose sets the job object flag that specifies that the job should terminate
+// all processes in the job on the last open handle being closed.
+func (job *JobObject) SetTerminateOnLastHandleClose() error {
+ info, err := job.getExtendedInformation()
+ if err != nil {
+ return err
+ }
+ info.BasicLimitInformation.LimitFlags |= windows.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
+ return job.setExtendedInformation(info)
+}
+
+// SetMemoryLimit sets the memory limit of the job object based on the given `memoryLimitInBytes`.
+func (job *JobObject) SetMemoryLimit(memoryLimitInBytes uint64) error {
+ if memoryLimitInBytes >= memoryLimitMax {
+ return errors.New("memory limit specified exceeds the max size")
+ }
+
+ info, err := job.getExtendedInformation()
+ if err != nil {
+ return err
+ }
+
+ info.JobMemoryLimit = uintptr(memoryLimitInBytes)
+ info.BasicLimitInformation.LimitFlags |= windows.JOB_OBJECT_LIMIT_JOB_MEMORY
+ return job.setExtendedInformation(info)
+}
+
+// GetMemoryLimit gets the memory limit in bytes of the job object.
+func (job *JobObject) GetMemoryLimit() (uint64, error) {
+ info, err := job.getExtendedInformation()
+ if err != nil {
+ return 0, err
+ }
+ return uint64(info.JobMemoryLimit), nil
+}
+
+// SetCPULimit sets the CPU limit depending on the specified `CPURateControlType` to
+// `rateControlValue` for the job object.
+func (job *JobObject) SetCPULimit(rateControlType CPURateControlType, rateControlValue uint32) error {
+ cpuInfo, err := job.getCPURateControlInformation()
+ if err != nil {
+ return err
+ }
+ switch rateControlType {
+ case WeightBased:
+ if rateControlValue < cpuWeightMin || rateControlValue > cpuWeightMax {
+ return fmt.Errorf("processor weight value of `%d` is invalid", rateControlValue)
+ }
+ cpuInfo.ControlFlags |= winapi.JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | winapi.JOB_OBJECT_CPU_RATE_CONTROL_WEIGHT_BASED
+ cpuInfo.Value = rateControlValue
+ case RateBased:
+ if rateControlValue < cpuLimitMin || rateControlValue > cpuLimitMax {
+ return fmt.Errorf("processor rate of `%d` is invalid", rateControlValue)
+ }
+ cpuInfo.ControlFlags |= winapi.JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | winapi.JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP
+ cpuInfo.Value = rateControlValue
+ default:
+ return errors.New("invalid job object cpu rate control type")
+ }
+ return job.setCPURateControlInfo(cpuInfo)
+}
+
+// GetCPULimit gets the cpu limits for the job object.
+// `rateControlType` is used to indicate what type of cpu limit to query for.
+func (job *JobObject) GetCPULimit(rateControlType CPURateControlType) (uint32, error) {
+ info, err := job.getCPURateControlInformation()
+ if err != nil {
+ return 0, err
+ }
+
+ if !isFlagSet(winapi.JOB_OBJECT_CPU_RATE_CONTROL_ENABLE, info.ControlFlags) {
+ return 0, errors.New("the job does not have cpu rate control enabled")
+ }
+
+ switch rateControlType {
+ case WeightBased:
+ if !isFlagSet(winapi.JOB_OBJECT_CPU_RATE_CONTROL_WEIGHT_BASED, info.ControlFlags) {
+ return 0, errors.New("cannot get cpu weight for job object without cpu weight option set")
+ }
+ case RateBased:
+ if !isFlagSet(winapi.JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP, info.ControlFlags) {
+ return 0, errors.New("cannot get cpu rate hard cap for job object without cpu rate hard cap option set")
+ }
+ default:
+ return 0, errors.New("invalid job object cpu rate control type")
+ }
+ return info.Value, nil
+}
+
+// SetCPUAffinity sets the processor affinity for the job object.
+// The affinity is passed in as a bitmask.
+func (job *JobObject) SetCPUAffinity(affinityBitMask uint64) error {
+ info, err := job.getExtendedInformation()
+ if err != nil {
+ return err
+ }
+ info.BasicLimitInformation.LimitFlags |= uint32(windows.JOB_OBJECT_LIMIT_AFFINITY)
+ info.BasicLimitInformation.Affinity = uintptr(affinityBitMask)
+ return job.setExtendedInformation(info)
+}
+
+// GetCPUAffinity gets the processor affinity for the job object.
+// The returned affinity is a bitmask.
+func (job *JobObject) GetCPUAffinity() (uint64, error) {
+ info, err := job.getExtendedInformation()
+ if err != nil {
+ return 0, err
+ }
+ return uint64(info.BasicLimitInformation.Affinity), nil
+}
+
+// SetIOLimit sets the IO limits specified on the job object.
+func (job *JobObject) SetIOLimit(maxBandwidth, maxIOPS int64) error {
+ ioInfo, err := job.getIOLimit()
+ if err != nil {
+ return err
+ }
+ ioInfo.ControlFlags |= winapi.JOB_OBJECT_IO_RATE_CONTROL_ENABLE
+ if maxBandwidth != 0 {
+ ioInfo.MaxBandwidth = maxBandwidth
+ }
+ if maxIOPS != 0 {
+ ioInfo.MaxIops = maxIOPS
+ }
+ return job.setIORateControlInfo(ioInfo)
+}
+
+// GetIOMaxBandwidthLimit gets the max bandwidth for the job object.
+func (job *JobObject) GetIOMaxBandwidthLimit() (int64, error) {
+ info, err := job.getIOLimit()
+ if err != nil {
+ return 0, err
+ }
+ return info.MaxBandwidth, nil
+}
+
+// GetIOMaxIopsLimit gets the max iops for the job object.
+func (job *JobObject) GetIOMaxIopsLimit() (int64, error) {
+ info, err := job.getIOLimit()
+ if err != nil {
+ return 0, err
+ }
+ return info.MaxIops, nil
+}
+
+// Helper function for getting a job object's extended information.
+func (job *JobObject) getExtendedInformation() (*windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION{}
+ if err := winapi.QueryInformationJobObject(
+ job.handle,
+ windows.JobObjectExtendedLimitInformation,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("query %v returned error: %w", info, err)
+ }
+ return &info, nil
+}
+
+// Helper function for getting a job object's CPU rate control information.
+func (job *JobObject) getCPURateControlInformation() (*winapi.JOBOBJECT_CPU_RATE_CONTROL_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ info := winapi.JOBOBJECT_CPU_RATE_CONTROL_INFORMATION{}
+ if err := winapi.QueryInformationJobObject(
+ job.handle,
+ windows.JobObjectCpuRateControlInformation,
+ uintptr(unsafe.Pointer(&info)),
+ uint32(unsafe.Sizeof(info)),
+ nil,
+ ); err != nil {
+ return nil, fmt.Errorf("query %v returned error: %w", info, err)
+ }
+ return &info, nil
+}
+
+// Helper function for setting a job object's extended information.
+func (job *JobObject) setExtendedInformation(info *windows.JOBOBJECT_EXTENDED_LIMIT_INFORMATION) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+
+ if _, err := windows.SetInformationJobObject(
+ job.handle,
+ windows.JobObjectExtendedLimitInformation,
+ uintptr(unsafe.Pointer(info)),
+ uint32(unsafe.Sizeof(*info)),
+ ); err != nil {
+ return fmt.Errorf("failed to set Extended info %v on job object: %w", info, err)
+ }
+ return nil
+}
+
+// Helper function for querying job handle for IO limit information.
+func (job *JobObject) getIOLimit() (*winapi.JOBOBJECT_IO_RATE_CONTROL_INFORMATION, error) {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return nil, ErrAlreadyClosed
+ }
+
+ ioInfo := &winapi.JOBOBJECT_IO_RATE_CONTROL_INFORMATION{}
+ var blockCount uint32 = 1
+
+ if _, err := winapi.QueryIoRateControlInformationJobObject(
+ job.handle,
+ nil,
+ &ioInfo,
+ &blockCount,
+ ); err != nil {
+ return nil, fmt.Errorf("query %v returned error: %w", ioInfo, err)
+ }
+
+ if !isFlagSet(winapi.JOB_OBJECT_IO_RATE_CONTROL_ENABLE, ioInfo.ControlFlags) {
+ return nil, fmt.Errorf("query %v cannot get IO limits for job object without IO rate control option set", ioInfo)
+ }
+ return ioInfo, nil
+}
+
+// Helper function for setting a job object's IO rate control information.
+func (job *JobObject) setIORateControlInfo(ioInfo *winapi.JOBOBJECT_IO_RATE_CONTROL_INFORMATION) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+
+ if _, err := winapi.SetIoRateControlInformationJobObject(job.handle, ioInfo); err != nil {
+ return fmt.Errorf("failed to set IO limit info %v on job object: %w", ioInfo, err)
+ }
+ return nil
+}
+
+// Helper function for setting a job object's CPU rate control information.
+func (job *JobObject) setCPURateControlInfo(cpuInfo *winapi.JOBOBJECT_CPU_RATE_CONTROL_INFORMATION) error {
+ job.handleLock.RLock()
+ defer job.handleLock.RUnlock()
+
+ if job.handle == 0 {
+ return ErrAlreadyClosed
+ }
+ if _, err := windows.SetInformationJobObject(
+ job.handle,
+ windows.JobObjectCpuRateControlInformation,
+ uintptr(unsafe.Pointer(cpuInfo)),
+ uint32(unsafe.Sizeof(cpuInfo)),
+ ); err != nil {
+ return fmt.Errorf("failed to set cpu limit info %v on job object: %w", cpuInfo, err)
+ }
+ return nil
+}