diff options
Diffstat (limited to 'oci')
-rw-r--r-- | oci/container.go | 260 | ||||
-rw-r--r-- | oci/finished.go | 14 | ||||
-rw-r--r-- | oci/finished_32.go | 14 | ||||
-rw-r--r-- | oci/history.go | 31 | ||||
-rw-r--r-- | oci/memory_store.go | 92 | ||||
-rw-r--r-- | oci/oci.go | 748 | ||||
-rw-r--r-- | oci/store.go | 27 |
7 files changed, 1186 insertions, 0 deletions
diff --git a/oci/container.go b/oci/container.go new file mode 100644 index 000000000..c0eff2fd1 --- /dev/null +++ b/oci/container.go @@ -0,0 +1,260 @@
package oci

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/containernetworking/plugins/pkg/ns"
	"github.com/docker/docker/pkg/signal"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"k8s.io/apimachinery/pkg/fields"
	pb "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)

const (
	// defaultStopSignal is the signal name GetStopSignal falls back to when
	// the container has no stop signal configured or the configured one is
	// not a known signal name.
	defaultStopSignal = "TERM"
)

// Container represents a runtime container.
//
// All fields are unexported; they are populated by NewContainer and the
// Set*/Add* methods and read back through the accessor methods.
type Container struct {
	id              string
	name            string
	logPath         string
	labels          fields.Set
	annotations     fields.Set
	crioAnnotations fields.Set
	image           string
	sandbox         string
	// netns may be nil; NetNsPath then falls back to /proc/<pid>/ns/net.
	netns      ns.NetNS
	terminal   bool
	stdin      bool
	stdinOnce  bool
	privileged bool
	trusted    bool
	// state is the mutable runtime state; Runtime methods take opLock
	// before modifying it.
	state    *ContainerState
	metadata *pb.ContainerMetadata
	// opLock is held by the Runtime operations (start, stop, delete,
	// status update, ...) and by State().
	opLock sync.Locker
	// this is the /var/run/storage/... directory, erased on reboot
	bundlePath string
	// this is the /var/lib/storage/... directory
	dir        string
	stopSignal string
	imageName  string
	imageRef   string
	volumes    []ContainerVolume
	mountPoint string
	// spec is nil until SetSpec is called.
	spec *specs.Spec
}

// ContainerVolume is a bind mount for the container.
type ContainerVolume struct {
	ContainerPath string `json:"container_path"`
	HostPath      string `json:"host_path"`
	Readonly      bool   `json:"readonly"`
}

// ContainerState represents the status of a container.
//
// It embeds the OCI runtime state and adds timing and exit information;
// FromDisk decodes it from the container's state.json file.
type ContainerState struct {
	specs.State
	Created   time.Time `json:"created"`
	Started   time.Time `json:"started,omitempty"`
	Finished  time.Time `json:"finished,omitempty"`
	ExitCode  int32     `json:"exitCode,omitempty"`
	OOMKilled bool      `json:"oomKilled,omitempty"`
	Error     string    `json:"error,omitempty"`
}

