diff options
-rw-r--r-- | libpod/container_exec.go | 227 | ||||
-rw-r--r-- | libpod/container_internal.go | 8 | ||||
-rw-r--r-- | libpod/oci.go | 15 | ||||
-rw-r--r-- | libpod/oci_conmon_linux.go | 482 | ||||
-rw-r--r-- | libpod/oci_missing.go | 7 | ||||
-rw-r--r-- | pkg/api/handlers/compat/containers_attach.go | 11 | ||||
-rw-r--r-- | pkg/api/handlers/compat/exec.go | 72 | ||||
-rw-r--r-- | pkg/api/handlers/types.go | 5 | ||||
-rw-r--r-- | pkg/api/server/register_exec.go | 20 |
9 files changed, 653 insertions, 194 deletions
diff --git a/libpod/container_exec.go b/libpod/container_exec.go index c1ce8b724..6ad767b4b 100644 --- a/libpod/container_exec.go +++ b/libpod/container_exec.go @@ -1,7 +1,9 @@ package libpod import ( + "bufio" "io/ioutil" + "net" "os" "path/filepath" "strconv" @@ -102,7 +104,7 @@ func (e *ExecSession) Inspect() (*define.InspectExecSession, error) { } output := new(define.InspectExecSession) - output.CanRemove = e.State != define.ExecStateRunning + output.CanRemove = e.State == define.ExecStateStopped output.ContainerID = e.ContainerId if e.Config.DetachKeys != nil { output.DetachKeys = *e.Config.DetachKeys @@ -156,9 +158,6 @@ func (c *Container) ExecCreate(config *ExecConfig) (string, error) { if len(config.Command) == 0 { return "", errors.Wrapf(define.ErrInvalidArg, "must provide a non-empty command to start an exec session") } - if config.Terminal && (config.AttachStdin || config.AttachStdout || config.AttachStderr) { - return "", errors.Wrapf(define.ErrInvalidArg, "cannot specify streams to attach to when exec session has a pseudoterminal") - } // Verify that we are in a good state to continue if !c.ensureState(define.ContainerStateRunning) { @@ -247,34 +246,12 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID()) - // TODO: check logic here - should we set Privileged if the container is - // privileged? - var capList []string - if session.Config.Privileged || c.config.Privileged { - capList = capabilities.AllCapabilities() - } - - user := c.config.User - if session.Config.User != "" { - user = session.Config.User - } - - if err := c.createExecBundle(session.ID()); err != nil { + opts, err := prepareForExec(c, session) + if err != nil { return err } - opts := new(ExecOptions) - opts.Cmd = session.Config.Command - opts.CapAdd = capList - opts.Env = session.Config.Environment - opts.Terminal = session.Config.Terminal - opts.Cwd = session.Config.WorkDir - opts.User = user - opts.Streams = streams - opts.PreserveFDs = session.Config.PreserveFDs - opts.DetachKeys = session.Config.DetachKeys - - pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts) + pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts, streams) if err != nil { return err } @@ -318,28 +295,124 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS c.lock.Lock() } - // Sync the container to pick up state changes - if err := c.syncContainer(); err != nil { + if err := writeExecExitCode(c, session.ID(), exitCode); err != nil { if lastErr != nil { logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr) } - return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), session.ID()) + lastErr = err } - // Update status - // Since we did a syncContainer, the old session has been overwritten. - // Grab a fresh one from the database. - session, ok = c.state.ExecSessions[sessionID] + // Clean up after ourselves + if err := c.cleanupExecBundle(session.ID()); err != nil { + if lastErr != nil { + logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr) + } + lastErr = err + } + + return lastErr +} + +// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session. +func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) { + // TODO: How do we combine streams with the default streams set in the exec session? + + // The flow here is somewhat strange, because we need to determine if + // there's a terminal ASAP (for error handling). + // Until we know, assume it's true (don't add standard stream headers). + // Add a defer to ensure our invariant (HTTP session is closed) is + // maintained. + isTerminal := true + defer func() { + hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf) + }() + + if !c.batched { + c.lock.Lock() + defer c.lock.Unlock() + + if err := c.syncContainer(); err != nil { + return err + } + } + + session, ok := c.state.ExecSessions[sessionID] if !ok { - // Exec session already removed. - logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID) - return nil + return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID) } - session.State = define.ExecStateStopped - session.ExitCode = exitCode - session.PID = 0 + // We can now finally get the real value of isTerminal. + isTerminal = session.Config.Terminal + + // Verify that we are in a good state to continue + if !c.ensureState(define.ContainerStateRunning) { + return errors.Wrapf(define.ErrCtrStateInvalid, "can only start exec sessions when their container is running") + } + + if session.State != define.ExecStateCreated { + return errors.Wrapf(define.ErrExecSessionStateInvalid, "can only start created exec sessions, while container %s session %s state is %q", c.ID(), session.ID(), session.State.String()) + } + + logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID()) + + execOpts, err := prepareForExec(c, session) + if err != nil { + return err + } + + if streams == nil { + streams = new(HTTPAttachStreams) + streams.Stdin = session.Config.AttachStdin + streams.Stdout = session.Config.AttachStdout + streams.Stderr = session.Config.AttachStderr + } + + pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, httpCon, httpBuf, streams, cancel) + if err != nil { + return err + } + + // TODO: Investigate whether more of this can be made common with + // ExecStartAndAttach + + c.newContainerEvent(events.Exec) + logrus.Debugf("Successfully started exec session %s in container %s", session.ID(), c.ID()) + + var lastErr error + + session.PID = pid + session.State = define.ExecStateRunning if err := c.save(); err != nil { + lastErr = err + } + + // Unlock so other processes can use the container + if !c.batched { + c.lock.Unlock() + } + + tmpErr := <-attachChan + if lastErr != nil { + logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr) + } + lastErr = tmpErr + + exitCode, err := c.readExecExitCode(session.ID()) + if err != nil { + if lastErr != nil { + logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr) + } + lastErr = err + } + + logrus.Debugf("Container %s exec session %s completed with exit code %d", c.ID(), session.ID(), exitCode) + + // Lock again + if !c.batched { + c.lock.Lock() + } + + if err := writeExecExitCode(c, session.ID(), exitCode); err != nil { if lastErr != nil { logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr) } @@ -357,12 +430,6 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS return lastErr } -// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session. -func (c *Container) ExecHTTPStartAndAttach(sessionID string) error { - // Will be implemented in part 2, migrating Start. - return define.ErrNotImplemented -} - // ExecStop stops an exec session in the container. // If a timeout is provided, it will be used; otherwise, the timeout will // default to the stop timeout of the container. @@ -814,3 +881,67 @@ func (c *Container) removeAllExecSessions() error { return lastErr } + +// Make an ExecOptions struct to start the OCI runtime and prepare its exec +// bundle. +func prepareForExec(c *Container, session *ExecSession) (*ExecOptions, error) { + // TODO: check logic here - should we set Privileged if the container is + // privileged? + var capList []string + if session.Config.Privileged || c.config.Privileged { + capList = capabilities.AllCapabilities() + } + + user := c.config.User + if session.Config.User != "" { + user = session.Config.User + } + + if err := c.createExecBundle(session.ID()); err != nil { + return nil, err + } + + opts := new(ExecOptions) + opts.Cmd = session.Config.Command + opts.CapAdd = capList + opts.Env = session.Config.Environment + opts.Terminal = session.Config.Terminal + opts.Cwd = session.Config.WorkDir + opts.User = user + opts.PreserveFDs = session.Config.PreserveFDs + opts.DetachKeys = session.Config.DetachKeys + + return opts, nil +} + +// Write an exec session's exit code to the database +func writeExecExitCode(c *Container, sessionID string, exitCode int) error { + // We can't reuse the old exec session (things may have changed from + // under use, the container was unlocked). + // So re-sync and get a fresh copy. + // If we can't do this, no point in continuing, any attempt to save + // would write garbage to the DB. + if err := c.syncContainer(); err != nil { + if errors.Cause(err) == define.ErrNoSuchCtr || errors.Cause(err) == define.ErrCtrRemoved { + // Container's entirely removed. We can't save status, + // but the container's entirely removed, so we don't + // need to. Exit without error. + return nil + } + return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), sessionID) + } + + session, ok := c.state.ExecSessions[sessionID] + if !ok { + // Exec session already removed. + logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID) + return nil + } + + session.State = define.ExecStateStopped + session.ExitCode = exitCode + session.PID = 0 + + // Finally, save our changes. + return c.save() +} diff --git a/libpod/container_internal.go b/libpod/container_internal.go index 5baa5fc1c..909ad9851 100644 --- a/libpod/container_internal.go +++ b/libpod/container_internal.go @@ -1011,6 +1011,14 @@ func (c *Container) init(ctx context.Context, retainRetries bool) error { logrus.Debugf("Created container %s in OCI runtime", c.ID()) + // Remove any exec sessions leftover from a potential prior run. + if len(c.state.ExecSessions) > 0 { + if err := c.runtime.state.RemoveContainerExecSessions(c); err != nil { + logrus.Errorf("Error removing container %s exec sessions from DB: %v", c.ID(), err) + } + c.state.ExecSessions = make(map[string]*ExecSession) + } + c.state.ExitCode = 0 c.state.Exited = false c.state.State = define.ContainerStateCreated diff --git a/libpod/oci.go b/libpod/oci.go index 9991c5625..6b1886f80 100644 --- a/libpod/oci.go +++ b/libpod/oci.go @@ -61,8 +61,7 @@ type OCIRuntime interface { // the attach session to be terminated if provided via the STDIN // channel. If they are not provided, the default detach keys will be // used instead. Detach keys of "" will disable detaching via keyboard. - // The streams parameter may be passed for containers that did not - // create a terminal and will determine which streams to forward to the + // The streams parameter will determine which streams to forward to the // client. HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error // AttachResize resizes the terminal in use by the given container. @@ -71,7 +70,17 @@ type OCIRuntime interface { // ExecContainer executes a command in a running container. // Returns an int (exit code), error channel (errors from attach), and // error (errors that occurred attempting to start the exec session). - ExecContainer(ctr *Container, sessionID string, options *ExecOptions) (int, chan error, error) + // This returns once the exec session is running - not once it has + // completed, as one might expect. The attach session will remain + // running, in a goroutine that will return via the chan error in the + // return signature. + ExecContainer(ctr *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error) + // ExecContainerHTTP executes a command in a running container and + // attaches its standard streams to a provided hijacked HTTP session. + // Maintains the same invariants as ExecContainer (returns on session + // start, with a goroutine running in the background to handle attach). + // The HTTP attach itself maintains the same invariants as HTTPAttach. + ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) // ExecAttachResize resizes the terminal of a running exec session. Only // allowed with sessions that were created with a TTY. ExecAttachResize(ctr *Container, sessionID string, newSize remotecommand.TerminalSize) error diff --git a/libpod/oci_conmon_linux.go b/libpod/oci_conmon_linux.go index d1c1a1fc2..895a67747 100644 --- a/libpod/oci_conmon_linux.go +++ b/libpod/oci_conmon_linux.go @@ -636,8 +636,7 @@ func (r *ConmonOCIRuntime) AttachResize(ctr *Container, newSize remotecommand.Te } // ExecContainer executes a command in a running container -// TODO: Split into Create/Start/Attach/Wait -func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options *ExecOptions) (int, chan error, error) { +func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error) { if options == nil { return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide an ExecOptions struct to ExecContainer") } @@ -649,178 +648,111 @@ func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options return -1, nil, errors.Wrapf(define.ErrEmptyID, "must provide a session ID for exec") } - // create sync pipe to receive the pid - parentSyncPipe, childSyncPipe, err := newPipe() - if err != nil { - return -1, nil, errors.Wrapf(err, "error creating socket pair") + // TODO: Should we default this to false? + // Or maybe make streams mandatory? + attachStdin := true + if streams != nil { + attachStdin = streams.AttachInput } - defer errorhandling.CloseQuiet(parentSyncPipe) - - // create start pipe to set the cgroup before running - // attachToExec is responsible for closing parentStartPipe - childStartPipe, parentStartPipe, err := newPipe() - if err != nil { - return -1, nil, errors.Wrapf(err, "error creating socket pair") + var ociLog string + if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON { + ociLog = c.execOCILog(sessionID) } - // We want to make sure we close the parent{Start,Attach}Pipes if we fail - // but also don't want to close them after attach to exec is called - attachToExecCalled := false - - defer func() { - if !attachToExecCalled { - errorhandling.CloseQuiet(parentStartPipe) - } - }() - - // create the attach pipe to allow attach socket to be created before - // $RUNTIME exec starts running. This is to make sure we can capture all output - // from the process through that socket, rather than half reading the log, half attaching to the socket - // attachToExec is responsible for closing parentAttachPipe - parentAttachPipe, childAttachPipe, err := newPipe() + execCmd, pipes, err := r.startExec(c, sessionID, options, attachStdin, ociLog) if err != nil { - return -1, nil, errors.Wrapf(err, "error creating socket pair") + return -1, nil, err } + // Only close sync pipe. Start and attach are consumed in the attach + // goroutine. defer func() { - if !attachToExecCalled { - errorhandling.CloseQuiet(parentAttachPipe) + if pipes.syncPipe != nil && !pipes.syncClosed { + errorhandling.CloseQuiet(pipes.syncPipe) + pipes.syncClosed = true } }() - childrenClosed := false - defer func() { - if !childrenClosed { - errorhandling.CloseQuiet(childSyncPipe) - errorhandling.CloseQuiet(childAttachPipe) - errorhandling.CloseQuiet(childStartPipe) - } + // TODO Only create if !detach + // Attach to the container before starting it + attachChan := make(chan error) + go func() { + // attachToExec is responsible for closing pipes + attachChan <- c.attachToExec(streams, options.DetachKeys, sessionID, pipes.startPipe, pipes.attachPipe) + close(attachChan) }() - runtimeDir, err := util.GetRuntimeDir() - if err != nil { - return -1, nil, err - } - - finalEnv := make([]string, 0, len(options.Env)) - for k, v := range options.Env { - finalEnv = append(finalEnv, fmt.Sprintf("%s=%s", k, v)) - } - - processFile, err := prepareProcessExec(c, options.Cmd, finalEnv, options.Terminal, options.Cwd, options.User, sessionID) - if err != nil { - return -1, nil, err - } - - var ociLog string - if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON { - ociLog = c.execOCILog(sessionID) + if err := execCmd.Wait(); err != nil { + return -1, nil, errors.Wrapf(err, "cannot run conmon") } - args := r.sharedConmonArgs(c, sessionID, c.execBundlePath(sessionID), c.execPidPath(sessionID), c.execLogPath(sessionID), c.execExitFileDir(sessionID), ociLog, "") - if options.PreserveFDs > 0 { - args = append(args, formatRuntimeOpts("--preserve-fds", fmt.Sprintf("%d", options.PreserveFDs))...) - } + pid, err := readConmonPipeData(pipes.syncPipe, ociLog) - for _, capability := range options.CapAdd { - args = append(args, formatRuntimeOpts("--cap", capability)...) - } + return pid, attachChan, err +} - if options.Terminal { - args = append(args, "-t") +// ExecContainerHTTP executes a new command in an existing container and +// forwards its standard streams over an attach +func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) { + if streams != nil { + if !streams.Stdin && !streams.Stdout && !streams.Stderr { + return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide at least one stream to attach to") + } } - if options.Streams != nil && options.Streams.AttachInput { - args = append(args, "-i") + if options == nil { + return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide exec options to ExecContainerHTTP") } - // Append container ID and command - args = append(args, "-e") - // TODO make this optional when we can detach - args = append(args, "--exec-attach") - args = append(args, "--exec-process-spec", processFile.Name()) - - logrus.WithFields(logrus.Fields{ - "args": args, - }).Debugf("running conmon: %s", r.conmonPath) - execCmd := exec.Command(r.conmonPath, args...) - - if options.Streams != nil { - // Don't add the InputStream to the execCmd. Instead, the data should be passed - // through CopyDetachable - if options.Streams.AttachOutput { - execCmd.Stdout = options.Streams.OutputStream - } - if options.Streams.AttachError { - execCmd.Stderr = options.Streams.ErrorStream - } + detachString := config.DefaultDetachKeys + if options.DetachKeys != nil { + detachString = *options.DetachKeys } - - conmonEnv, extraFiles, err := r.configureConmonEnv(runtimeDir) + detachKeys, err := processDetachKeys(detachString) if err != nil { return -1, nil, err } - if options.PreserveFDs > 0 { - for fd := 3; fd < int(3+options.PreserveFDs); fd++ { - execCmd.ExtraFiles = append(execCmd.ExtraFiles, os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd))) - } + // TODO: Should we default this to false? + // Or maybe make streams mandatory? + attachStdin := true + if streams != nil { + attachStdin = streams.Stdin } - // we don't want to step on users fds they asked to preserve - // Since 0-2 are used for stdio, start the fds we pass in at preserveFDs+3 - execCmd.Env = r.conmonEnv - execCmd.Env = append(execCmd.Env, fmt.Sprintf("_OCI_SYNCPIPE=%d", options.PreserveFDs+3), fmt.Sprintf("_OCI_STARTPIPE=%d", options.PreserveFDs+4), fmt.Sprintf("_OCI_ATTACHPIPE=%d", options.PreserveFDs+5)) - execCmd.Env = append(execCmd.Env, conmonEnv...) - - execCmd.ExtraFiles = append(execCmd.ExtraFiles, childSyncPipe, childStartPipe, childAttachPipe) - execCmd.ExtraFiles = append(execCmd.ExtraFiles, extraFiles...) - execCmd.Dir = c.execBundlePath(sessionID) - execCmd.SysProcAttr = &syscall.SysProcAttr{ - Setpgid: true, + var ociLog string + if logrus.GetLevel() != logrus.DebugLevel && r.supportsJSON { + ociLog = ctr.execOCILog(sessionID) } - err = startCommandGivenSelinux(execCmd) - - // We don't need children pipes on the parent side - errorhandling.CloseQuiet(childSyncPipe) - errorhandling.CloseQuiet(childAttachPipe) - errorhandling.CloseQuiet(childStartPipe) - childrenClosed = true - + execCmd, pipes, err := r.startExec(ctr, sessionID, options, attachStdin, ociLog) if err != nil { - return -1, nil, errors.Wrapf(err, "cannot start container %s", c.ID()) - } - if err := r.moveConmonToCgroupAndSignal(c, execCmd, parentStartPipe); err != nil { return -1, nil, err } - if options.PreserveFDs > 0 { - for fd := 3; fd < int(3+options.PreserveFDs); fd++ { - // These fds were passed down to the runtime. Close them - // and not interfere - if err := os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)).Close(); err != nil { - logrus.Debugf("unable to close file fd-%d", fd) - } + // Only close sync pipe. Start and attach are consumed in the attach + // goroutine. + defer func() { + if pipes.syncPipe != nil && !pipes.syncClosed { + errorhandling.CloseQuiet(pipes.syncPipe) + pipes.syncClosed = true } - } + }() - // TODO Only create if !detach - // Attach to the container before starting it attachChan := make(chan error) go func() { // attachToExec is responsible for closing pipes - attachChan <- c.attachToExec(options.Streams, options.DetachKeys, sessionID, parentStartPipe, parentAttachPipe) + attachChan <- attachExecHTTP(ctr, sessionID, httpBuf, streams, pipes, detachKeys, options.Terminal, cancel) close(attachChan) }() - attachToExecCalled = true + // Wait for conmon to succeed, when return. if err := execCmd.Wait(); err != nil { return -1, nil, errors.Wrapf(err, "cannot run conmon") } - pid, err := readConmonPipeData(parentSyncPipe, ociLog) + pid, err := readConmonPipeData(pipes.syncPipe, ociLog) return pid, attachChan, err } @@ -1829,3 +1761,297 @@ func httpAttachNonTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter, } } + +// This contains pipes used by the exec API. +type execPipes struct { + syncPipe *os.File + syncClosed bool + startPipe *os.File + startClosed bool + attachPipe *os.File + attachClosed bool +} + +func (p *execPipes) cleanup() { + if p.syncPipe != nil && !p.syncClosed { + errorhandling.CloseQuiet(p.syncPipe) + p.syncClosed = true + } + if p.startPipe != nil && !p.startClosed { + errorhandling.CloseQuiet(p.startPipe) + p.startClosed = true + } + if p.attachPipe != nil && !p.attachClosed { + errorhandling.CloseQuiet(p.attachPipe) + p.attachClosed = true + } +} + +// Start an exec session's conmon parent from the given options. +func (r *ConmonOCIRuntime) startExec(c *Container, sessionID string, options *ExecOptions, attachStdin bool, ociLog string) (_ *exec.Cmd, _ *execPipes, deferredErr error) { + pipes := new(execPipes) + + if options == nil { + return nil, nil, errors.Wrapf(define.ErrInvalidArg, "must provide an ExecOptions struct to ExecContainer") + } + if len(options.Cmd) == 0 { + return nil, nil, errors.Wrapf(define.ErrInvalidArg, "must provide a command to execute") + } + + if sessionID == "" { + return nil, nil, errors.Wrapf(define.ErrEmptyID, "must provide a session ID for exec") + } + + // create sync pipe to receive the pid + parentSyncPipe, childSyncPipe, err := newPipe() + if err != nil { + return nil, nil, errors.Wrapf(err, "error creating socket pair") + } + pipes.syncPipe = parentSyncPipe + + defer func() { + if deferredErr != nil { + pipes.cleanup() + } + }() + + // create start pipe to set the cgroup before running + // attachToExec is responsible for closing parentStartPipe + childStartPipe, parentStartPipe, err := newPipe() + if err != nil { + return nil, nil, errors.Wrapf(err, "error creating socket pair") + } + pipes.startPipe = parentStartPipe + + // create the attach pipe to allow attach socket to be created before + // $RUNTIME exec starts running. This is to make sure we can capture all output + // from the process through that socket, rather than half reading the log, half attaching to the socket + // attachToExec is responsible for closing parentAttachPipe + parentAttachPipe, childAttachPipe, err := newPipe() + if err != nil { + return nil, nil, errors.Wrapf(err, "error creating socket pair") + } + pipes.attachPipe = parentAttachPipe + + childrenClosed := false + defer func() { + if !childrenClosed { + errorhandling.CloseQuiet(childSyncPipe) + errorhandling.CloseQuiet(childAttachPipe) + errorhandling.CloseQuiet(childStartPipe) + } + }() + + runtimeDir, err := util.GetRuntimeDir() + if err != nil { + return nil, nil, err + } + + finalEnv := make([]string, 0, len(options.Env)) + for k, v := range options.Env { + finalEnv = append(finalEnv, fmt.Sprintf("%s=%s", k, v)) + } + + processFile, err := prepareProcessExec(c, options.Cmd, finalEnv, options.Terminal, options.Cwd, options.User, sessionID) + if err != nil { + return nil, nil, err + } + + args := r.sharedConmonArgs(c, sessionID, c.execBundlePath(sessionID), c.execPidPath(sessionID), c.execLogPath(sessionID), c.execExitFileDir(sessionID), ociLog, "") + + if options.PreserveFDs > 0 { + args = append(args, formatRuntimeOpts("--preserve-fds", fmt.Sprintf("%d", options.PreserveFDs))...) + } + + for _, capability := range options.CapAdd { + args = append(args, formatRuntimeOpts("--cap", capability)...) + } + + if options.Terminal { + args = append(args, "-t") + } + + if attachStdin { + args = append(args, "-i") + } + + // Append container ID and command + args = append(args, "-e") + // TODO make this optional when we can detach + args = append(args, "--exec-attach") + args = append(args, "--exec-process-spec", processFile.Name()) + + logrus.WithFields(logrus.Fields{ + "args": args, + }).Debugf("running conmon: %s", r.conmonPath) + // TODO: Need to pass this back so we can wait on it. + execCmd := exec.Command(r.conmonPath, args...) + + // TODO: This is commented because it doesn't make much sense in HTTP + // attach, and I'm not certain it does for non-HTTP attach as well. + // if streams != nil { + // // Don't add the InputStream to the execCmd. Instead, the data should be passed + // // through CopyDetachable + // if streams.AttachOutput { + // execCmd.Stdout = options.Streams.OutputStream + // } + // if streams.AttachError { + // execCmd.Stderr = options.Streams.ErrorStream + // } + // } + + conmonEnv, extraFiles, err := r.configureConmonEnv(runtimeDir) + if err != nil { + return nil, nil, err + } + + if options.PreserveFDs > 0 { + for fd := 3; fd < int(3+options.PreserveFDs); fd++ { + execCmd.ExtraFiles = append(execCmd.ExtraFiles, os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd))) + } + } + + // we don't want to step on users fds they asked to preserve + // Since 0-2 are used for stdio, start the fds we pass in at preserveFDs+3 + execCmd.Env = r.conmonEnv + execCmd.Env = append(execCmd.Env, fmt.Sprintf("_OCI_SYNCPIPE=%d", options.PreserveFDs+3), fmt.Sprintf("_OCI_STARTPIPE=%d", options.PreserveFDs+4), fmt.Sprintf("_OCI_ATTACHPIPE=%d", options.PreserveFDs+5)) + execCmd.Env = append(execCmd.Env, conmonEnv...) + + execCmd.ExtraFiles = append(execCmd.ExtraFiles, childSyncPipe, childStartPipe, childAttachPipe) + execCmd.ExtraFiles = append(execCmd.ExtraFiles, extraFiles...) + execCmd.Dir = c.execBundlePath(sessionID) + execCmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + err = startCommandGivenSelinux(execCmd) + + // We don't need children pipes on the parent side + errorhandling.CloseQuiet(childSyncPipe) + errorhandling.CloseQuiet(childAttachPipe) + errorhandling.CloseQuiet(childStartPipe) + childrenClosed = true + + if err != nil { + return nil, nil, errors.Wrapf(err, "cannot start container %s", c.ID()) + } + if err := r.moveConmonToCgroupAndSignal(c, execCmd, parentStartPipe); err != nil { + return nil, nil, err + } + + if options.PreserveFDs > 0 { + for fd := 3; fd < int(3+options.PreserveFDs); fd++ { + // These fds were passed down to the runtime. Close them + // and not interfere + if err := os.NewFile(uintptr(fd), fmt.Sprintf("fd-%d", fd)).Close(); err != nil { + logrus.Debugf("unable to close file fd-%d", fd) + } + } + } + + return execCmd, pipes, nil +} + +// Attach to a container over HTTP +func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool) error { + if pipes == nil || pipes.startPipe == nil || pipes.attachPipe == nil { + return errors.Wrapf(define.ErrInvalidArg, "must provide a start and attach pipe to finish an exec attach") + } + + defer func() { + if !pipes.startClosed { + errorhandling.CloseQuiet(pipes.startPipe) + pipes.startClosed = true + } + if !pipes.attachClosed { + errorhandling.CloseQuiet(pipes.attachPipe) + pipes.attachClosed = true + } + }() + + logrus.Debugf("Attaching to container %s exec session %s", c.ID(), sessionID) + + // set up the socket path, such that it is the correct length and location for exec + sockPath, err := c.execAttachSocketPath(sessionID) + if err != nil { + return err + } + socketPath := buildSocketPath(sockPath) + + // 2: read from attachFd that the parent process has set up the console socket + if _, err := readConmonPipeData(pipes.attachPipe, ""); err != nil { + return err + } + + // 2: then attach + conn, err := net.DialUnix("unixpacket", nil, &net.UnixAddr{Name: socketPath, Net: "unixpacket"}) + if err != nil { + return errors.Wrapf(err, "failed to connect to container's attach socket: %v", socketPath) + } + defer func() { + if err := conn.Close(); err != nil { + logrus.Errorf("unable to close socket: %q", err) + } + }() + + // Make a channel to pass errors back + errChan := make(chan error) + + attachStdout := true + attachStderr := true + attachStdin := true + if streams != nil { + attachStdout = streams.Stdout + attachStderr = streams.Stderr + attachStdin = streams.Stdin + } + + // Next, STDIN. Avoid entirely if attachStdin unset. + if attachStdin { + go func() { + logrus.Debugf("Beginning STDIN copy") + _, err := utils.CopyDetachable(conn, httpBuf, detachKeys) + logrus.Debugf("STDIN copy completed") + errChan <- err + }() + } + + // 4: send start message to child + if err := writeConmonPipeData(pipes.startPipe); err != nil { + return err + } + + // Handle STDOUT/STDERR *after* start message is sent + go func() { + var err error + if isTerminal { + // Hack: return immediately if attachStdout not set to + // emulate Docker. + // Basically, when terminal is set, STDERR goes nowhere. + // Everything does over STDOUT. + // Therefore, if not attaching STDOUT - we'll never copy + // anything from here. + logrus.Debugf("Performing terminal HTTP attach for container %s", c.ID()) + if attachStdout { + err = httpAttachTerminalCopy(conn, httpBuf, c.ID()) + } + } else { + logrus.Debugf("Performing non-terminal HTTP attach for container %s", c.ID()) + err = httpAttachNonTerminalCopy(conn, httpBuf, c.ID(), attachStdin, attachStdout, attachStderr) + } + errChan <- err + logrus.Debugf("STDOUT/ERR copy completed") + }() + + if cancel != nil { + select { + case err := <-errChan: + return err + case <-cancel: + return nil + } + } else { + var connErr error = <-errChan + return connErr + } +} diff --git a/libpod/oci_missing.go b/libpod/oci_missing.go index 172805b0d..626740f72 100644 --- a/libpod/oci_missing.go +++ b/libpod/oci_missing.go @@ -121,7 +121,12 @@ func (r *MissingRuntime) AttachResize(ctr *Container, newSize remotecommand.Term } // ExecContainer is not available as the runtime is missing -func (r *MissingRuntime) ExecContainer(ctr *Container, sessionID string, options *ExecOptions) (int, chan error, error) { +func (r *MissingRuntime) ExecContainer(ctr *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error) { + return -1, nil, r.printError() +} + +// ExecContainerHTTP is not available as the runtime is missing +func (r *MissingRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) { return -1, nil, r.printError() } diff --git a/pkg/api/handlers/compat/containers_attach.go b/pkg/api/handlers/compat/containers_attach.go index 52c851b8c..3c9a6fd69 100644 --- a/pkg/api/handlers/compat/containers_attach.go +++ b/pkg/api/handlers/compat/containers_attach.go @@ -13,6 +13,12 @@ import ( "k8s.io/client-go/tools/remotecommand" ) +// AttachHeader is the literal header sent for upgraded/hijacked connections for +// attach, sourced from Docker at: +// https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go +// Using literally to ensure compatibility with existing clients. +const AttachHeader = "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n" + func AttachContainer(w http.ResponseWriter, r *http.Request) { runtime := r.Context().Value("runtime").(*libpod.Runtime) decoder := r.Context().Value("decoder").(*schema.Decoder) @@ -106,10 +112,7 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) { return } - // This header string sourced from Docker: - // https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go - // Using literally to ensure compatibility with existing clients. - fmt.Fprintf(connection, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") + fmt.Fprintf(connection, AttachHeader) logrus.Debugf("Hijack for attach of container %s successful", ctr.ID()) diff --git a/pkg/api/handlers/compat/exec.go b/pkg/api/handlers/compat/exec.go index ec1a8ac96..6865a3319 100644 --- a/pkg/api/handlers/compat/exec.go +++ b/pkg/api/handlers/compat/exec.go @@ -104,4 +104,76 @@ func ExecInspectHandler(w http.ResponseWriter, r *http.Request) { } utils.WriteResponse(w, http.StatusOK, inspectOut) + + // Only for the Compat API: we want to remove sessions that were + // stopped. This is very hacky, but should suffice for now. + if !utils.IsLibpodRequest(r) && inspectOut.CanRemove { + logrus.Infof("Pruning stale exec session %s from container %s", sessionID, sessionCtr.ID()) + if err := sessionCtr.ExecRemove(sessionID, false); err != nil && errors.Cause(err) != define.ErrNoSuchExecSession { + logrus.Errorf("Error removing stale exec session %s from container %s: %v", sessionID, sessionCtr.ID(), err) + } + } +} + +// ExecStartHandler runs a given exec session. +func ExecStartHandler(w http.ResponseWriter, r *http.Request) { + runtime := r.Context().Value("runtime").(*libpod.Runtime) + + sessionID := mux.Vars(r)["id"] + + // TODO: We should read/support Tty and Detach from here. + bodyParams := new(handlers.ExecStartConfig) + + if err := json.NewDecoder(r.Body).Decode(&bodyParams); err != nil { + utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, + errors.Wrapf(err, "failed to decode parameters for %s", r.URL.String())) + return + } + if bodyParams.Detach { + utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, + errors.Errorf("Detached exec is not yet supported")) + return + } + // TODO: Verify TTY setting against what inspect session was made with + + sessionCtr, err := runtime.GetExecSessionContainer(sessionID) + if err != nil { + utils.Error(w, fmt.Sprintf("No such exec session: %s", sessionID), http.StatusNotFound, err) + return + } + + logrus.Debugf("Starting exec session %s of container %s", sessionID, sessionCtr.ID()) + + state, err := sessionCtr.State() + if err != nil { + utils.InternalServerError(w, err) + return + } + if state != define.ContainerStateRunning { + utils.Error(w, http.StatusText(http.StatusConflict), http.StatusConflict, errors.Errorf("cannot exec in a container that is not running; container %s is %s", sessionCtr.ID(), state.String())) + return + } + + // Hijack the connection + hijacker, ok := w.(http.Hijacker) + if !ok { + utils.InternalServerError(w, errors.Errorf("unable to hijack connection")) + return + } + + connection, buffer, err := hijacker.Hijack() + if err != nil { + utils.InternalServerError(w, errors.Wrapf(err, "error hijacking connection")) + return + } + + fmt.Fprintf(connection, AttachHeader) + + logrus.Debugf("Hijack for attach of container %s exec session %s successful", sessionCtr.ID(), sessionID) + + if err := sessionCtr.ExecHTTPStartAndAttach(sessionID, connection, buffer, nil, nil, nil); err != nil { + logrus.Errorf("Error attaching to container %s exec session %s: %v", sessionCtr.ID(), sessionID, err) + } + + logrus.Debugf("Attach for container %s exec session %s completed successfully", sessionCtr.ID(), sessionID) } diff --git a/pkg/api/handlers/types.go b/pkg/api/handlers/types.go index 2075d29df..d8cdd9caf 100644 --- a/pkg/api/handlers/types.go +++ b/pkg/api/handlers/types.go @@ -170,6 +170,11 @@ type ExecCreateResponse struct { docker.IDResponse } +type ExecStartConfig struct { + Detach bool `json:"Detach"` + Tty bool `json:"Tty"` +} + func ImageToImageSummary(l *libpodImage.Image) (*entities.ImageSummary, error) { containers, err := l.Containers() if err != nil { diff --git a/pkg/api/server/register_exec.go b/pkg/api/server/register_exec.go index 71fb50307..19b7e2fcd 100644 --- a/pkg/api/server/register_exec.go +++ b/pkg/api/server/register_exec.go @@ -97,10 +97,10 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // properties: // Detach: // type: boolean - // description: Detach from the command + // description: Detach from the command. Not presently supported. // Tty: // type: boolean - // description: Allocate a pseudo-TTY + // description: Allocate a pseudo-TTY. Presently ignored. // produces: // - application/json // responses: @@ -109,12 +109,12 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // 404: // $ref: "#/responses/NoSuchExecInstance" // 409: - // description: container is stopped or paused + // description: container is not running // 500: // $ref: "#/responses/InternalError" - r.Handle(VersionedPath("/exec/{id}/start"), s.APIHandler(compat.UnsupportedHandler)).Methods(http.MethodPost) + r.Handle(VersionedPath("/exec/{id}/start"), s.APIHandler(compat.ExecStartHandler)).Methods(http.MethodPost) // Added non version path to URI to support docker non versioned paths - r.Handle("/exec/{id}/start", s.APIHandler(compat.UnsupportedHandler)).Methods(http.MethodPost) + r.Handle("/exec/{id}/start", s.APIHandler(compat.ExecStartHandler)).Methods(http.MethodPost) // swagger:operation POST /exec/{id}/resize compat resizeExec // --- // tags: @@ -153,7 +153,7 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // tags: // - exec (compat) // summary: Inspect an exec instance - // description: Return low-level information about an exec instance. + // description: Return low-level information about an exec instance. Stale (stopped) exec sessions will be auto-removed after inspect runs. // parameters: // - in: path // name: id @@ -264,10 +264,10 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // properties: // Detach: // type: boolean - // description: Detach from the command + // description: Detach from the command. Not presently supported. // Tty: // type: boolean - // description: Allocate a pseudo-TTY + // description: Allocate a pseudo-TTY. Presently ignored. // produces: // - application/json // responses: @@ -276,10 +276,10 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // 404: // $ref: "#/responses/NoSuchExecInstance" // 409: - // description: container is stopped or paused + // description: container is not running. // 500: // $ref: "#/responses/InternalError" - r.Handle(VersionedPath("/libpod/exec/{id}/start"), s.APIHandler(compat.UnsupportedHandler)).Methods(http.MethodPost) + r.Handle(VersionedPath("/libpod/exec/{id}/start"), s.APIHandler(compat.ExecStartHandler)).Methods(http.MethodPost) // swagger:operation POST /libpod/exec/{id}/resize libpod libpodResizeExec // --- // tags: |