1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
package jobobject
import (
"context"
"fmt"
"sync"
"unsafe"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/queue"
"github.com/Microsoft/hcsshim/internal/winapi"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)
var (
ioInitOnce sync.Once
initIOErr error
// Global iocp handle that will be re-used for every job object
ioCompletionPort windows.Handle
// Mapping of job handle to queue to place notifications in.
jobMap sync.Map
)
// MsgAllProcessesExited is a type representing a message that every process in a job has exited.
type MsgAllProcessesExited struct{}
// MsgUnimplemented represents a message that we are aware of, but that isn't implemented currently.
// This should not be treated as an error.
type MsgUnimplemented struct{}
// pollIOCP polls the io completion port forever.
func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
var (
overlapped uintptr
code uint32
key uintptr
)
for {
err := windows.GetQueuedCompletionStatus(iocpHandle, &code, &key, (**windows.Overlapped)(unsafe.Pointer(&overlapped)), windows.INFINITE)
if err != nil {
log.G(ctx).WithError(err).Error("failed to poll for job object message")
continue
}
if val, ok := jobMap.Load(key); ok {
msq, ok := val.(*queue.MessageQueue)
if !ok {
log.G(ctx).WithField("value", msq).Warn("encountered non queue type in job map")
continue
}
notification, err := parseMessage(code, overlapped)
if err != nil {
log.G(ctx).WithFields(logrus.Fields{
"code": code,
"overlapped": overlapped,
}).Warn("failed to parse job object message")
continue
}
if err := msq.Write(notification); err == queue.ErrQueueClosed {
// Write will only return an error when the queue is closed.
// The only time a queue would ever be closed is when we call `Close` on
// the job it belongs to which also removes it from the jobMap, so something
// went wrong here. We can't return as this is reading messages for all jobs
// so just log it and move on.
log.G(ctx).WithFields(logrus.Fields{
"code": code,
"overlapped": overlapped,
}).Warn("tried to write to a closed queue")
continue
}
} else {
log.G(ctx).Warn("received a message for a job not present in the mapping")
}
}
}
func parseMessage(code uint32, overlapped uintptr) (interface{}, error) {
// Check code and parse out relevant information related to that notification
// that we care about. For now all we handle is the message that all processes
// in the job have exited.
switch code {
case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
return MsgAllProcessesExited{}, nil
// Other messages for completeness and a check to make sure that if we fall
// into the default case that this is a code we don't know how to handle.
case winapi.JOB_OBJECT_MSG_END_OF_JOB_TIME:
case winapi.JOB_OBJECT_MSG_END_OF_PROCESS_TIME:
case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT:
case winapi.JOB_OBJECT_MSG_NEW_PROCESS:
case winapi.JOB_OBJECT_MSG_EXIT_PROCESS:
case winapi.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
case winapi.JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT:
case winapi.JOB_OBJECT_MSG_JOB_MEMORY_LIMIT:
case winapi.JOB_OBJECT_MSG_NOTIFICATION_LIMIT:
default:
return nil, fmt.Errorf("unknown job notification type: %d", code)
}
return MsgUnimplemented{}, nil
}
// Assigns an IO completion port to get notified of events for the registered job
// object.
func attachIOCP(job windows.Handle, iocp windows.Handle) error {
info := winapi.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{
CompletionKey: job,
CompletionPort: iocp,
}
_, err := windows.SetInformationJobObject(job, windows.JobObjectAssociateCompletionPortInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)))
return err
}
|