summaryrefslogtreecommitdiff
path: root/libpod
diff options
context:
space:
mode:
Diffstat (limited to 'libpod')
-rw-r--r-- libpod/boltdb_state_internal.go 8
-rw-r--r-- libpod/container_exec.go 227
-rw-r--r-- libpod/container_internal.go 27
-rw-r--r-- libpod/oci.go 15
-rw-r--r-- libpod/oci_conmon_linux.go 502
-rw-r--r-- libpod/oci_missing.go 7
-rw-r--r-- libpod/runtime_ctr.go 19
7 files changed, 601 insertions, 204 deletions
diff --git a/libpod/boltdb_state_internal.go b/libpod/boltdb_state_internal.go
index 33ff0720f..21d55bf77 100644
--- a/libpod/boltdb_state_internal.go
+++ b/libpod/boltdb_state_internal.go
@@ -695,7 +695,10 @@ func (s *BoltState) addContainer(ctr *Container, pod *Pod) error {
return errors.Wrapf(define.ErrNoSuchVolume, "no volume with name %s found in database when adding container %s", vol.Name, ctr.ID())
}
- ctrDepsBkt := volDB.Bucket(volDependenciesBkt)
+ ctrDepsBkt, err := volDB.CreateBucketIfNotExists(volDependenciesBkt)
+ if err != nil {
+ return errors.Wrapf(err, "error creating volume %s dependencies bucket to add container %s", vol.Name, ctr.ID())
+ }
if depExists := ctrDepsBkt.Get(ctrID); depExists == nil {
if err := ctrDepsBkt.Put(ctrID, ctrID); err != nil {
return errors.Wrapf(err, "error adding container %s to volume %s dependencies", ctr.ID(), vol.Name)
@@ -890,6 +893,9 @@ func (s *BoltState) removeContainer(ctr *Container, pod *Pod, tx *bolt.Tx) error
}
ctrDepsBkt := volDB.Bucket(volDependenciesBkt)
+ if ctrDepsBkt == nil {
+ return errors.Wrapf(define.ErrInternal, "volume %s is missing container dependencies bucket, cannot remove container %s from dependencies", vol.Name, ctr.ID())
+ }
if depExists := ctrDepsBkt.Get(ctrID); depExists == nil {
if err := ctrDepsBkt.Delete(ctrID); err != nil {
return errors.Wrapf(err, "error deleting container %s dependency on volume %s", ctr.ID(), vol.Name)
diff --git a/libpod/container_exec.go b/libpod/container_exec.go
index c1ce8b724..6ad767b4b 100644
--- a/libpod/container_exec.go
+++ b/libpod/container_exec.go
@@ -1,7 +1,9 @@
package libpod
import (
+ "bufio"
"io/ioutil"
+ "net"
"os"
"path/filepath"
"strconv"
@@ -102,7 +104,7 @@ func (e *ExecSession) Inspect() (*define.InspectExecSession, error) {
}
output := new(define.InspectExecSession)
- output.CanRemove = e.State != define.ExecStateRunning
+ output.CanRemove = e.State == define.ExecStateStopped
output.ContainerID = e.ContainerId
if e.Config.DetachKeys != nil {
output.DetachKeys = *e.Config.DetachKeys
@@ -156,9 +158,6 @@ func (c *Container) ExecCreate(config *ExecConfig) (string, error) {
if len(config.Command) == 0 {
return "", errors.Wrapf(define.ErrInvalidArg, "must provide a non-empty command to start an exec session")
}
- if config.Terminal && (config.AttachStdin || config.AttachStdout || config.AttachStderr) {
- return "", errors.Wrapf(define.ErrInvalidArg, "cannot specify streams to attach to when exec session has a pseudoterminal")
- }
// Verify that we are in a good state to continue
if !c.ensureState(define.ContainerStateRunning) {
@@ -247,34 +246,12 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID())
- // TODO: check logic here - should we set Privileged if the container is
- // privileged?
- var capList []string
- if session.Config.Privileged || c.config.Privileged {
- capList = capabilities.AllCapabilities()
- }
-
- user := c.config.User
- if session.Config.User != "" {
- user = session.Config.User
- }
-
- if err := c.createExecBundle(session.ID()); err != nil {
+ opts, err := prepareForExec(c, session)
+ if err != nil {
return err
}
- opts := new(ExecOptions)
- opts.Cmd = session.Config.Command
- opts.CapAdd = capList
- opts.Env = session.Config.Environment
- opts.Terminal = session.Config.Terminal
- opts.Cwd = session.Config.WorkDir
- opts.User = user
- opts.Streams = streams
- opts.PreserveFDs = session.Config.PreserveFDs
- opts.DetachKeys = session.Config.DetachKeys
-
- pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts)
+ pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts, streams)
if err != nil {
return err
}
@@ -318,28 +295,124 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
c.lock.Lock()
}
- // Sync the container to pick up state changes
- if err := c.syncContainer(); err != nil {
+ if err := writeExecExitCode(c, session.ID(), exitCode); err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
- return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), session.ID())
+ lastErr = err
}
- // Update status
- // Since we did a syncContainer, the old session has been overwritten.
- // Grab a fresh one from the database.
- session, ok = c.state.ExecSessions[sessionID]
+ // Clean up after ourselves
+ if err := c.cleanupExecBundle(session.ID()); err != nil {
+ if lastErr != nil {
+ logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
+ }
+ lastErr = err
+ }
+
+ return lastErr
+}
+
+// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
+func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) {
+ // TODO: How do we combine streams with the default streams set in the exec session?
+
+ // The flow here is somewhat strange, because we need to determine if
+ // there's a terminal ASAP (for error handling).
+ // Until we know, assume it's true (don't add standard stream headers).
+ // Add a defer to ensure our invariant (HTTP session is closed) is
+ // maintained.
+ isTerminal := true
+ defer func() {
+ hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
+ }()
+
+ if !c.batched {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if err := c.syncContainer(); err != nil {
+ return err
+ }
+ }
+
+ session, ok := c.state.ExecSessions[sessionID]
if !ok {
- // Exec session already removed.
- logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID)
- return nil
+ return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID)
}
- session.State = define.ExecStateStopped
- session.ExitCode = exitCode
- session.PID = 0
+ // We can now finally get the real value of isTerminal.
+ isTerminal = session.Config.Terminal
+
+ // Verify that we are in a good state to continue
+ if !c.ensureState(define.ContainerStateRunning) {
+ return errors.Wrapf(define.ErrCtrStateInvalid, "can only start exec sessions when their container is running")
+ }
+
+ if session.State != define.ExecStateCreated {
+ return errors.Wrapf(define.ErrExecSessionStateInvalid, "can only start created exec sessions, while container %s session %s state is %q", c.ID(), session.ID(), session.State.String())
+ }
+
+ logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID())
+
+ execOpts, err := prepareForExec(c, session)
+ if err != nil {
+ return err
+ }
+
+ if streams == nil {
+ streams = new(HTTPAttachStreams)
+ streams.Stdin = session.Config.AttachStdin
+ streams.Stdout = session.Config.AttachStdout
+ streams.Stderr = session.Config.AttachStderr
+ }
+
+ pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, httpCon, httpBuf, streams, cancel)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Investigate whether more of this can be made common with
+ // ExecStartAndAttach
+
+ c.newContainerEvent(events.Exec)
+ logrus.Debugf("Successfully started exec session %s in container %s", session.ID(), c.ID())
+
+ var lastErr error
+
+ session.PID = pid
+ session.State = define.ExecStateRunning
if err := c.save(); err != nil {
+ lastErr = err
+ }
+
+ // Unlock so other processes can use the container
+ if !c.batched {
+ c.lock.Unlock()
+ }
+
+ tmpErr := <-attachChan
+ if lastErr != nil {
+ logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
+ }
+ lastErr = tmpErr
+
+ exitCode, err := c.readExecExitCode(session.ID())
+ if err != nil {
+ if lastErr != nil {
+ logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
+ }
+ lastErr = err
+ }
+
+ logrus.Debugf("Container %s exec session %s completed with exit code %d", c.ID(), session.ID(), exitCode)
+
+ // Lock again
+ if !c.batched {
+ c.lock.Lock()
+ }
+
+ if err := writeExecExitCode(c, session.ID(), exitCode); err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
@@ -357,12 +430,6 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
return lastErr
}
-// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
-func (c *Container) ExecHTTPStartAndAttach(sessionID string) error {
- // Will be implemented in part 2, migrating Start.
- return define.ErrNotImplemented
-}
-
// ExecStop stops an exec session in the container.
// If a timeout is provided, it will be used; otherwise, the timeout will
// default to the stop timeout of the container.
@@ -814,3 +881,67 @@ func (c *Container) removeAllExecSessions() error {
return lastErr
}
+
+// Make an ExecOptions struct to start the OCI runtime and prepare its exec
+// bundle.
+func prepareForExec(c *Container, session *ExecSession) (*ExecOptions, error) {
+ // TODO: check logic here - should we set Privileged if the container is
+ // privileged?
+ var capList []string
+ if session.Config.Privileged || c.config.Privileged {
+ capList = capabilities.AllCapabilities()
+ }
+
+ user := c.config.User
+ if session.Config.User != "" {
+ user = session.Config.User
+ }
+
+ if err := c.createExecBundle(session.ID()); err != nil {
+ return nil, err
+ }
+
+ opts := new(ExecOptions)
+ opts.Cmd = session.Config.Command
+ opts.CapAdd = capList
+ opts.Env = session.Config.Environment
+ opts.Terminal = session.Config.Terminal
+ opts.Cwd = session.Config.WorkDir
+ opts.User = user
+ opts.PreserveFDs = session.Config.PreserveFDs
+ opts.DetachKeys = session.Config.DetachKeys
+
+ return opts, nil
+}
+
+// Write an exec session's exit code to the database
+func writeExecExitCode(c *Container, sessionID string, exitCode int) error {
+ // We can't reuse the old exec session (things may have changed from
+ // under us, the container was unlocked).
+ // So re-sync and get a fresh copy.
+ // If we can't do this, no point in continuing, any attempt to save
+ // would write garbage to the DB.
+ if err := c.syncContainer(); err != nil {
+ if errors.Cause(err) == define.ErrNoSuchCtr || errors.Cause(err) == define.ErrCtrRemoved {
+ // Container's entirely removed. We can't save status,
+ // but the container's entirely removed, so we don't
+ // need to. Exit without error.
+ return nil
+ }
+ return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), sessionID)
+ }
+
+ session, ok := c.state.ExecSessions[sessionID]
+ if !ok {
+ // Exec session already removed.
+ logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID)
+ return nil
+ }
+
+ session.State = define.ExecStateStopped
+ session.ExitCode = exitCode
+ session.PID = 0
+
+ // Finally, save our changes.
+ return c.save()
+}
diff --git a/libpod/container_internal.go b/libpod/container_internal.go
index 3fcf687ec..909ad9851 100644
--- a/libpod/container_internal.go
+++ b/libpod/container_internal.go
@@ -1011,6 +1011,14 @@ func (c *Container) init(ctx context.Context, retainRetries bool) error {
logrus.Debugf("Created container %s in OCI runtime", c.ID())
+ // Remove any exec sessions leftover from a potential prior run.
+ if len(c.state.ExecSessions) > 0 {
+ if err := c.runtime.state.RemoveContainerExecSessions(c); err != nil {
+ logrus.Errorf("Error removing container %s exec sessions from DB: %v", c.ID(), err)
+ }
+ c.state.ExecSessions = make(map[string]*ExecSession)
+ }
+
c.state.ExitCode = 0
c.state.Exited = false
c.state.State = define.ContainerStateCreated
@@ -1562,21 +1570,24 @@ func (c *Container) cleanup(ctx context.Context) error {
lastError = errors.Wrapf(err, "error removing container %s network", c.ID())
}
- // Unmount storage
- if err := c.cleanupStorage(); err != nil {
+ // Remove the container from the runtime, if necessary.
+ // Do this *before* unmounting storage - some runtimes (e.g. Kata)
+ // apparently object to having storage removed while the container still
+ // exists.
+ if err := c.cleanupRuntime(ctx); err != nil {
if lastError != nil {
- logrus.Errorf("Error unmounting container %s storage: %v", c.ID(), err)
+ logrus.Errorf("Error removing container %s from OCI runtime: %v", c.ID(), err)
} else {
- lastError = errors.Wrapf(err, "error unmounting container %s storage", c.ID())
+ lastError = err
}
}
- // Remove the container from the runtime, if necessary
- if err := c.cleanupRuntime(ctx); err != nil {
+ // Unmount storage
+ if err := c.cleanupStorage(); err != nil {
if lastError != nil {
- logrus.Errorf("Error removing container %s from OCI runtime: %v", c.ID(), err)
+ logrus.Errorf("Error unmounting container %s storage: %v", c.ID(), err)
} else {
- lastError = err
+ lastError = errors.Wrapf(err, "error unmounting container %s storage", c.ID())
}
}
diff --git a/libpod/oci.go b/libpod/oci.go
index 9991c5625..6b1886f80 100644
--- a/libpod/oci.go
+++ b/libpod/oci.go
@@ -61,8 +61,7 @@ type OCIRuntime interface {
// the attach session to be terminated if provided via the STDIN
// channel. If they are not provided, the default detach keys will be
// used instead. Detach keys of "" will disable detaching via keyboard.
- // The streams parameter may be passed for containers that did not
- // create a terminal and will determine which streams to forward to the
+ // The streams parameter will determine which streams to forward to the
// client.
HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error
// AttachResize resizes the terminal in use by the given container.
@@ -71,7 +70,17 @@ type OCIRuntime interface {
// ExecContainer executes a command in a running container.
// Returns an int (PID of the exec session), error channel (errors from attach), and
// error (errors that occurred attempting to start the exec session).
- ExecContainer(ctr *Container, sessionID string, options *ExecOptions) (int, chan error, error)
+ // This returns once the exec session is running - not once it has
+ // completed, as one might expect. The attach session will remain
+ // running, in a goroutine that will return via the chan error in the
+ // return signature.
+ ExecContainer(ctr *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error)
+ // ExecContainerHTTP executes a command in a running container and
+ // attaches its standard streams to a provided hijacked HTTP session.
+ // Maintains the same invariants as ExecContainer (returns on session
+ // start, with a goroutine running in the background to handle attach).
+ // The HTTP attach itself maintains the same invariants as HTTPAttach.
+ ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error)
// ExecAttachResize resizes the terminal of a running exec session. Only
// allowed with sessions that were created with a TTY.
ExecAttachResize(ctr *Container, sessionID string, newSize remotecommand.TerminalSize) error
diff --git a/libpod/oci_conmon_linux.go b/libpod/oci_conmon_linux.go
index d1c1a1fc2..7ba36fe7c 100644
--- a/libpod/oci_conmon_linux.go
+++ b/libpod/oci_conmon_linux.go
@@ -636,8 +636,7 @@ func (r *ConmonOCIRuntime) AttachResize(ctr *Container, newSize remotecommand.Te
}
// ExecContainer executes a command in a running container
-// TODO: Split into Create/Start/Attach/Wait
-func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options *ExecOptions) (int, chan error, error) {
+func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error) {
if options == nil {
return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide an ExecOptions struct to ExecContainer")
}
@@ -649,178 +648,111 @@ func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options
return -1, nil, errors.Wrapf(define.ErrEmptyID, "must provide a session ID for exec")
}
- // create sync pipe to receive the pid
- parentSyncPipe, childSyncPipe, err := newPipe()
- if err != nil {
- return -1, nil, errors.Wrapf(err, "error creating socket pair")
+ // TODO: Should we default this to false?
+ // Or maybe make streams mandatory?
+ attachStdin := true
+ if streams != nil {
+ attachStdin = streams.AttachInput
}
- defer errorhandling.CloseQuiet(parentSyncPipe)
-
- // create start pipe to set the cgroup before running
- // attachToExec is responsible for closing parentStartPipe
- childStartPipe, parentStartPipe, err := newPipe()
- if err != nil {
- return -1, nil, errors.Wrapf(err, "error creating socket pair")
+ var ociLog string
+ if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON {
+ ociLog = c.execOCILog(sessionID)
}
- // We want to make sure we close the parent{Start,Attach}Pipes if we fail
- // but also don't want to close them after attach to exec is called
- attachToExecCalled := false
-
- defer func() {
- if !attachToExecCalled {
- errorhandling.CloseQuiet(parentStartPipe)
- }
- }()
-
- // create the attach pipe to allow attach socket to be created before
- // $RUNTIME exec starts running. This is to make sure we can capture all output
- // from the process through that socket, rather than half reading the log, half attaching to the socket
- // attachToExec is responsible for closing parentAttachPipe
- parentAttachPipe, childAttachPipe, err := newPipe()
+ execCmd, pipes, err := r.startExec(c, sessionID, options, attachStdin, ociLog)
if err != nil {
- return -1, nil, errors.Wrapf(err, "error creating socket pair")
+ return -1, nil, err
}
+ // Only close sync pipe. Start and attach are consumed in the attach
+ // goroutine.
defer func() {
- if !attachToExecCalled {
- errorhandling.CloseQuiet(parentAttachPipe)
+ if pipes.syncPipe != nil && !pipes.syncClosed {
+ errorhandling.CloseQuiet(pipes.syncPipe)
+ pipes.syncClosed = true
}
}()
- childrenClosed := false
- defer func() {
- if !childrenClosed {
- errorhandling.CloseQuiet(childSyncPipe)
- errorhandling.CloseQuiet(childAttachPipe)
- errorhandling.CloseQuiet(childStartPipe)
- }
+ // TODO Only create if !detach
+ // Attach to the container before starting it
+ attachChan := make(chan error)
+ go func() {
+ // attachToExec is responsible for closing pipes
+ attachChan <- c.attachToExec(streams, options.DetachKeys, sessionID, pipes.startPipe, pipes.attachPipe)
+ close(attachChan)
}()
- runtimeDir, err := util.GetRuntimeDir()
- if err != nil {
- return -1, nil, err
- }
-
- finalEnv := make([]string, 0, len(options.Env))
- for k, v := range options.Env {
- finalEnv = append(finalEnv, fmt.Sprintf("%s=%s", k, v))
- }
-
- processFile, err := prepareProcessExec(c, options.Cmd, finalEnv, options.Terminal, options.Cwd, options.User, sessionID)
- if err != nil {
- return -1, nil, err
- }
-
- var ociLog string
- if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON {
- ociLog = c.execOCILog(sessionID)
+ if err := execCmd.Wait(); err != nil {
+ return -1, nil, errors.Wrapf(err, "cannot run conmon")
}
- args := r.sharedConmonArgs(c, sessionID, c.execBundlePath(sessionID), c.execPidPath(sessionID), c.execLogPath(sessionID), c.execExitFileDir(sessionID), ociLog, "")
- if options.PreserveFDs > 0 {
- args = append(args, formatRuntimeOpts("--preserve-fds", fmt.Sprintf("%d", options.PreserveFDs))...)
- }
+ pid, err := readConmonPipeData(pipes.syncPipe, ociLog)
- for _, capability := range options.CapAdd {
- args = append(args, formatRuntimeOpts("--cap", capability)...)
- }
+ return pid, attachChan, err
+}
- if options.Terminal {
- args = append(args, "-t")
+// ExecContainerHTTP executes a new command in an existing container and
+// forwards its standard streams over an HTTP attach session.
+func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) {
+ if streams != nil {
+ if !streams.Stdin && !streams.Stdout && !streams.Stderr {
+ return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide at least one stream to attach to")
+ }
}
- if options.Streams != nil && options.Streams.AttachInput {
- args = append(args, "-i")
+ if options == nil {
+ return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide exec options to ExecContainerHTTP")
}
- // Append container ID and command
- args = append(args, "-e")
- // TODO make this optional when we can detach
- args = append(args, "--exec-attach")
- args = append(args, "--exec-process-spec", processFile.Name())
-
- logrus.WithFields(logrus.Fields{
- "args": args,
- }).Debugf("running conmon: %s", r.conmonPath)
- execCmd := exec.Command(r.conmonPath, args...)
-
- if options.Streams != nil {
- // Don't add the InputStream to the execCmd. Instead, the data should be passed
- // through CopyDetachable
- if options.Streams.AttachOutput {
- execCmd.Stdout = options.Streams.OutputStream
- }
- if options.Streams.AttachError {
- execCmd.Stderr = options.Streams.ErrorStream
- }
+ detachString := config.DefaultDetachKeys
+ if options.DetachKeys != nil {
+ detachString = *options.DetachKeys
}
-
- conmonEnv, extraFiles, err := r.configureConmonEnv(runtimeDir)
+ detachKeys, err := processDetachKeys(detachString)
if err != nil {
return -1, nil, err
}
- if options.PreserveFDs > 0 {
- for fd := 3; fd < int(3+options.PreserveFDs); fd++ {
- execCmd.ExtraFiles = append(execCmd.ExtraFiles, os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)))
- }
+ // TODO: Should we default this to false?
+ // Or maybe make streams mandatory?
+ attachStdin := true
+ if streams != nil {
+ attachStdin = streams.Stdin
}
- // we don't want to step on users fds they asked to preserve
- // Since 0-2 are used for stdio, start the fds we pass in at preserveFDs+3
- execCmd.Env = r.conmonEnv
- execCmd.Env = append(execCmd.Env, fmt.Sprintf("_OCI_SYNCPIPE=%d", options.PreserveFDs+3), fmt.Sprintf("_OCI_STARTPIPE=%d", options.PreserveFDs+4), fmt.Sprintf("_OCI_ATTACHPIPE=%d", options.PreserveFDs+5))
- execCmd.Env = append(execCmd.Env, conmonEnv...)
-
- execCmd.ExtraFiles = append(execCmd.ExtraFiles, childSyncPipe, childStartPipe, childAttachPipe)
- execCmd.ExtraFiles = append(execCmd.ExtraFiles, extraFiles...)
- execCmd.Dir = c.execBundlePath(sessionID)
- execCmd.SysProcAttr = &syscall.SysProcAttr{
- Setpgid: true,
+ var ociLog string
+ if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON {
+ ociLog = ctr.execOCILog(sessionID)
}
- err = startCommandGivenSelinux(execCmd)
-
- // We don't need children pipes on the parent side
- errorhandling.CloseQuiet(childSyncPipe)
- errorhandling.CloseQuiet(childAttachPipe)
- errorhandling.CloseQuiet(childStartPipe)
- childrenClosed = true
-
+ execCmd, pipes, err := r.startExec(ctr, sessionID, options, attachStdin, ociLog)
if err != nil {
- return -1, nil, errors.Wrapf(err, "cannot start container %s", c.ID())
- }
- if err := r.moveConmonToCgroupAndSignal(c, execCmd, parentStartPipe); err != nil {
return -1, nil, err
}
- if options.PreserveFDs > 0 {
- for fd := 3; fd < int(3+options.PreserveFDs); fd++ {
- // These fds were passed down to the runtime. Close them
- // and not interfere
- if err := os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)).Close(); err != nil {
- logrus.Debugf("unable to close file fd-%d", fd)
- }
+ // Only close sync pipe. Start and attach are consumed in the attach
+ // goroutine.
+ defer func() {
+ if pipes.syncPipe != nil && !pipes.syncClosed {
+ errorhandling.CloseQuiet(pipes.syncPipe)
+ pipes.syncClosed = true
}
- }
+ }()
- // TODO Only create if !detach
- // Attach to the container before starting it
attachChan := make(chan error)
go func() {
// attachToExec is responsible for closing pipes
- attachChan <- c.attachToExec(options.Streams, options.DetachKeys, sessionID, parentStartPipe, parentAttachPipe)
+ attachChan <- attachExecHTTP(ctr, sessionID, httpBuf, streams, pipes, detachKeys, options.Terminal, cancel)
close(attachChan)
}()
- attachToExecCalled = true
+ // Wait for conmon to succeed, then return.
if err := execCmd.Wait(); err != nil {
return -1, nil, errors.Wrapf(err, "cannot run conmon")
}
- pid, err := readConmonPipeData(parentSyncPipe, ociLog)
+ pid, err := readConmonPipeData(pipes.syncPipe, ociLog)
return pid, attachChan, err
}
@@ -1415,15 +1347,21 @@ func (r *ConmonOCIRuntime) configureConmonEnv(runtimeDir string) ([]string, []*o
// sharedConmonArgs takes common arguments for exec and create/restore and formats them for the conmon CLI
func (r *ConmonOCIRuntime) sharedConmonArgs(ctr *Container, cuuid, bundlePath, pidPath, logPath, exitDir, ociLogPath, logTag string) []string {
// set the conmon API version to be able to use the correct sync struct keys
- args := []string{"--api-version", "1"}
+ args := []string{
+ "--api-version", "1",
+ "-c", ctr.ID(),
+ "-u", cuuid,
+ "-r", r.path,
+ "-b", bundlePath,
+ "-p", pidPath,
+ "-n", ctr.Name(),
+ "--exit-dir", exitDir,
+ "--socket-dir-path", r.socketsDir,
+ }
+
if r.cgroupManager == config.SystemdCgroupsManager && !ctr.config.NoCgroups {
args = append(args, "-s")
}
- args = append(args, "-c", ctr.ID())
- args = append(args, "-u", cuuid)
- args = append(args, "-r", r.path)
- args = append(args, "-b", bundlePath)
- args = append(args, "-p", pidPath)
var logDriver string
switch ctr.LogDriver() {
@@ -1444,8 +1382,6 @@ func (r *ConmonOCIRuntime) sharedConmonArgs(ctr *Container, cuuid, bundlePath, p
}
args = append(args, "-l", logDriver)
- args = append(args, "--exit-dir", exitDir)
- args = append(args, "--socket-dir-path", r.socketsDir)
if r.logSizeMax >= 0 {
args = append(args, "--log-size-max", fmt.Sprintf("%v", r.logSizeMax))
}
@@ -1829,3 +1765,297 @@ func httpAttachNonTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter,
}
}
+
+// This contains pipes used by the exec API.
+type execPipes struct {
+ syncPipe *os.File
+ syncClosed bool
+ startPipe *os.File
+ startClosed bool
+ attachPipe *os.File
+ attachClosed bool
+}
+
+func (p *execPipes) cleanup() {
+ if p.syncPipe != nil && !p.syncClosed {
+ errorhandling.CloseQuiet(p.syncPipe)
+ p.syncClosed = true
+ }
+ if p.startPipe != nil && !p.startClosed {
+ errorhandling.CloseQuiet(p.startPipe)
+ p.startClosed = true
+ }
+ if p.attachPipe != nil && !p.attachClosed {
+ errorhandling.CloseQuiet(p.attachPipe)
+ p.attachClosed = true
+ }
+}
+
+// Start an exec session's conmon parent from the given options.
+func (r *ConmonOCIRuntime) startExec(c *Container, sessionID string, options *ExecOptions, attachStdin bool, ociLog string) (_ *exec.Cmd, _ *execPipes, deferredErr error) {
+ pipes := new(execPipes)
+
+ if options == nil {
+ return nil, nil, errors.Wrapf(define.ErrInvalidArg, "must provide an ExecOptions struct to ExecContainer")
+ }
+ if len(options.Cmd) == 0 {
+ return nil, nil, errors.Wrapf(define.ErrInvalidArg, "must provide a command to execute")
+ }
+
+ if sessionID == "" {
+ return nil, nil, errors.Wrapf(define.ErrEmptyID, "must provide a session ID for exec")
+ }
+
+ // create sync pipe to receive the pid
+ parentSyncPipe, childSyncPipe, err := newPipe()
+ if err != nil {
+ return nil, nil, errors.Wrapf(err, "error creating socket pair")
+ }
+ pipes.syncPipe = parentSyncPipe
+
+ defer func() {
+ if deferredErr != nil {
+ pipes.cleanup()
+ }
+ }()
+
+ // create start pipe to set the cgroup before running
+ // attachToExec is responsible for closing parentStartPipe
+ childStartPipe, parentStartPipe, err := newPipe()
+ if err != nil {
+ return nil, nil, errors.Wrapf(err, "error creating socket pair")
+ }
+ pipes.startPipe = parentStartPipe
+
+ // create the attach pipe to allow attach socket to be created before
+ // $RUNTIME exec starts running. This is to make sure we can capture all output
+ // from the process through that socket, rather than half reading the log, half attaching to the socket
+ // attachToExec is responsible for closing parentAttachPipe
+ parentAttachPipe, childAttachPipe, err := newPipe()
+ if err != nil {
+ return nil, nil, errors.Wrapf(err, "error creating socket pair")
+ }
+ pipes.attachPipe = parentAttachPipe
+
+ childrenClosed := false
+ defer func() {
+ if !childrenClosed {
+ errorhandling.CloseQuiet(childSyncPipe)
+ errorhandling.CloseQuiet(childAttachPipe)
+ errorhandling.CloseQuiet(childStartPipe)
+ }
+ }()
+
+ runtimeDir, err := util.GetRuntimeDir()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ finalEnv := make([]string, 0, len(options.Env))
+ for k, v := range options.Env {
+ finalEnv = append(finalEnv, fmt.Sprintf("%s=%s", k, v))
+ }
+
+ processFile, err := prepareProcessExec(c, options.Cmd, finalEnv, options.Terminal, options.Cwd, options.User, sessionID)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ args := r.sharedConmonArgs(c, sessionID, c.execBundlePath(sessionID), c.execPidPath(sessionID), c.execLogPath(sessionID), c.execExitFileDir(sessionID), ociLog, "")
+
+ if options.PreserveFDs > 0 {
+ args = append(args, formatRuntimeOpts("--preserve-fds", fmt.Sprintf("%d", options.PreserveFDs))...)
+ }
+
+ for _, capability := range options.CapAdd {
+ args = append(args, formatRuntimeOpts("--cap", capability)...)
+ }
+
+ if options.Terminal {
+ args = append(args, "-t")
+ }
+
+ if attachStdin {
+ args = append(args, "-i")
+ }
+
+ // Append container ID and command
+ args = append(args, "-e")
+ // TODO make this optional when we can detach
+ args = append(args, "--exec-attach")
+ args = append(args, "--exec-process-spec", processFile.Name())
+
+ logrus.WithFields(logrus.Fields{
+ "args": args,
+ }).Debugf("running conmon: %s", r.conmonPath)
+ // TODO: Need to pass this back so we can wait on it.
+ execCmd := exec.Command(r.conmonPath, args...)
+
+ // TODO: This is commented because it doesn't make much sense in HTTP
+ // attach, and I'm not certain it does for non-HTTP attach as well.
+ // if streams != nil {
+ // // Don't add the InputStream to the execCmd. Instead, the data should be passed
+ // // through CopyDetachable
+ // if streams.AttachOutput {
+ // execCmd.Stdout = options.Streams.OutputStream
+ // }
+ // if streams.AttachError {
+ // execCmd.Stderr = options.Streams.ErrorStream
+ // }
+ // }
+
+ conmonEnv, extraFiles, err := r.configureConmonEnv(runtimeDir)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if options.PreserveFDs > 0 {
+ for fd := 3; fd < int(3+options.PreserveFDs); fd++ {
+ execCmd.ExtraFiles = append(execCmd.ExtraFiles, os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)))
+ }
+ }
+
+ // we don't want to step on users fds they asked to preserve
+ // Since 0-2 are used for stdio, start the fds we pass in at preserveFDs+3
+ execCmd.Env = r.conmonEnv
+ execCmd.Env = append(execCmd.Env, fmt.Sprintf("_OCI_SYNCPIPE=%d", options.PreserveFDs+3), fmt.Sprintf("_OCI_STARTPIPE=%d", options.PreserveFDs+4), fmt.Sprintf("_OCI_ATTACHPIPE=%d", options.PreserveFDs+5))
+ execCmd.Env = append(execCmd.Env, conmonEnv...)
+
+ execCmd.ExtraFiles = append(execCmd.ExtraFiles, childSyncPipe, childStartPipe, childAttachPipe)
+ execCmd.ExtraFiles = append(execCmd.ExtraFiles, extraFiles...)
+ execCmd.Dir = c.execBundlePath(sessionID)
+ execCmd.SysProcAttr = &syscall.SysProcAttr{
+ Setpgid: true,
+ }
+
+ err = startCommandGivenSelinux(execCmd)
+
+ // We don't need children pipes on the parent side
+ errorhandling.CloseQuiet(childSyncPipe)
+ errorhandling.CloseQuiet(childAttachPipe)
+ errorhandling.CloseQuiet(childStartPipe)
+ childrenClosed = true
+
+ if err != nil {
+ return nil, nil, errors.Wrapf(err, "cannot start container %s", c.ID())
+ }
+ if err := r.moveConmonToCgroupAndSignal(c, execCmd, parentStartPipe); err != nil {
+ return nil, nil, err
+ }
+
+ if options.PreserveFDs > 0 {
+ for fd := 3; fd < int(3+options.PreserveFDs); fd++ {
+ // These fds were passed down to the runtime. Close them
+ // and not interfere
+ if err := os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)).Close(); err != nil {
+ logrus.Debugf("unable to close file fd-%d", fd)
+ }
+ }
+ }
+
+ return execCmd, pipes, nil
+}
+
+// Attach to a container's exec session over HTTP
+func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool) error {
+ if pipes == nil || pipes.startPipe == nil || pipes.attachPipe == nil {
+ return errors.Wrapf(define.ErrInvalidArg, "must provide a start and attach pipe to finish an exec attach")
+ }
+
+ defer func() {
+ if !pipes.startClosed {
+ errorhandling.CloseQuiet(pipes.startPipe)
+ pipes.startClosed = true
+ }
+ if !pipes.attachClosed {
+ errorhandling.CloseQuiet(pipes.attachPipe)
+ pipes.attachClosed = true
+ }
+ }()
+
+ logrus.Debugf("Attaching to container %s exec session %s", c.ID(), sessionID)
+
+ // set up the socket path, such that it is the correct length and location for exec
+ sockPath, err := c.execAttachSocketPath(sessionID)
+ if err != nil {
+ return err
+ }
+ socketPath := buildSocketPath(sockPath)
+
+ // 2: read from attachFd that the parent process has set up the console socket
+ if _, err := readConmonPipeData(pipes.attachPipe, ""); err != nil {
+ return err
+ }
+
+ // 3: then attach
+ conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: socketPath, Net: "unixpacket"})
+ if err != nil {
+ return errors.Wrapf(err, "failed to connect to container's attach socket: %v", socketPath)
+ }
+ defer func() {
+ if err := conn.Close(); err != nil {
+ logrus.Errorf("unable to close socket: %q", err)
+ }
+ }()
+
+ // Make a channel to pass errors back
+ errChan := make(chan error)
+
+ attachStdout := true
+ attachStderr := true
+ attachStdin := true
+ if streams != nil {
+ attachStdout = streams.Stdout
+ attachStderr = streams.Stderr
+ attachStdin = streams.Stdin
+ }
+
+ // Next, STDIN. Avoid entirely if attachStdin unset.
+ if attachStdin {
+ go func() {
+ logrus.Debugf("Beginning STDIN copy")
+ _, err := utils.CopyDetachable(conn, httpBuf, detachKeys)
+ logrus.Debugf("STDIN copy completed")
+ errChan <- err
+ }()
+ }
+
+ // 4: send start message to child
+ if err := writeConmonPipeData(pipes.startPipe); err != nil {
+ return err
+ }
+
+ // Handle STDOUT/STDERR *after* start message is sent
+ go func() {
+ var err error
+ if isTerminal {
+ // Hack: return immediately if attachStdout not set to
+ // emulate Docker.
+ // Basically, when terminal is set, STDERR goes nowhere.
+ // Everything goes over STDOUT.
+ // Therefore, if not attaching STDOUT - we'll never copy
+ // anything from here.
+ logrus.Debugf("Performing terminal HTTP attach for container %s", c.ID())
+ if attachStdout {
+ err = httpAttachTerminalCopy(conn, httpBuf, c.ID())
+ }
+ } else {
+ logrus.Debugf("Performing non-terminal HTTP attach for container %s", c.ID())
+ err = httpAttachNonTerminalCopy(conn, httpBuf, c.ID(), attachStdin, attachStdout, attachStderr)
+ }
+ errChan <- err
+ logrus.Debugf("STDOUT/ERR copy completed")
+ }()
+
+ if cancel != nil {
+ select {
+ case err := <-errChan:
+ return err
+ case <-cancel:
+ return nil
+ }
+ } else {
+ var connErr error = <-errChan
+ return connErr
+ }
+}
diff --git a/libpod/oci_missing.go b/libpod/oci_missing.go
index 172805b0d..626740f72 100644
--- a/libpod/oci_missing.go
+++ b/libpod/oci_missing.go
@@ -121,7 +121,12 @@ func (r *MissingRuntime) AttachResize(ctr *Container, newSize remotecommand.Term
}
// ExecContainer is not available as the runtime is missing
-func (r *MissingRuntime) ExecContainer(ctr *Container, sessionID string, options *ExecOptions) (int, chan error, error) {
+func (r *MissingRuntime) ExecContainer(ctr *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error) {
+ return -1, nil, r.printError()
+}
+
+// ExecContainerHTTP is not available as the runtime is missing
+func (r *MissingRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) {
return -1, nil, r.printError()
}
diff --git a/libpod/runtime_ctr.go b/libpod/runtime_ctr.go
index 1d880531e..c670822a0 100644
--- a/libpod/runtime_ctr.go
+++ b/libpod/runtime_ctr.go
@@ -488,20 +488,25 @@ func (r *Runtime) removeContainer(ctx context.Context, c *Container, force bool,
}
}
+ var cleanupErr error
+
+ // Clean up network namespace, cgroups, mounts.
+ // Do this before we set ContainerStateRemoving, to ensure that we can
+ // actually remove from the OCI runtime.
+ if err := c.cleanup(ctx); err != nil {
+ cleanupErr = errors.Wrapf(err, "error cleaning up container %s", c.ID())
+ }
+
// Set ContainerStateRemoving
c.state.State = define.ContainerStateRemoving
if err := c.save(); err != nil {
+ if cleanupErr != nil {
+ logrus.Errorf(err.Error())
+ }
return errors.Wrapf(err, "unable to set container %s removing state in database", c.ID())
}
- var cleanupErr error
-
- // Clean up network namespace, cgroups, mounts
- if err := c.cleanup(ctx); err != nil {
- cleanupErr = errors.Wrapf(err, "error cleaning up container %s", c.ID())
- }
-
// Stop the container's storage
if err := c.teardownStorage(); err != nil {
if cleanupErr == nil {