// NewContainer creates a container.
+func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, crioAnnotations map[string]string, annotations map[string]string, image string, imageName string, imageRef string, metadata *pb.ContainerMetadata, sandbox string, terminal bool, stdin bool, stdinOnce bool, privileged bool, trusted bool, dir string, created time.Time, stopSignal string) (*Container, error) { + state := &ContainerState{} + state.Created = created + c := &Container{ + id: id, + name: name, + bundlePath: bundlePath, + logPath: logPath, + labels: labels, + sandbox: sandbox, + netns: netns, + terminal: terminal, + stdin: stdin, + stdinOnce: stdinOnce, + privileged: privileged, + trusted: trusted, + metadata: metadata, + annotations: annotations, + crioAnnotations: crioAnnotations, + image: image, + imageName: imageName, + imageRef: imageRef, + dir: dir, + state: state, + stopSignal: stopSignal, + opLock: new(sync.Mutex), + } + return c, nil +} + +// SetSpec loads the OCI spec in the container struct +func (c *Container) SetSpec(s *specs.Spec) { + c.spec = s +} + +// Spec returns a copy of the spec for the container +func (c *Container) Spec() specs.Spec { + return *c.spec +} + +// GetStopSignal returns the container's own stop signal configured from the +// image configuration or the default one. 
+func (c *Container) GetStopSignal() string { + if c.stopSignal == "" { + return defaultStopSignal + } + cleanSignal := strings.TrimPrefix(strings.ToUpper(c.stopSignal), "SIG") + _, ok := signal.SignalMap[cleanSignal] + if !ok { + return defaultStopSignal + } + return cleanSignal +} + +// FromDisk restores container's state from disk +func (c *Container) FromDisk() error { + jsonSource, err := os.Open(c.StatePath()) + if err != nil { + return err + } + defer jsonSource.Close() + + dec := json.NewDecoder(jsonSource) + return dec.Decode(c.state) +} + +// StatePath returns the containers state.json path +func (c *Container) StatePath() string { + return filepath.Join(c.dir, "state.json") +} + +// CreatedAt returns the container creation time +func (c *Container) CreatedAt() time.Time { + return c.state.Created +} + +// Name returns the name of the container. +func (c *Container) Name() string { + return c.name +} + +// ID returns the id of the container. +func (c *Container) ID() string { + return c.id +} + +// BundlePath returns the bundlePath of the container. +func (c *Container) BundlePath() string { + return c.bundlePath +} + +// LogPath returns the log path of the container. +func (c *Container) LogPath() string { + return c.logPath +} + +// Labels returns the labels of the container. +func (c *Container) Labels() map[string]string { + return c.labels +} + +// Annotations returns the annotations of the container. +func (c *Container) Annotations() map[string]string { + return c.annotations +} + +// CrioAnnotations returns the crio annotations of the container. +func (c *Container) CrioAnnotations() map[string]string { + return c.crioAnnotations +} + +// Image returns the image of the container. +func (c *Container) Image() string { + return c.image +} + +// ImageName returns the image name of the container. +func (c *Container) ImageName() string { + return c.imageName +} + +// ImageRef returns the image ref of the container. 
+func (c *Container) ImageRef() string { + return c.imageRef +} + +// Sandbox returns the sandbox name of the container. +func (c *Container) Sandbox() string { + return c.sandbox +} + +// Dir returns the the dir of the container +func (c *Container) Dir() string { + return c.dir +} + +// NetNsPath returns the path to the network namespace of the container. +func (c *Container) NetNsPath() (string, error) { + if c.state == nil { + return "", fmt.Errorf("container state is not populated") + } + + if c.netns == nil { + return fmt.Sprintf("/proc/%d/ns/net", c.state.Pid), nil + } + + return c.netns.Path(), nil +} + +// Metadata returns the metadata of the container. +func (c *Container) Metadata() *pb.ContainerMetadata { + return c.metadata +} + +// State returns the state of the running container +func (c *Container) State() *ContainerState { + c.opLock.Lock() + defer c.opLock.Unlock() + return c.state +} + +// AddVolume adds a volume to list of container volumes. +func (c *Container) AddVolume(v ContainerVolume) { + c.volumes = append(c.volumes, v) +} + +// Volumes returns the list of container volumes. +func (c *Container) Volumes() []ContainerVolume { + return c.volumes + +} + +// SetMountPoint sets the container mount point +func (c *Container) SetMountPoint(mp string) { + c.mountPoint = mp +} + +// MountPoint returns the container mount point +func (c *Container) MountPoint() string { + return c.mountPoint +} + +// SetState sets the conainer state +// +// XXX: DO NOT EVER USE THIS, THIS IS JUST USEFUL FOR MOCKING!!! 
+func (c *Container) SetState(state *ContainerState) { + c.state = state +} diff --git a/oci/finished.go b/oci/finished.go new file mode 100644 index 000000000..9fedbfb7a --- /dev/null +++ b/oci/finished.go @@ -0,0 +1,14 @@ +// +build !arm,!386 + +package oci + +import ( + "os" + "syscall" + "time" +) + +func getFinishedTime(fi os.FileInfo) time.Time { + st := fi.Sys().(*syscall.Stat_t) + return time.Unix(st.Ctim.Sec, st.Ctim.Nsec) +} diff --git a/oci/finished_32.go b/oci/finished_32.go new file mode 100644 index 000000000..3f24b1baa --- /dev/null +++ b/oci/finished_32.go @@ -0,0 +1,14 @@ +// +build arm 386 + +package oci + +import ( + "os" + "syscall" + "time" +) + +func getFinishedTime(fi os.FileInfo) time.Time { + st := fi.Sys().(*syscall.Stat_t) + return time.Unix(int64(st.Ctim.Sec), int64(st.Ctim.Nsec)) +} diff --git a/oci/history.go b/oci/history.go new file mode 100644 index 000000000..2ced41d61 --- /dev/null +++ b/oci/history.go @@ -0,0 +1,31 @@ +package oci + +import "sort" + +// History is a convenience type for storing a list of containers, +// sorted by creation date in descendant order. +type History []*Container + +// Len returns the number of containers in the history. +func (history *History) Len() int { + return len(*history) +} + +// Less compares two containers and returns true if the second one +// was created before the first one. +func (history *History) Less(i, j int) bool { + containers := *history + // FIXME: state access should be serialized + return containers[j].state.Created.Before(containers[i].state.Created) +} + +// Swap switches containers i and j positions in the history. +func (history *History) Swap(i, j int) { + containers := *history + containers[i], containers[j] = containers[j], containers[i] +} + +// sort orders the history by creation date in descendant order. 
+func (history *History) sort() { + sort.Sort(history) +} diff --git a/oci/memory_store.go b/oci/memory_store.go new file mode 100644 index 000000000..6223ce7f0 --- /dev/null +++ b/oci/memory_store.go @@ -0,0 +1,92 @@ +package oci + +import "sync" + +// memoryStore implements a Store in memory. +type memoryStore struct { + s map[string]*Container + sync.RWMutex +} + +// NewMemoryStore initializes a new memory store. +func NewMemoryStore() ContainerStorer { + return &memoryStore{ + s: make(map[string]*Container), + } +} + +// Add appends a new container to the memory store. +// It overrides the id if it existed before. +func (c *memoryStore) Add(id string, cont *Container) { + c.Lock() + c.s[id] = cont + c.Unlock() +} + +// Get returns a container from the store by id. +func (c *memoryStore) Get(id string) *Container { + c.RLock() + res := c.s[id] + c.RUnlock() + return res +} + +// Delete removes a container from the store by id. +func (c *memoryStore) Delete(id string) { + c.Lock() + delete(c.s, id) + c.Unlock() +} + +// List returns a sorted list of containers from the store. +// The containers are ordered by creation date. +func (c *memoryStore) List() []*Container { + containers := History(c.all()) + containers.sort() + return containers +} + +// Size returns the number of containers in the store. +func (c *memoryStore) Size() int { + c.RLock() + defer c.RUnlock() + return len(c.s) +} + +// First returns the first container found in the store by a given filter. +func (c *memoryStore) First(filter StoreFilter) *Container { + for _, cont := range c.all() { + if filter(cont) { + return cont + } + } + return nil +} + +// ApplyAll calls the reducer function with every container in the store. +// This operation is asynchronous in the memory store. +// NOTE: Modifications to the store MUST NOT be done by the StoreReducer. 
+func (c *memoryStore) ApplyAll(apply StoreReducer) { + wg := new(sync.WaitGroup) + for _, cont := range c.all() { + wg.Add(1) + go func(container *Container) { + apply(container) + wg.Done() + }(cont) + } + + wg.Wait() +} + +func (c *memoryStore) all() []*Container { + c.RLock() + containers := make([]*Container, 0, len(c.s)) + for _, cont := range c.s { + containers = append(containers, cont) + } + c.RUnlock() + return containers +} + +var _ ContainerStorer = &memoryStore{} diff --git a/oci/oci.go b/oci/oci.go new file mode 100644 index 000000000..756be44bf --- /dev/null +++ b/oci/oci.go @@ -0,0 +1,748 @@ +package oci + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/containerd/cgroups" + "github.com/kubernetes-incubator/cri-o/utils" + rspec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/sirupsen/logrus" + "golang.org/x/net/context" + "golang.org/x/sys/unix" + kwait "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + // ContainerStateCreated represents the created state of a container + ContainerStateCreated = "created" + // ContainerStatePaused represents the paused state of a container + ContainerStatePaused = "paused" + // ContainerStateRunning represents the running state of a container + ContainerStateRunning = "running" + // ContainerStateStopped represents the stopped state of a container + ContainerStateStopped = "stopped" + // ContainerCreateTimeout represents the value of container creating timeout + ContainerCreateTimeout = 240 * time.Second + + // CgroupfsCgroupsManager represents cgroupfs native cgroup manager + CgroupfsCgroupsManager = "cgroupfs" + // SystemdCgroupsManager represents systemd native cgroup manager + SystemdCgroupsManager = "systemd" + // ContainerExitsDir is the location of container exit dirs + ContainerExitsDir = "/var/run/crio/exits" + // ContainerAttachSocketDir is the location for container attach 
sockets + ContainerAttachSocketDir = "/var/run/crio" + + // killContainerTimeout is the timeout that we wait for the container to + // be SIGKILLed. + killContainerTimeout = 2 * time.Minute +) + +// New creates a new Runtime with options provided +func New(runtimeTrustedPath string, + runtimeUntrustedPath string, + trustLevel string, + conmonPath string, + conmonEnv []string, + cgroupManager string, + containerExitsDir string, + logSizeMax int64, + noPivot bool) (*Runtime, error) { + r := &Runtime{ + name: filepath.Base(runtimeTrustedPath), + trustedPath: runtimeTrustedPath, + untrustedPath: runtimeUntrustedPath, + trustLevel: trustLevel, + conmonPath: conmonPath, + conmonEnv: conmonEnv, + cgroupManager: cgroupManager, + containerExitsDir: containerExitsDir, + logSizeMax: logSizeMax, + noPivot: noPivot, + } + return r, nil +} + +// Runtime stores the information about a oci runtime +type Runtime struct { + name string + trustedPath string + untrustedPath string + trustLevel string + conmonPath string + conmonEnv []string + cgroupManager string + containerExitsDir string + logSizeMax int64 + noPivot bool +} + +// syncInfo is used to return data from monitor process to daemon +type syncInfo struct { + Pid int `json:"pid"` + Message string `json:"message,omitempty"` +} + +// exitCodeInfo is used to return the monitored process exit code to the daemon +type exitCodeInfo struct { + ExitCode int32 `json:"exit_code"` + Message string `json:"message,omitempty"` +} + +// Name returns the name of the OCI Runtime +func (r *Runtime) Name() string { + return r.name +} + +// Path returns the full path the OCI Runtime executable. +// Depending if the container is privileged and/or trusted, +// this will return either the trusted or untrusted runtime path. +func (r *Runtime) Path(c *Container) string { + if !c.trusted { + // We have an explicitly untrusted container. 
+ if c.privileged { + logrus.Warnf("Running an untrusted but privileged container") + return r.trustedPath + } + + if r.untrustedPath != "" { + return r.untrustedPath + } + + return r.trustedPath + } + + // Our container is trusted. Let's look at the configured trust level. + if r.trustLevel == "trusted" { + return r.trustedPath + } + + // Our container is trusted, but we are running untrusted. + // We will use the untrusted container runtime if it's set + // and if it's not a privileged container. + if c.privileged || r.untrustedPath == "" { + return r.trustedPath + } + + return r.untrustedPath +} + +// Version returns the version of the OCI Runtime +func (r *Runtime) Version() (string, error) { + runtimeVersion, err := getOCIVersion(r.trustedPath, "-v") + if err != nil { + return "", err + } + return runtimeVersion, nil +} + +func getOCIVersion(name string, args ...string) (string, error) { + out, err := utils.ExecCmd(name, args...) + if err != nil { + return "", err + } + + firstLine := out[:strings.Index(out, "\n")] + v := firstLine[strings.LastIndex(firstLine, " ")+1:] + return v, nil +} + +// CreateContainer creates a container. 
+func (r *Runtime) CreateContainer(c *Container, cgroupParent string) (err error) { + var stderrBuf bytes.Buffer + parentPipe, childPipe, err := newPipe() + childStartPipe, parentStartPipe, err := newPipe() + if err != nil { + return fmt.Errorf("error creating socket pair: %v", err) + } + defer parentPipe.Close() + defer parentStartPipe.Close() + + var args []string + if r.cgroupManager == SystemdCgroupsManager { + args = append(args, "-s") + } + args = append(args, "-c", c.id) + args = append(args, "-u", c.id) + args = append(args, "-r", r.Path(c)) + args = append(args, "-b", c.bundlePath) + args = append(args, "-p", filepath.Join(c.bundlePath, "pidfile")) + args = append(args, "-l", c.logPath) + args = append(args, "--exit-dir", r.containerExitsDir) + args = append(args, "--socket-dir-path", ContainerAttachSocketDir) + if r.logSizeMax >= 0 { + args = append(args, "--log-size-max", fmt.Sprintf("%v", r.logSizeMax)) + } + if r.noPivot { + args = append(args, "--no-pivot") + } + if c.terminal { + args = append(args, "-t") + } else if c.stdin { + args = append(args, "-i") + } + logrus.WithFields(logrus.Fields{ + "args": args, + }).Debugf("running conmon: %s", r.conmonPath) + + cmd := exec.Command(r.conmonPath, args...) 
+ cmd.Dir = c.bundlePath + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + cmd.Stdin = os.Stdin + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if c.terminal { + cmd.Stderr = &stderrBuf + } + cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe, childStartPipe) + // 0, 1 and 2 are stdin, stdout and stderr + cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3)) + cmd.Env = append(cmd.Env, fmt.Sprintf("_OCI_STARTPIPE=%d", 4)) + + err = cmd.Start() + if err != nil { + childPipe.Close() + return err + } + + // We don't need childPipe on the parent side + childPipe.Close() + childStartPipe.Close() + + // Move conmon to specified cgroup + if r.cgroupManager == SystemdCgroupsManager { + logrus.Infof("Running conmon under slice %s and unitName %s", cgroupParent, createUnitName("crio-conmon", c.id)) + if err = utils.RunUnderSystemdScope(cmd.Process.Pid, cgroupParent, createUnitName("crio-conmon", c.id)); err != nil { + logrus.Warnf("Failed to add conmon to systemd sandbox cgroup: %v", err) + } + } else { + control, err := cgroups.New(cgroups.V1, cgroups.StaticPath(filepath.Join(cgroupParent, "/crio-conmon-"+c.id)), &rspec.LinuxResources{}) + if err != nil { + logrus.Warnf("Failed to add conmon to cgroupfs sandbox cgroup: %v", err) + } else { + // Here we should defer a crio-connmon- cgroup hierarchy deletion, but it will + // always fail as conmon's pid is still there. + // Fortunately, kubelet takes care of deleting this for us, so the leak will + // only happens in corner case where one does a manual deletion of the container + // through e.g. runc. This should be handled by implementing a conmon monitoring + // routine that does the cgroup cleanup once conmon is terminated. 
+ if err := control.Add(cgroups.Process{Pid: cmd.Process.Pid}); err != nil { + logrus.Warnf("Failed to add conmon to cgroupfs sandbox cgroup: %v", err) + } + } + } + + /* We set the cgroup, now the child can start creating children */ + someData := []byte{0} + _, err = parentStartPipe.Write(someData) + if err != nil { + return err + } + + /* Wait for initial setup and fork, and reap child */ + err = cmd.Wait() + if err != nil { + return err + } + + // We will delete all container resources if creation fails + defer func() { + if err != nil { + r.DeleteContainer(c) + } + }() + + // Wait to get container pid from conmon + type syncStruct struct { + si *syncInfo + err error + } + ch := make(chan syncStruct) + go func() { + var si *syncInfo + if err = json.NewDecoder(parentPipe).Decode(&si); err != nil { + ch <- syncStruct{err: err} + return + } + ch <- syncStruct{si: si} + }() + + select { + case ss := <-ch: + if ss.err != nil { + return fmt.Errorf("error reading container (probably exited) json message: %v", ss.err) + } + logrus.Debugf("Received container pid: %d", ss.si.Pid) + if ss.si.Pid == -1 { + if ss.si.Message != "" { + logrus.Errorf("Container creation error: %s", ss.si.Message) + return fmt.Errorf("container create failed: %s", ss.si.Message) + } + logrus.Errorf("Container creation failed") + return fmt.Errorf("container create failed") + } + case <-time.After(ContainerCreateTimeout): + logrus.Errorf("Container creation timeout (%v)", ContainerCreateTimeout) + return fmt.Errorf("create container timeout") + } + return nil +} + +func createUnitName(prefix string, name string) string { + return fmt.Sprintf("%s-%s.scope", prefix, name) +} + +// StartContainer starts a container. 
+func (r *Runtime) StartContainer(c *Container) error { + c.opLock.Lock() + defer c.opLock.Unlock() + if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "start", c.id); err != nil { + return err + } + c.state.Started = time.Now() + return nil +} + +// ExecSyncResponse is returned from ExecSync. +type ExecSyncResponse struct { + Stdout []byte + Stderr []byte + ExitCode int32 +} + +// ExecSyncError wraps command's streams, exit code and error on ExecSync error. +type ExecSyncError struct { + Stdout bytes.Buffer + Stderr bytes.Buffer + ExitCode int32 + Err error +} + +func (e ExecSyncError) Error() string { + return fmt.Sprintf("command error: %+v, stdout: %s, stderr: %s, exit code %d", e.Err, e.Stdout.Bytes(), e.Stderr.Bytes(), e.ExitCode) +} + +func prepareExec() (pidFile, parentPipe, childPipe *os.File, err error) { + parentPipe, childPipe, err = os.Pipe() + if err != nil { + return nil, nil, nil, err + } + + pidFile, err = ioutil.TempFile("", "pidfile") + if err != nil { + parentPipe.Close() + childPipe.Close() + return nil, nil, nil, err + } + + return +} + +func parseLog(log []byte) (stdout, stderr []byte) { + // Split the log on newlines, which is what separates entries. + lines := bytes.SplitAfter(log, []byte{'\n'}) + for _, line := range lines { + // Ignore empty lines. + if len(line) == 0 { + continue + } + + // The format of log lines is "DATE pipe REST". + parts := bytes.SplitN(line, []byte{' '}, 3) + if len(parts) < 3 { + // Ignore the line if it's formatted incorrectly, but complain + // about it so it can be debugged. + logrus.Warnf("hit invalid log format: %q", string(line)) + continue + } + + pipe := string(parts[1]) + content := parts[2] + + switch pipe { + case "stdout": + stdout = append(stdout, content...) + case "stderr": + stderr = append(stderr, content...) + default: + // Complain about unknown pipes. 
+ logrus.Warnf("hit invalid log format [unknown pipe %s]: %q", pipe, string(line)) + continue + } + } + + return stdout, stderr +} + +// ExecSync execs a command in a container and returns it's stdout, stderr and return code. +func (r *Runtime) ExecSync(c *Container, command []string, timeout int64) (resp *ExecSyncResponse, err error) { + pidFile, parentPipe, childPipe, err := prepareExec() + if err != nil { + return nil, ExecSyncError{ + ExitCode: -1, + Err: err, + } + } + defer parentPipe.Close() + defer func() { + if e := os.Remove(pidFile.Name()); e != nil { + logrus.Warnf("could not remove temporary PID file %s", pidFile.Name()) + } + }() + + logFile, err := ioutil.TempFile("", "crio-log-"+c.id) + if err != nil { + return nil, ExecSyncError{ + ExitCode: -1, + Err: err, + } + } + logPath := logFile.Name() + defer func() { + logFile.Close() + os.RemoveAll(logPath) + }() + + f, err := ioutil.TempFile("", "exec-process") + if err != nil { + return nil, ExecSyncError{ + ExitCode: -1, + Err: err, + } + } + defer os.RemoveAll(f.Name()) + + var args []string + args = append(args, "-c", c.id) + args = append(args, "-r", r.Path(c)) + args = append(args, "-p", pidFile.Name()) + args = append(args, "-e") + if c.terminal { + args = append(args, "-t") + } + if timeout > 0 { + args = append(args, "-T") + args = append(args, fmt.Sprintf("%d", timeout)) + } + args = append(args, "-l", logPath) + args = append(args, "--socket-dir-path", ContainerAttachSocketDir) + + pspec := c.Spec().Process + pspec.Env = append(pspec.Env, r.conmonEnv...) + pspec.Args = command + processJSON, err := json.Marshal(pspec) + if err != nil { + return nil, ExecSyncError{ + ExitCode: -1, + Err: err, + } + } + + if err := ioutil.WriteFile(f.Name(), processJSON, 0644); err != nil { + return nil, ExecSyncError{ + ExitCode: -1, + Err: err, + } + } + + args = append(args, "--exec-process-spec", f.Name()) + + cmd := exec.Command(r.conmonPath, args...) 
+ + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Stdout = &stdoutBuf + cmd.Stderr = &stderrBuf + cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe) + // 0, 1 and 2 are stdin, stdout and stderr + cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3)) + + err = cmd.Start() + if err != nil { + childPipe.Close() + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: -1, + Err: err, + } + } + + // We don't need childPipe on the parent side + childPipe.Close() + + err = cmd.Wait() + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + if status, ok := exitErr.Sys().(unix.WaitStatus); ok { + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: int32(status.ExitStatus()), + Err: err, + } + } + } else { + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: -1, + Err: err, + } + } + } + + var ec *exitCodeInfo + if err := json.NewDecoder(parentPipe).Decode(&ec); err != nil { + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: -1, + Err: err, + } + } + + logrus.Infof("Received container exit code: %v, message: %s", ec.ExitCode, ec.Message) + + if ec.ExitCode == -1 { + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: -1, + Err: fmt.Errorf(ec.Message), + } + } + + // The actual logged output is not the same as stdoutBuf and stderrBuf, + // which are used for getting error information. For the actual + // ExecSyncResponse we have to read the logfile. + // XXX: Currently runC dups the same console over both stdout and stderr, + // so we can't differentiate between the two. + + logBytes, err := ioutil.ReadFile(logPath) + if err != nil { + return nil, ExecSyncError{ + Stdout: stdoutBuf, + Stderr: stderrBuf, + ExitCode: -1, + Err: err, + } + } + + // We have to parse the log output into {stdout, stderr} buffers. 
+ stdoutBytes, stderrBytes := parseLog(logBytes) + return &ExecSyncResponse{ + Stdout: stdoutBytes, + Stderr: stderrBytes, + ExitCode: ec.ExitCode, + }, nil +} + +func waitContainerStop(ctx context.Context, c *Container, timeout time.Duration) error { + done := make(chan struct{}) + // we could potentially re-use "done" channel to exit the loop on timeout + // but we use another channel "chControl" so that we won't never incur in the + // case the "done" channel is closed in the "default" select case and we also + // reach the timeout in the select below. If that happens we could raise + // a panic closing a closed channel so better be safe and use another new + // channel just to control the loop. + chControl := make(chan struct{}) + go func() { + for { + select { + case <-chControl: + return + default: + // Check if the process is still around + err := unix.Kill(c.state.Pid, 0) + if err == unix.ESRCH { + close(done) + return + } + time.Sleep(100 * time.Millisecond) + } + } + }() + select { + case <-done: + return nil + case <-ctx.Done(): + close(chControl) + return ctx.Err() + case <-time.After(timeout): + close(chControl) + err := unix.Kill(c.state.Pid, unix.SIGKILL) + if err != nil && err != unix.ESRCH { + return fmt.Errorf("failed to kill process: %v", err) + } + } + + c.state.Finished = time.Now() + return nil +} + +// StopContainer stops a container. Timeout is given in seconds. 
+func (r *Runtime) StopContainer(ctx context.Context, c *Container, timeout int64) error { + c.opLock.Lock() + defer c.opLock.Unlock() + + // Check if the process is around before sending a signal + err := unix.Kill(c.state.Pid, 0) + if err == unix.ESRCH { + c.state.Finished = time.Now() + return nil + } + + if timeout > 0 { + if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "kill", c.id, c.GetStopSignal()); err != nil { + return fmt.Errorf("failed to stop container %s, %v", c.id, err) + } + err = waitContainerStop(ctx, c, time.Duration(timeout)*time.Second) + if err == nil { + return nil + } + logrus.Warnf("Stop container %q timed out: %v", c.ID(), err) + } + + if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "kill", "--all", c.id, "KILL"); err != nil { + return fmt.Errorf("failed to stop container %s, %v", c.id, err) + } + + return waitContainerStop(ctx, c, killContainerTimeout) +} + +// DeleteContainer deletes a container. +func (r *Runtime) DeleteContainer(c *Container) error { + c.opLock.Lock() + defer c.opLock.Unlock() + _, err := utils.ExecCmd(r.Path(c), "delete", "--force", c.id) + return err +} + +// SetStartFailed sets the container state appropriately after a start failure +func (r *Runtime) SetStartFailed(c *Container, err error) { + c.opLock.Lock() + defer c.opLock.Unlock() + // adjust finished and started times + c.state.Finished, c.state.Started = c.state.Created, c.state.Created + c.state.Error = err.Error() +} + +// UpdateStatus refreshes the status of the container. +func (r *Runtime) UpdateStatus(c *Container) error { + c.opLock.Lock() + defer c.opLock.Unlock() + out, err := exec.Command(r.Path(c), "state", c.id).CombinedOutput() + if err != nil { + // there are many code paths that could lead to have a bad state in the + // underlying runtime. + // On any error like a container went away or we rebooted and containers + // went away we do not error out stopping kubernetes to recover. 
+ // We always populate the fields below so kube can restart/reschedule + // containers failing. + c.state.Status = ContainerStateStopped + c.state.Finished = time.Now() + c.state.ExitCode = 255 + return nil + } + if err := json.NewDecoder(bytes.NewBuffer(out)).Decode(&c.state); err != nil { + return fmt.Errorf("failed to decode container status for %s: %s", c.id, err) + } + + if c.state.Status == ContainerStateStopped { + exitFilePath := filepath.Join(r.containerExitsDir, c.id) + var fi os.FileInfo + err = kwait.ExponentialBackoff( + kwait.Backoff{ + Duration: 500 * time.Millisecond, + Factor: 1.2, + Steps: 6, + }, + func() (bool, error) { + var err error + fi, err = os.Stat(exitFilePath) + if err != nil { + // wait longer + return false, nil + } + return true, nil + }) + if err != nil { + logrus.Warnf("failed to find container exit file: %v", err) + c.state.ExitCode = -1 + } else { + c.state.Finished = getFinishedTime(fi) + statusCodeStr, err := ioutil.ReadFile(exitFilePath) + if err != nil { + return fmt.Errorf("failed to read exit file: %v", err) + } + statusCode, err := strconv.Atoi(string(statusCodeStr)) + if err != nil { + return fmt.Errorf("status code conversion failed: %v", err) + } + c.state.ExitCode = int32(statusCode) + } + + oomFilePath := filepath.Join(c.bundlePath, "oom") + if _, err = os.Stat(oomFilePath); err == nil { + c.state.OOMKilled = true + } + } + + return nil +} + +// ContainerStatus returns the state of a container. 
+func (r *Runtime) ContainerStatus(c *Container) *ContainerState { + c.opLock.Lock() + defer c.opLock.Unlock() + return c.state +} + +// newPipe creates a unix socket pair for communication +func newPipe() (parent *os.File, child *os.File, err error) { + fds, err := unix.Socketpair(unix.AF_LOCAL, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0) + if err != nil { + return nil, nil, err + } + return os.NewFile(uintptr(fds[1]), "parent"), os.NewFile(uintptr(fds[0]), "child"), nil +} + +// RuntimeReady checks if the runtime is up and ready to accept +// basic containers e.g. container only needs host network. +func (r *Runtime) RuntimeReady() (bool, error) { + return true, nil +} + +// NetworkReady checks if the runtime network is up and ready to +// accept containers which require container network. +func (r *Runtime) NetworkReady() (bool, error) { + return true, nil +} + +// PauseContainer pauses a container. +func (r *Runtime) PauseContainer(c *Container) error { + c.opLock.Lock() + defer c.opLock.Unlock() + _, err := utils.ExecCmd(r.Path(c), "pause", c.id) + return err +} + +// UnpauseContainer unpauses a container. +func (r *Runtime) UnpauseContainer(c *Container) error { + c.opLock.Lock() + defer c.opLock.Unlock() + _, err := utils.ExecCmd(r.Path(c), "resume", c.id) + return err +} diff --git a/oci/store.go b/oci/store.go new file mode 100644 index 000000000..1d27a0f9e --- /dev/null +++ b/oci/store.go @@ -0,0 +1,27 @@ +package oci + +// StoreFilter defines a function to filter +// container in the store. +type StoreFilter func(*Container) bool + +// StoreReducer defines a function to +// manipulate containers in the store +type StoreReducer func(*Container) + +// ContainerStorer defines an interface that any container store must implement. +type ContainerStorer interface { + // Add appends a new container to the store. + Add(string, *Container) + // Get returns a container from the store by the identifier it was stored with. 
+ Get(string) *Container + // Delete removes a container from the store by the identifier it was stored with. + Delete(string) + // List returns a list of containers from the store. + List() []*Container + // Size returns the number of containers in the store. + Size() int + // First returns the first container found in the store by a given filter. + First(StoreFilter) *Container + // ApplyAll calls the reducer function with every container in the store. + ApplyAll(StoreReducer) +} |