aboutsummaryrefslogtreecommitdiff
path: root/oci
diff options
context:
space:
mode:
Diffstat (limited to 'oci')
-rw-r--r--oci/container.go260
-rw-r--r--oci/finished.go14
-rw-r--r--oci/finished_32.go14
-rw-r--r--oci/history.go31
-rw-r--r--oci/memory_store.go92
-rw-r--r--oci/oci.go748
-rw-r--r--oci/store.go27
7 files changed, 1186 insertions, 0 deletions
diff --git a/oci/container.go b/oci/container.go
new file mode 100644
index 000000000..c0eff2fd1
--- /dev/null
+++ b/oci/container.go
@@ -0,0 +1,260 @@
+package oci
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/containernetworking/plugins/pkg/ns"
+ "github.com/docker/docker/pkg/signal"
+ specs "github.com/opencontainers/runtime-spec/specs-go"
+ "k8s.io/apimachinery/pkg/fields"
+ pb "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
+)
+
+const (
+ defaultStopSignal = "TERM"
+)
+
+// Container represents a runtime container.
+type Container struct {
+ id string
+ name string
+ logPath string
+ labels fields.Set
+ annotations fields.Set
+ crioAnnotations fields.Set
+ image string
+ sandbox string
+ netns ns.NetNS
+ terminal bool
+ stdin bool
+ stdinOnce bool
+ privileged bool
+ trusted bool
+ state *ContainerState
+ metadata *pb.ContainerMetadata
+ opLock sync.Locker
+ // this is the /var/run/storage/... directory, erased on reboot
+ bundlePath string
+ // this is the /var/lib/storage/... directory
+ dir string
+ stopSignal string
+ imageName string
+ imageRef string
+ volumes []ContainerVolume
+ mountPoint string
+ spec *specs.Spec
+}
+
+// ContainerVolume is a bind mount for the container.
+type ContainerVolume struct {
+ ContainerPath string `json:"container_path"`
+ HostPath string `json:"host_path"`
+ Readonly bool `json:"readonly"`
+}
+
+// ContainerState represents the status of a container.
+type ContainerState struct {
+ specs.State
+ Created time.Time `json:"created"`
+ Started time.Time `json:"started,omitempty"`
+ Finished time.Time `json:"finished,omitempty"`
+ ExitCode int32 `json:"exitCode,omitempty"`
+ OOMKilled bool `json:"oomKilled,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+// NewContainer creates a container object.
+func NewContainer(id string, name string, bundlePath string, logPath string, netns ns.NetNS, labels map[string]string, crioAnnotations map[string]string, annotations map[string]string, image string, imageName string, imageRef string, metadata *pb.ContainerMetadata, sandbox string, terminal bool, stdin bool, stdinOnce bool, privileged bool, trusted bool, dir string, created time.Time, stopSignal string) (*Container, error) {
+ state := &ContainerState{}
+ state.Created = created
+ c := &Container{
+ id: id,
+ name: name,
+ bundlePath: bundlePath,
+ logPath: logPath,
+ labels: labels,
+ sandbox: sandbox,
+ netns: netns,
+ terminal: terminal,
+ stdin: stdin,
+ stdinOnce: stdinOnce,
+ privileged: privileged,
+ trusted: trusted,
+ metadata: metadata,
+ annotations: annotations,
+ crioAnnotations: crioAnnotations,
+ image: image,
+ imageName: imageName,
+ imageRef: imageRef,
+ dir: dir,
+ state: state,
+ stopSignal: stopSignal,
+ opLock: new(sync.Mutex),
+ }
+ return c, nil
+}
+
+// SetSpec loads the OCI spec in the container struct
+func (c *Container) SetSpec(s *specs.Spec) {
+ c.spec = s
+}
+
+// Spec returns a copy of the spec for the container
+func (c *Container) Spec() specs.Spec {
+ return *c.spec
+}
+
+// GetStopSignal returns the container's own stop signal configured from the
+// image configuration or the default one.
+func (c *Container) GetStopSignal() string {
+ if c.stopSignal == "" {
+ return defaultStopSignal
+ }
+ cleanSignal := strings.TrimPrefix(strings.ToUpper(c.stopSignal), "SIG")
+ _, ok := signal.SignalMap[cleanSignal]
+ if !ok {
+ return defaultStopSignal
+ }
+ return cleanSignal
+}
+
+// FromDisk restores container's state from disk
+func (c *Container) FromDisk() error {
+ jsonSource, err := os.Open(c.StatePath())
+ if err != nil {
+ return err
+ }
+ defer jsonSource.Close()
+
+ dec := json.NewDecoder(jsonSource)
+ return dec.Decode(c.state)
+}
+
+// StatePath returns the containers state.json path
+func (c *Container) StatePath() string {
+ return filepath.Join(c.dir, "state.json")
+}
+
+// CreatedAt returns the container creation time
+func (c *Container) CreatedAt() time.Time {
+ return c.state.Created
+}
+
+// Name returns the name of the container.
+func (c *Container) Name() string {
+ return c.name
+}
+
+// ID returns the id of the container.
+func (c *Container) ID() string {
+ return c.id
+}
+
+// BundlePath returns the bundlePath of the container.
+func (c *Container) BundlePath() string {
+ return c.bundlePath
+}
+
+// LogPath returns the log path of the container.
+func (c *Container) LogPath() string {
+ return c.logPath
+}
+
+// Labels returns the labels of the container.
+func (c *Container) Labels() map[string]string {
+ return c.labels
+}
+
+// Annotations returns the annotations of the container.
+func (c *Container) Annotations() map[string]string {
+ return c.annotations
+}
+
+// CrioAnnotations returns the crio annotations of the container.
+func (c *Container) CrioAnnotations() map[string]string {
+ return c.crioAnnotations
+}
+
+// Image returns the image of the container.
+func (c *Container) Image() string {
+ return c.image
+}
+
+// ImageName returns the image name of the container.
+func (c *Container) ImageName() string {
+ return c.imageName
+}
+
+// ImageRef returns the image ref of the container.
+func (c *Container) ImageRef() string {
+ return c.imageRef
+}
+
+// Sandbox returns the sandbox name of the container.
+func (c *Container) Sandbox() string {
+ return c.sandbox
+}
+
+// Dir returns the the dir of the container
+func (c *Container) Dir() string {
+ return c.dir
+}
+
+// NetNsPath returns the path to the network namespace of the container.
+func (c *Container) NetNsPath() (string, error) {
+ if c.state == nil {
+ return "", fmt.Errorf("container state is not populated")
+ }
+
+ if c.netns == nil {
+ return fmt.Sprintf("/proc/%d/ns/net", c.state.Pid), nil
+ }
+
+ return c.netns.Path(), nil
+}
+
+// Metadata returns the metadata of the container.
+func (c *Container) Metadata() *pb.ContainerMetadata {
+ return c.metadata
+}
+
+// State returns the state of the running container
+func (c *Container) State() *ContainerState {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ return c.state
+}
+
+// AddVolume adds a volume to list of container volumes.
+func (c *Container) AddVolume(v ContainerVolume) {
+ c.volumes = append(c.volumes, v)
+}
+
+// Volumes returns the list of container volumes.
+func (c *Container) Volumes() []ContainerVolume {
+ return c.volumes
+
+}
+
+// SetMountPoint sets the container mount point
+func (c *Container) SetMountPoint(mp string) {
+ c.mountPoint = mp
+}
+
+// MountPoint returns the container mount point
+func (c *Container) MountPoint() string {
+ return c.mountPoint
+}
+
+// SetState sets the conainer state
+//
+// XXX: DO NOT EVER USE THIS, THIS IS JUST USEFUL FOR MOCKING!!!
+func (c *Container) SetState(state *ContainerState) {
+ c.state = state
+}
diff --git a/oci/finished.go b/oci/finished.go
new file mode 100644
index 000000000..9fedbfb7a
--- /dev/null
+++ b/oci/finished.go
@@ -0,0 +1,14 @@
+// +build !arm,!386
+
+package oci
+
+import (
+ "os"
+ "syscall"
+ "time"
+)
+
+func getFinishedTime(fi os.FileInfo) time.Time {
+ st := fi.Sys().(*syscall.Stat_t)
+ return time.Unix(st.Ctim.Sec, st.Ctim.Nsec)
+}
diff --git a/oci/finished_32.go b/oci/finished_32.go
new file mode 100644
index 000000000..3f24b1baa
--- /dev/null
+++ b/oci/finished_32.go
@@ -0,0 +1,14 @@
+// +build arm 386
+
+package oci
+
+import (
+ "os"
+ "syscall"
+ "time"
+)
+
+func getFinishedTime(fi os.FileInfo) time.Time {
+ st := fi.Sys().(*syscall.Stat_t)
+ return time.Unix(int64(st.Ctim.Sec), int64(st.Ctim.Nsec))
+}
diff --git a/oci/history.go b/oci/history.go
new file mode 100644
index 000000000..2ced41d61
--- /dev/null
+++ b/oci/history.go
@@ -0,0 +1,31 @@
+package oci
+
+import "sort"
+
+// History is a convenience type for storing a list of containers,
+// sorted by creation date in descendant order.
+type History []*Container
+
+// Len returns the number of containers in the history.
+func (history *History) Len() int {
+ return len(*history)
+}
+
+// Less compares two containers and returns true if the second one
+// was created before the first one.
+func (history *History) Less(i, j int) bool {
+ containers := *history
+ // FIXME: state access should be serialized
+ return containers[j].state.Created.Before(containers[i].state.Created)
+}
+
+// Swap switches containers i and j positions in the history.
+func (history *History) Swap(i, j int) {
+ containers := *history
+ containers[i], containers[j] = containers[j], containers[i]
+}
+
+// sort orders the history by creation date in descendant order.
+func (history *History) sort() {
+ sort.Sort(history)
+}
diff --git a/oci/memory_store.go b/oci/memory_store.go
new file mode 100644
index 000000000..6223ce7f0
--- /dev/null
+++ b/oci/memory_store.go
@@ -0,0 +1,92 @@
+package oci
+
+import "sync"
+
+// memoryStore implements a Store in memory.
+type memoryStore struct {
+ s map[string]*Container
+ sync.RWMutex
+}
+
+// NewMemoryStore initializes a new memory store.
+func NewMemoryStore() ContainerStorer {
+ return &memoryStore{
+ s: make(map[string]*Container),
+ }
+}
+
+// Add appends a new container to the memory store.
+// It overrides the id if it existed before.
+func (c *memoryStore) Add(id string, cont *Container) {
+ c.Lock()
+ c.s[id] = cont
+ c.Unlock()
+}
+
+// Get returns a container from the store by id.
+func (c *memoryStore) Get(id string) *Container {
+ c.RLock()
+ res := c.s[id]
+ c.RUnlock()
+ return res
+}
+
+// Delete removes a container from the store by id.
+func (c *memoryStore) Delete(id string) {
+ c.Lock()
+ delete(c.s, id)
+ c.Unlock()
+}
+
+// List returns a sorted list of containers from the store.
+// The containers are ordered by creation date.
+func (c *memoryStore) List() []*Container {
+ containers := History(c.all())
+ containers.sort()
+ return containers
+}
+
+// Size returns the number of containers in the store.
+func (c *memoryStore) Size() int {
+ c.RLock()
+ defer c.RUnlock()
+ return len(c.s)
+}
+
+// First returns the first container found in the store by a given filter.
+func (c *memoryStore) First(filter StoreFilter) *Container {
+ for _, cont := range c.all() {
+ if filter(cont) {
+ return cont
+ }
+ }
+ return nil
+}
+
+// ApplyAll calls the reducer function with every container in the store.
+// This operation is asynchronous in the memory store.
+// NOTE: Modifications to the store MUST NOT be done by the StoreReducer.
+func (c *memoryStore) ApplyAll(apply StoreReducer) {
+ wg := new(sync.WaitGroup)
+ for _, cont := range c.all() {
+ wg.Add(1)
+ go func(container *Container) {
+ apply(container)
+ wg.Done()
+ }(cont)
+ }
+
+ wg.Wait()
+}
+
+func (c *memoryStore) all() []*Container {
+ c.RLock()
+ containers := make([]*Container, 0, len(c.s))
+ for _, cont := range c.s {
+ containers = append(containers, cont)
+ }
+ c.RUnlock()
+ return containers
+}
+
+var _ ContainerStorer = &memoryStore{}
diff --git a/oci/oci.go b/oci/oci.go
new file mode 100644
index 000000000..756be44bf
--- /dev/null
+++ b/oci/oci.go
@@ -0,0 +1,748 @@
+package oci
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/containerd/cgroups"
+ "github.com/kubernetes-incubator/cri-o/utils"
+ rspec "github.com/opencontainers/runtime-spec/specs-go"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/net/context"
+ "golang.org/x/sys/unix"
+ kwait "k8s.io/apimachinery/pkg/util/wait"
+)
+
+const (
+ // ContainerStateCreated represents the created state of a container
+ ContainerStateCreated = "created"
+ // ContainerStatePaused represents the paused state of a container
+ ContainerStatePaused = "paused"
+ // ContainerStateRunning represents the running state of a container
+ ContainerStateRunning = "running"
+ // ContainerStateStopped represents the stopped state of a container
+ ContainerStateStopped = "stopped"
+ // ContainerCreateTimeout represents the value of container creating timeout
+ ContainerCreateTimeout = 240 * time.Second
+
+ // CgroupfsCgroupsManager represents cgroupfs native cgroup manager
+ CgroupfsCgroupsManager = "cgroupfs"
+ // SystemdCgroupsManager represents systemd native cgroup manager
+ SystemdCgroupsManager = "systemd"
+ // ContainerExitsDir is the location of container exit dirs
+ ContainerExitsDir = "/var/run/crio/exits"
+ // ContainerAttachSocketDir is the location for container attach sockets
+ ContainerAttachSocketDir = "/var/run/crio"
+
+ // killContainerTimeout is the timeout that we wait for the container to
+ // be SIGKILLed.
+ killContainerTimeout = 2 * time.Minute
+)
+
+// New creates a new Runtime with options provided
+func New(runtimeTrustedPath string,
+ runtimeUntrustedPath string,
+ trustLevel string,
+ conmonPath string,
+ conmonEnv []string,
+ cgroupManager string,
+ containerExitsDir string,
+ logSizeMax int64,
+ noPivot bool) (*Runtime, error) {
+ r := &Runtime{
+ name: filepath.Base(runtimeTrustedPath),
+ trustedPath: runtimeTrustedPath,
+ untrustedPath: runtimeUntrustedPath,
+ trustLevel: trustLevel,
+ conmonPath: conmonPath,
+ conmonEnv: conmonEnv,
+ cgroupManager: cgroupManager,
+ containerExitsDir: containerExitsDir,
+ logSizeMax: logSizeMax,
+ noPivot: noPivot,
+ }
+ return r, nil
+}
+
+// Runtime stores the information about a oci runtime
+type Runtime struct {
+ name string
+ trustedPath string
+ untrustedPath string
+ trustLevel string
+ conmonPath string
+ conmonEnv []string
+ cgroupManager string
+ containerExitsDir string
+ logSizeMax int64
+ noPivot bool
+}
+
+// syncInfo is used to return data from monitor process to daemon
+type syncInfo struct {
+ Pid int `json:"pid"`
+ Message string `json:"message,omitempty"`
+}
+
+// exitCodeInfo is used to return the monitored process exit code to the daemon
+type exitCodeInfo struct {
+ ExitCode int32 `json:"exit_code"`
+ Message string `json:"message,omitempty"`
+}
+
+// Name returns the name of the OCI Runtime
+func (r *Runtime) Name() string {
+ return r.name
+}
+
+// Path returns the full path the OCI Runtime executable.
+// Depending if the container is privileged and/or trusted,
+// this will return either the trusted or untrusted runtime path.
+func (r *Runtime) Path(c *Container) string {
+ if !c.trusted {
+ // We have an explicitly untrusted container.
+ if c.privileged {
+ logrus.Warnf("Running an untrusted but privileged container")
+ return r.trustedPath
+ }
+
+ if r.untrustedPath != "" {
+ return r.untrustedPath
+ }
+
+ return r.trustedPath
+ }
+
+ // Our container is trusted. Let's look at the configured trust level.
+ if r.trustLevel == "trusted" {
+ return r.trustedPath
+ }
+
+ // Our container is trusted, but we are running untrusted.
+ // We will use the untrusted container runtime if it's set
+ // and if it's not a privileged container.
+ if c.privileged || r.untrustedPath == "" {
+ return r.trustedPath
+ }
+
+ return r.untrustedPath
+}
+
+// Version returns the version of the OCI Runtime
+func (r *Runtime) Version() (string, error) {
+ runtimeVersion, err := getOCIVersion(r.trustedPath, "-v")
+ if err != nil {
+ return "", err
+ }
+ return runtimeVersion, nil
+}
+
+func getOCIVersion(name string, args ...string) (string, error) {
+ out, err := utils.ExecCmd(name, args...)
+ if err != nil {
+ return "", err
+ }
+
+ firstLine := out[:strings.Index(out, "\n")]
+ v := firstLine[strings.LastIndex(firstLine, " ")+1:]
+ return v, nil
+}
+
+// CreateContainer creates a container.
+func (r *Runtime) CreateContainer(c *Container, cgroupParent string) (err error) {
+ var stderrBuf bytes.Buffer
+ parentPipe, childPipe, err := newPipe()
+ childStartPipe, parentStartPipe, err := newPipe()
+ if err != nil {
+ return fmt.Errorf("error creating socket pair: %v", err)
+ }
+ defer parentPipe.Close()
+ defer parentStartPipe.Close()
+
+ var args []string
+ if r.cgroupManager == SystemdCgroupsManager {
+ args = append(args, "-s")
+ }
+ args = append(args, "-c", c.id)
+ args = append(args, "-u", c.id)
+ args = append(args, "-r", r.Path(c))
+ args = append(args, "-b", c.bundlePath)
+ args = append(args, "-p", filepath.Join(c.bundlePath, "pidfile"))
+ args = append(args, "-l", c.logPath)
+ args = append(args, "--exit-dir", r.containerExitsDir)
+ args = append(args, "--socket-dir-path", ContainerAttachSocketDir)
+ if r.logSizeMax >= 0 {
+ args = append(args, "--log-size-max", fmt.Sprintf("%v", r.logSizeMax))
+ }
+ if r.noPivot {
+ args = append(args, "--no-pivot")
+ }
+ if c.terminal {
+ args = append(args, "-t")
+ } else if c.stdin {
+ args = append(args, "-i")
+ }
+ logrus.WithFields(logrus.Fields{
+ "args": args,
+ }).Debugf("running conmon: %s", r.conmonPath)
+
+ cmd := exec.Command(r.conmonPath, args...)
+ cmd.Dir = c.bundlePath
+ cmd.SysProcAttr = &syscall.SysProcAttr{
+ Setpgid: true,
+ }
+ cmd.Stdin = os.Stdin
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ if c.terminal {
+ cmd.Stderr = &stderrBuf
+ }
+ cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe, childStartPipe)
+ // 0, 1 and 2 are stdin, stdout and stderr
+ cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3))
+ cmd.Env = append(cmd.Env, fmt.Sprintf("_OCI_STARTPIPE=%d", 4))
+
+ err = cmd.Start()
+ if err != nil {
+ childPipe.Close()
+ return err
+ }
+
+ // We don't need childPipe on the parent side
+ childPipe.Close()
+ childStartPipe.Close()
+
+ // Move conmon to specified cgroup
+ if r.cgroupManager == SystemdCgroupsManager {
+ logrus.Infof("Running conmon under slice %s and unitName %s", cgroupParent, createUnitName("crio-conmon", c.id))
+ if err = utils.RunUnderSystemdScope(cmd.Process.Pid, cgroupParent, createUnitName("crio-conmon", c.id)); err != nil {
+ logrus.Warnf("Failed to add conmon to systemd sandbox cgroup: %v", err)
+ }
+ } else {
+ control, err := cgroups.New(cgroups.V1, cgroups.StaticPath(filepath.Join(cgroupParent, "/crio-conmon-"+c.id)), &rspec.LinuxResources{})
+ if err != nil {
+ logrus.Warnf("Failed to add conmon to cgroupfs sandbox cgroup: %v", err)
+ } else {
+ // Here we should defer a crio-connmon- cgroup hierarchy deletion, but it will
+ // always fail as conmon's pid is still there.
+ // Fortunately, kubelet takes care of deleting this for us, so the leak will
+ // only happens in corner case where one does a manual deletion of the container
+ // through e.g. runc. This should be handled by implementing a conmon monitoring
+ // routine that does the cgroup cleanup once conmon is terminated.
+ if err := control.Add(cgroups.Process{Pid: cmd.Process.Pid}); err != nil {
+ logrus.Warnf("Failed to add conmon to cgroupfs sandbox cgroup: %v", err)
+ }
+ }
+ }
+
+ /* We set the cgroup, now the child can start creating children */
+ someData := []byte{0}
+ _, err = parentStartPipe.Write(someData)
+ if err != nil {
+ return err
+ }
+
+ /* Wait for initial setup and fork, and reap child */
+ err = cmd.Wait()
+ if err != nil {
+ return err
+ }
+
+ // We will delete all container resources if creation fails
+ defer func() {
+ if err != nil {
+ r.DeleteContainer(c)
+ }
+ }()
+
+ // Wait to get container pid from conmon
+ type syncStruct struct {
+ si *syncInfo
+ err error
+ }
+ ch := make(chan syncStruct)
+ go func() {
+ var si *syncInfo
+ if err = json.NewDecoder(parentPipe).Decode(&si); err != nil {
+ ch <- syncStruct{err: err}
+ return
+ }
+ ch <- syncStruct{si: si}
+ }()
+
+ select {
+ case ss := <-ch:
+ if ss.err != nil {
+ return fmt.Errorf("error reading container (probably exited) json message: %v", ss.err)
+ }
+ logrus.Debugf("Received container pid: %d", ss.si.Pid)
+ if ss.si.Pid == -1 {
+ if ss.si.Message != "" {
+ logrus.Errorf("Container creation error: %s", ss.si.Message)
+ return fmt.Errorf("container create failed: %s", ss.si.Message)
+ }
+ logrus.Errorf("Container creation failed")
+ return fmt.Errorf("container create failed")
+ }
+ case <-time.After(ContainerCreateTimeout):
+ logrus.Errorf("Container creation timeout (%v)", ContainerCreateTimeout)
+ return fmt.Errorf("create container timeout")
+ }
+ return nil
+}
+
+func createUnitName(prefix string, name string) string {
+ return fmt.Sprintf("%s-%s.scope", prefix, name)
+}
+
+// StartContainer starts a container.
+func (r *Runtime) StartContainer(c *Container) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "start", c.id); err != nil {
+ return err
+ }
+ c.state.Started = time.Now()
+ return nil
+}
+
+// ExecSyncResponse is returned from ExecSync.
+type ExecSyncResponse struct {
+ Stdout []byte
+ Stderr []byte
+ ExitCode int32
+}
+
+// ExecSyncError wraps command's streams, exit code and error on ExecSync error.
+type ExecSyncError struct {
+ Stdout bytes.Buffer
+ Stderr bytes.Buffer
+ ExitCode int32
+ Err error
+}
+
+func (e ExecSyncError) Error() string {
+ return fmt.Sprintf("command error: %+v, stdout: %s, stderr: %s, exit code %d", e.Err, e.Stdout.Bytes(), e.Stderr.Bytes(), e.ExitCode)
+}
+
+func prepareExec() (pidFile, parentPipe, childPipe *os.File, err error) {
+ parentPipe, childPipe, err = os.Pipe()
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ pidFile, err = ioutil.TempFile("", "pidfile")
+ if err != nil {
+ parentPipe.Close()
+ childPipe.Close()
+ return nil, nil, nil, err
+ }
+
+ return
+}
+
+func parseLog(log []byte) (stdout, stderr []byte) {
+ // Split the log on newlines, which is what separates entries.
+ lines := bytes.SplitAfter(log, []byte{'\n'})
+ for _, line := range lines {
+ // Ignore empty lines.
+ if len(line) == 0 {
+ continue
+ }
+
+ // The format of log lines is "DATE pipe REST".
+ parts := bytes.SplitN(line, []byte{' '}, 3)
+ if len(parts) < 3 {
+ // Ignore the line if it's formatted incorrectly, but complain
+ // about it so it can be debugged.
+ logrus.Warnf("hit invalid log format: %q", string(line))
+ continue
+ }
+
+ pipe := string(parts[1])
+ content := parts[2]
+
+ switch pipe {
+ case "stdout":
+ stdout = append(stdout, content...)
+ case "stderr":
+ stderr = append(stderr, content...)
+ default:
+ // Complain about unknown pipes.
+ logrus.Warnf("hit invalid log format [unknown pipe %s]: %q", pipe, string(line))
+ continue
+ }
+ }
+
+ return stdout, stderr
+}
+
+// ExecSync execs a command in a container and returns it's stdout, stderr and return code.
+func (r *Runtime) ExecSync(c *Container, command []string, timeout int64) (resp *ExecSyncResponse, err error) {
+ pidFile, parentPipe, childPipe, err := prepareExec()
+ if err != nil {
+ return nil, ExecSyncError{
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+ defer parentPipe.Close()
+ defer func() {
+ if e := os.Remove(pidFile.Name()); e != nil {
+ logrus.Warnf("could not remove temporary PID file %s", pidFile.Name())
+ }
+ }()
+
+ logFile, err := ioutil.TempFile("", "crio-log-"+c.id)
+ if err != nil {
+ return nil, ExecSyncError{
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+ logPath := logFile.Name()
+ defer func() {
+ logFile.Close()
+ os.RemoveAll(logPath)
+ }()
+
+ f, err := ioutil.TempFile("", "exec-process")
+ if err != nil {
+ return nil, ExecSyncError{
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+ defer os.RemoveAll(f.Name())
+
+ var args []string
+ args = append(args, "-c", c.id)
+ args = append(args, "-r", r.Path(c))
+ args = append(args, "-p", pidFile.Name())
+ args = append(args, "-e")
+ if c.terminal {
+ args = append(args, "-t")
+ }
+ if timeout > 0 {
+ args = append(args, "-T")
+ args = append(args, fmt.Sprintf("%d", timeout))
+ }
+ args = append(args, "-l", logPath)
+ args = append(args, "--socket-dir-path", ContainerAttachSocketDir)
+
+ pspec := c.Spec().Process
+ pspec.Env = append(pspec.Env, r.conmonEnv...)
+ pspec.Args = command
+ processJSON, err := json.Marshal(pspec)
+ if err != nil {
+ return nil, ExecSyncError{
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+
+ if err := ioutil.WriteFile(f.Name(), processJSON, 0644); err != nil {
+ return nil, ExecSyncError{
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+
+ args = append(args, "--exec-process-spec", f.Name())
+
+ cmd := exec.Command(r.conmonPath, args...)
+
+ var stdoutBuf, stderrBuf bytes.Buffer
+ cmd.Stdout = &stdoutBuf
+ cmd.Stderr = &stderrBuf
+ cmd.ExtraFiles = append(cmd.ExtraFiles, childPipe)
+ // 0, 1 and 2 are stdin, stdout and stderr
+ cmd.Env = append(r.conmonEnv, fmt.Sprintf("_OCI_SYNCPIPE=%d", 3))
+
+ err = cmd.Start()
+ if err != nil {
+ childPipe.Close()
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+
+ // We don't need childPipe on the parent side
+ childPipe.Close()
+
+ err = cmd.Wait()
+ if err != nil {
+ if exitErr, ok := err.(*exec.ExitError); ok {
+ if status, ok := exitErr.Sys().(unix.WaitStatus); ok {
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: int32(status.ExitStatus()),
+ Err: err,
+ }
+ }
+ } else {
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+ }
+
+ var ec *exitCodeInfo
+ if err := json.NewDecoder(parentPipe).Decode(&ec); err != nil {
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+
+ logrus.Infof("Received container exit code: %v, message: %s", ec.ExitCode, ec.Message)
+
+ if ec.ExitCode == -1 {
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: -1,
+ Err: fmt.Errorf(ec.Message),
+ }
+ }
+
+ // The actual logged output is not the same as stdoutBuf and stderrBuf,
+ // which are used for getting error information. For the actual
+ // ExecSyncResponse we have to read the logfile.
+ // XXX: Currently runC dups the same console over both stdout and stderr,
+ // so we can't differentiate between the two.
+
+ logBytes, err := ioutil.ReadFile(logPath)
+ if err != nil {
+ return nil, ExecSyncError{
+ Stdout: stdoutBuf,
+ Stderr: stderrBuf,
+ ExitCode: -1,
+ Err: err,
+ }
+ }
+
+ // We have to parse the log output into {stdout, stderr} buffers.
+ stdoutBytes, stderrBytes := parseLog(logBytes)
+ return &ExecSyncResponse{
+ Stdout: stdoutBytes,
+ Stderr: stderrBytes,
+ ExitCode: ec.ExitCode,
+ }, nil
+}
+
+func waitContainerStop(ctx context.Context, c *Container, timeout time.Duration) error {
+ done := make(chan struct{})
+ // we could potentially re-use "done" channel to exit the loop on timeout
+ // but we use another channel "chControl" so that we won't never incur in the
+ // case the "done" channel is closed in the "default" select case and we also
+ // reach the timeout in the select below. If that happens we could raise
+ // a panic closing a closed channel so better be safe and use another new
+ // channel just to control the loop.
+ chControl := make(chan struct{})
+ go func() {
+ for {
+ select {
+ case <-chControl:
+ return
+ default:
+ // Check if the process is still around
+ err := unix.Kill(c.state.Pid, 0)
+ if err == unix.ESRCH {
+ close(done)
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }()
+ select {
+ case <-done:
+ return nil
+ case <-ctx.Done():
+ close(chControl)
+ return ctx.Err()
+ case <-time.After(timeout):
+ close(chControl)
+ err := unix.Kill(c.state.Pid, unix.SIGKILL)
+ if err != nil && err != unix.ESRCH {
+ return fmt.Errorf("failed to kill process: %v", err)
+ }
+ }
+
+ c.state.Finished = time.Now()
+ return nil
+}
+
+// StopContainer stops a container. Timeout is given in seconds.
+func (r *Runtime) StopContainer(ctx context.Context, c *Container, timeout int64) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+
+ // Check if the process is around before sending a signal
+ err := unix.Kill(c.state.Pid, 0)
+ if err == unix.ESRCH {
+ c.state.Finished = time.Now()
+ return nil
+ }
+
+ if timeout > 0 {
+ if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "kill", c.id, c.GetStopSignal()); err != nil {
+ return fmt.Errorf("failed to stop container %s, %v", c.id, err)
+ }
+ err = waitContainerStop(ctx, c, time.Duration(timeout)*time.Second)
+ if err == nil {
+ return nil
+ }
+ logrus.Warnf("Stop container %q timed out: %v", c.ID(), err)
+ }
+
+ if err := utils.ExecCmdWithStdStreams(os.Stdin, os.Stdout, os.Stderr, r.Path(c), "kill", "--all", c.id, "KILL"); err != nil {
+ return fmt.Errorf("failed to stop container %s, %v", c.id, err)
+ }
+
+ return waitContainerStop(ctx, c, killContainerTimeout)
+}
+
+// DeleteContainer deletes a container.
+func (r *Runtime) DeleteContainer(c *Container) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ _, err := utils.ExecCmd(r.Path(c), "delete", "--force", c.id)
+ return err
+}
+
+// SetStartFailed sets the container state appropriately after a start failure
+func (r *Runtime) SetStartFailed(c *Container, err error) {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ // adjust finished and started times
+ c.state.Finished, c.state.Started = c.state.Created, c.state.Created
+ c.state.Error = err.Error()
+}
+
+// UpdateStatus refreshes the status of the container.
+func (r *Runtime) UpdateStatus(c *Container) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ out, err := exec.Command(r.Path(c), "state", c.id).CombinedOutput()
+ if err != nil {
+ // there are many code paths that could lead to have a bad state in the
+ // underlying runtime.
+ // On any error like a container went away or we rebooted and containers
+ // went away we do not error out stopping kubernetes to recover.
+ // We always populate the fields below so kube can restart/reschedule
+ // containers failing.
+ c.state.Status = ContainerStateStopped
+ c.state.Finished = time.Now()
+ c.state.ExitCode = 255
+ return nil
+ }
+ if err := json.NewDecoder(bytes.NewBuffer(out)).Decode(&c.state); err != nil {
+ return fmt.Errorf("failed to decode container status for %s: %s", c.id, err)
+ }
+
+ if c.state.Status == ContainerStateStopped {
+ exitFilePath := filepath.Join(r.containerExitsDir, c.id)
+ var fi os.FileInfo
+ err = kwait.ExponentialBackoff(
+ kwait.Backoff{
+ Duration: 500 * time.Millisecond,
+ Factor: 1.2,
+ Steps: 6,
+ },
+ func() (bool, error) {
+ var err error
+ fi, err = os.Stat(exitFilePath)
+ if err != nil {
+ // wait longer
+ return false, nil
+ }
+ return true, nil
+ })
+ if err != nil {
+ logrus.Warnf("failed to find container exit file: %v", err)
+ c.state.ExitCode = -1
+ } else {
+ c.state.Finished = getFinishedTime(fi)
+ statusCodeStr, err := ioutil.ReadFile(exitFilePath)
+ if err != nil {
+ return fmt.Errorf("failed to read exit file: %v", err)
+ }
+ statusCode, err := strconv.Atoi(string(statusCodeStr))
+ if err != nil {
+ return fmt.Errorf("status code conversion failed: %v", err)
+ }
+ c.state.ExitCode = int32(statusCode)
+ }
+
+ oomFilePath := filepath.Join(c.bundlePath, "oom")
+ if _, err = os.Stat(oomFilePath); err == nil {
+ c.state.OOMKilled = true
+ }
+ }
+
+ return nil
+}
+
+// ContainerStatus returns the state of a container.
+func (r *Runtime) ContainerStatus(c *Container) *ContainerState {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ return c.state
+}
+
+// newPipe creates a unix socket pair for communication
+func newPipe() (parent *os.File, child *os.File, err error) {
+ fds, err := unix.Socketpair(unix.AF_LOCAL, unix.SOCK_STREAM|unix.SOCK_CLOEXEC, 0)
+ if err != nil {
+ return nil, nil, err
+ }
+ return os.NewFile(uintptr(fds[1]), "parent"), os.NewFile(uintptr(fds[0]), "child"), nil
+}
+
+// RuntimeReady checks if the runtime is up and ready to accept
+// basic containers e.g. container only needs host network.
+func (r *Runtime) RuntimeReady() (bool, error) {
+ return true, nil
+}
+
+// NetworkReady checks if the runtime network is up and ready to
+// accept containers which require container network.
+func (r *Runtime) NetworkReady() (bool, error) {
+ return true, nil
+}
+
+// PauseContainer pauses a container.
+func (r *Runtime) PauseContainer(c *Container) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ _, err := utils.ExecCmd(r.Path(c), "pause", c.id)
+ return err
+}
+
+// UnpauseContainer unpauses a container.
+func (r *Runtime) UnpauseContainer(c *Container) error {
+ c.opLock.Lock()
+ defer c.opLock.Unlock()
+ _, err := utils.ExecCmd(r.Path(c), "resume", c.id)
+ return err
+}
diff --git a/oci/store.go b/oci/store.go
new file mode 100644
index 000000000..1d27a0f9e
--- /dev/null
+++ b/oci/store.go
@@ -0,0 +1,27 @@
+package oci
+
+// StoreFilter defines a function to filter
+// container in the store.
+type StoreFilter func(*Container) bool
+
+// StoreReducer defines a function to
+// manipulate containers in the store
+type StoreReducer func(*Container)
+
+// ContainerStorer defines an interface that any container store must implement.
+type ContainerStorer interface {
+ // Add appends a new container to the store.
+ Add(string, *Container)
+ // Get returns a container from the store by the identifier it was stored with.
+ Get(string) *Container
+ // Delete removes a container from the store by the identifier it was stored with.
+ Delete(string)
+ // List returns a list of containers from the store.
+ List() []*Container
+ // Size returns the number of containers in the store.
+ Size() int
+ // First returns the first container found in the store by a given filter.
+ First(StoreFilter) *Container
+ // ApplyAll calls the reducer function with every container in the store.
+ ApplyAll(StoreReducer)
+}