From a1a79c08b72793cf2f75490d8ffc844c3d16bd4a Mon Sep 17 00:00:00 2001 From: Peter Hunt Date: Mon, 1 Jul 2019 13:55:03 -0400 Subject: Implement conmon exec This includes: Implement exec -i and fix some typos in description of -i docs pass failed runtime status to caller Add resize handling for a terminal connection Customize exec systemd-cgroup slice fix healthcheck fix top add --detach-keys Implement podman-remote exec (jhonce) * Cleanup some orphaned code (jhonce) adapt remote exec for conmon exec (pehunt) Fix healthcheck and exec to match docs Introduce two new OCIRuntime errors to more comprehensively describe situations in which the runtime can error Use these different errors in branching for exit code in healthcheck and exec Set conmon to use new api version Signed-off-by: Jhon Honce Signed-off-by: Peter Hunt --- pkg/adapter/containers.go | 130 ++++++++++++++++++-------------- pkg/adapter/containers_remote.go | 42 ++++++++++- pkg/adapter/terminal_linux.go | 57 ++++++++++---- pkg/varlinkapi/attach.go | 7 +- pkg/varlinkapi/containers.go | 82 ++++++++++++++++++++ pkg/varlinkapi/virtwriter/virtwriter.go | 21 +++++- 6 files changed, 260 insertions(+), 79 deletions(-) (limited to 'pkg') diff --git a/pkg/adapter/containers.go b/pkg/adapter/containers.go index 9726b237f..47f1b091e 100644 --- a/pkg/adapter/containers.go +++ b/pkg/adapter/containers.go @@ -924,13 +924,85 @@ func (r *LocalRuntime) execPS(c *libpod.Container, args []string) ([]string, err }() cmd := append([]string{"ps"}, args...) - if err := c.Exec(false, false, []string{}, cmd, "", "", streams, 0); err != nil { + ec, err := c.Exec(false, false, []string{}, cmd, "", "", streams, 0, nil, "") + if err != nil { return nil, err + } else if ec != 0 { + return nil, errors.Errorf("Runtime failed with exit status: %d and output: %s", ec, strings.Join(psOutput, " ")) } return psOutput, nil } +// ExecContainer executes a command in the container +func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecValues) (int, error) { + var ( + ctr *Container + err error + cmd []string + ) + // default invalid command exit code + ec := 125 + + if cli.Latest { + if ctr, err = r.GetLatestContainer(); err != nil { + return ec, err + } + cmd = cli.InputArgs[0:] + } else { + if ctr, err = r.LookupContainer(cli.InputArgs[0]); err != nil { + return ec, err + } + cmd = cli.InputArgs[1:] + } + + if cli.PreserveFDs > 0 { + entries, err := ioutil.ReadDir("/proc/self/fd") + if err != nil { + return ec, errors.Wrapf(err, "unable to read /proc/self/fd") + } + + m := make(map[int]bool) + for _, e := range entries { + i, err := strconv.Atoi(e.Name()) + if err != nil { + return ec, errors.Wrapf(err, "cannot parse %s in /proc/self/fd", e.Name()) + } + m[i] = true + } + + for i := 3; i < 3+cli.PreserveFDs; i++ { + if _, found := m[i]; !found { + return ec, errors.New("invalid --preserve-fds=N specified. Not enough FDs available") + } + } + } + + // Validate given environment variables + env := map[string]string{} + if err := parse.ReadKVStrings(env, []string{}, cli.Env); err != nil { + return ec, errors.Wrapf(err, "unable to process environment variables") + } + + // Build env slice of key=value strings for Exec + envs := []string{} + for k, v := range env { + envs = append(envs, fmt.Sprintf("%s=%s", k, v)) + } + + streams := new(libpod.AttachStreams) + streams.OutputStream = os.Stdout + streams.ErrorStream = os.Stderr + if cli.Interactive { + streams.InputStream = os.Stdin + streams.AttachInput = true + } + streams.AttachOutput = true + streams.AttachError = true + + return ExecAttachCtr(ctx, ctr.Container, cli.Tty, cli.Privileged, envs, cmd, cli.User, cli.Workdir, streams, cli.PreserveFDs, cli.DetachKeys) +} + // Prune removes stopped containers func (r *LocalRuntime) Prune(ctx context.Context, maxWorkers int, force bool) ([]string, map[string]error, error) { var ( @@ -1129,59 +1201,3 @@ func (r *LocalRuntime) Commit(ctx context.Context, c *cliconfig.CommitValues, co } return newImage.ID(), nil } - -// Exec a command in a container -func (r *LocalRuntime) Exec(c *cliconfig.ExecValues, cmd []string) error { - var ctr *Container - var err error - - if c.Latest { - ctr, err = r.GetLatestContainer() - } else { - ctr, err = r.LookupContainer(c.InputArgs[0]) - } - if err != nil { - return errors.Wrapf(err, "unable to exec into %s", c.InputArgs[0]) - } - - if c.PreserveFDs > 0 { - entries, err := ioutil.ReadDir("/proc/self/fd") - if err != nil { - return errors.Wrapf(err, "unable to read /proc/self/fd") - } - m := make(map[int]bool) - for _, e := range entries { - i, err := strconv.Atoi(e.Name()) - if err != nil { - return errors.Wrapf(err, "cannot parse %s in /proc/self/fd", e.Name()) - } - m[i] = true - } - for i := 3; i < 3+c.PreserveFDs; i++ { - if _, found := m[i]; !found { - return errors.New("invalid --preserve-fds=N specified. Not enough FDs available") - } - } - } - - // ENVIRONMENT VARIABLES - env := map[string]string{} - - if err := parse.ReadKVStrings(env, []string{}, c.Env); err != nil { - return errors.Wrapf(err, "unable to process environment variables") - } - envs := []string{} - for k, v := range env { - envs = append(envs, fmt.Sprintf("%s=%s", k, v)) - } - - streams := new(libpod.AttachStreams) - streams.OutputStream = os.Stdout - streams.ErrorStream = os.Stderr - streams.InputStream = os.Stdin - streams.AttachOutput = true - streams.AttachError = true - streams.AttachInput = true - - return ctr.Exec(c.Tty, c.Privileged, envs, cmd, c.User, c.Workdir, streams, c.PreserveFDs) -} diff --git a/pkg/adapter/containers_remote.go b/pkg/adapter/containers_remote.go index fc23381a4..6b9fc8ee7 100644 --- a/pkg/adapter/containers_remote.go +++ b/pkg/adapter/containers_remote.go @@ -14,6 +14,7 @@ import ( "github.com/containers/libpod/cmd/podman/cliconfig" "github.com/containers/libpod/cmd/podman/shared" + "github.com/containers/libpod/cmd/podman/shared/parse" iopodman "github.com/containers/libpod/cmd/podman/varlink" "github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod/define" @@ -1034,7 +1035,42 @@ func (r *LocalRuntime) Commit(ctx context.Context, c *cliconfig.CommitValues, co return iid, nil } -// Exec executes a container in a running container -func (r *LocalRuntime) Exec(c *cliconfig.ExecValues, cmd []string) error { - return define.ErrNotImplemented +// ExecContainer executes a command in the container +func (r *LocalRuntime) ExecContainer(ctx context.Context, cli *cliconfig.ExecValues) (int, error) { + // default invalid command exit code + ec := 125 + // Validate given environment variables + env := map[string]string{} + if err := parse.ReadKVStrings(env, []string{}, cli.Env); err != nil { + return -1, errors.Wrapf(err, "Exec unable to process environment variables") + } + + // Build env slice of key=value strings for Exec + envs := []string{} + for k, v := range env { + envs = append(envs, fmt.Sprintf("%s=%s", k, v)) + } + + opts := iopodman.ExecOpts{ + Name: cli.InputArgs[0], + Tty: cli.Tty, + Privileged: cli.Privileged, + Cmd: cli.InputArgs[1:], + User: &cli.User, + Workdir: &cli.Workdir, + Env: &envs, + } + + receive, err := iopodman.ExecContainer().Send(r.Conn, varlink.Upgrade, opts) + if err != nil { + return ec, errors.Wrapf(err, "Exec failed to contact service for %s", cli.InputArgs) + } + + _, err = receive() + if err != nil { + return ec, errors.Wrapf(err, "Exec operation failed for %s", cli.InputArgs) + } + + // TODO return exit code from exec call + return 0, nil } diff --git a/pkg/adapter/terminal_linux.go b/pkg/adapter/terminal_linux.go index 9f6ddc2e6..6e63dd87b 100644 --- a/pkg/adapter/terminal_linux.go +++ b/pkg/adapter/terminal_linux.go @@ -13,6 +13,25 @@ import ( "k8s.io/client-go/tools/remotecommand" ) +// ExecAttachCtr execs and attaches to a container +func ExecAttachCtr(ctx context.Context, ctr *libpod.Container, tty, privileged bool, env, cmd []string, user, workDir string, streams *libpod.AttachStreams, preserveFDs int, detachKeys string) (int, error) { + resize := make(chan remotecommand.TerminalSize) + + haveTerminal := terminal.IsTerminal(int(os.Stdin.Fd())) + + // Check if we are attached to a terminal. If we are, generate resize + // events, and set the terminal to raw mode + if haveTerminal && tty { + cancel, oldTermState, err := handleTerminalAttach(ctx, resize) + if err != nil { + return -1, err + } + defer cancel() + defer restoreTerminal(oldTermState) + } + return ctr.Exec(tty, privileged, env, cmd, user, workDir, streams, preserveFDs, resize, detachKeys) +} + // StartAttachCtr starts and (if required) attaches to a container // if you change the signature of this function from os.File to io.Writer, it will trigger a downstream // error. we may need to just lint disable this one. @@ -24,28 +43,16 @@ func StartAttachCtr(ctx context.Context, ctr *libpod.Container, stdout, stderr, // Check if we are attached to a terminal. If we are, generate resize // events, and set the terminal to raw mode if haveTerminal && ctr.Spec().Process.Terminal { - logrus.Debugf("Handling terminal attach") - - subCtx, cancel := context.WithCancel(ctx) - defer cancel() - - resizeTty(subCtx, resize) - - oldTermState, err := term.SaveState(os.Stdin.Fd()) + cancel, oldTermState, err := handleTerminalAttach(ctx, resize) if err != nil { - return errors.Wrapf(err, "unable to save terminal state") - } - - logrus.SetFormatter(&RawTtyFormatter{}) - if _, err := term.SetRawTerminal(os.Stdin.Fd()); err != nil { return err } - defer func() { if err := restoreTerminal(oldTermState); err != nil { logrus.Errorf("unable to restore terminal: %q", err) } }() + defer cancel() } streams := new(libpod.AttachStreams) @@ -97,3 +104,25 @@ func StartAttachCtr(ctx context.Context, ctr *libpod.Container, stdout, stderr, return nil } + +func handleTerminalAttach(ctx context.Context, resize chan remotecommand.TerminalSize) (context.CancelFunc, *term.State, error) { + logrus.Debugf("Handling terminal attach") + + subCtx, cancel := context.WithCancel(ctx) + + resizeTty(subCtx, resize) + + oldTermState, err := term.SaveState(os.Stdin.Fd()) + if err != nil { + // allow caller to not have to do any cleaning up if we error here + cancel() + return nil, nil, errors.Wrapf(err, "unable to save terminal state") + } + + logrus.SetFormatter(&RawTtyFormatter{}) + if _, err := term.SetRawTerminal(os.Stdin.Fd()); err != nil { + return nil, nil, err + } + + return cancel, oldTermState, nil +} diff --git a/pkg/varlinkapi/attach.go b/pkg/varlinkapi/attach.go index afa88e6a3..97ba525a5 100644 --- a/pkg/varlinkapi/attach.go +++ b/pkg/varlinkapi/attach.go @@ -82,9 +82,10 @@ func (i *LibpodAPI) Attach(call iopodman.VarlinkCall, name string, detachKeys st if finalErr != define.ErrDetach && finalErr != nil { logrus.Error(finalErr) } - quitWriter := virtwriter.NewVirtWriteCloser(writer, virtwriter.Quit) - _, err = quitWriter.Write([]byte("HANG-UP")) - // TODO error handling is not quite right here yet + + if err = virtwriter.HangUp(writer); err != nil { + logrus.Errorf("Failed to HANG-UP attach to %s: %s", ctr.ID(), err.Error()) + } return call.Writer.Flush() } diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go index 6f6909fac..19a8bfd2e 100644 --- a/pkg/varlinkapi/containers.go +++ b/pkg/varlinkapi/containers.go @@ -19,8 +19,11 @@ import ( "github.com/containers/libpod/libpod/define" "github.com/containers/libpod/libpod/logs" "github.com/containers/libpod/pkg/adapter/shortcuts" + "github.com/containers/libpod/pkg/varlinkapi/virtwriter" "github.com/containers/storage/pkg/archive" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/client-go/tools/remotecommand" ) // ListContainers ... @@ -756,3 +759,82 @@ func (i *LibpodAPI) Top(call iopodman.VarlinkCall, nameOrID string, descriptors } return call.ReplyTop(topInfo) } + +// ExecContainer is the varlink endpoint to execute a command in a container +func (i *LibpodAPI) ExecContainer(call iopodman.VarlinkCall, opts iopodman.ExecOpts) error { + if !call.WantsUpgrade() { + return call.ReplyErrorOccurred("client must use upgraded connection to exec") + } + + ctr, err := i.Runtime.LookupContainer(opts.Name) + if err != nil { + return call.ReplyContainerNotFound(opts.Name, err.Error()) + } + + state, err := ctr.State() + if err != nil { + return call.ReplyErrorOccurred( + fmt.Sprintf("exec failed to obtain container %s state: %s", ctr.ID(), err.Error())) + } + + if state != define.ContainerStateRunning { + return call.ReplyErrorOccurred( + fmt.Sprintf("exec requires a running container, %s is %s", ctr.ID(), state.String())) + } + + envs := []string{} + if opts.Env != nil { + envs = *opts.Env + } + + var user string + if opts.User != nil { + user = *opts.User + } + + var workDir string + if opts.Workdir != nil { + workDir = *opts.Workdir + } + + resizeChan := make(chan remotecommand.TerminalSize) + errChan := make(chan error) + + reader, writer, _, pipeWriter, streams := setupStreams(call) + + go func() { + fmt.Printf("ExecContainer Start Reader\n") + if err := virtwriter.Reader(reader, nil, nil, pipeWriter, resizeChan); err != nil { + fmt.Printf("ExecContainer Reader err %s, %s\n", err.Error(), errors.Cause(err).Error()) + errChan <- err + } + }() + + // Debugging... + time.Sleep(5 * time.Second) + + go func() { + fmt.Printf("ExecContainer Start ctr.Exec\n") + // TODO detach keys and resize + // TODO add handling for exit code + // TODO capture exit code and return to main thread + _, err := ctr.Exec(opts.Tty, opts.Privileged, envs, opts.Cmd, user, workDir, streams, 0, nil, "") + if err != nil { + fmt.Printf("ExecContainer Exec err %s, %s\n", err.Error(), errors.Cause(err).Error()) + errChan <- errors.Wrapf(err, "ExecContainer failed for container %s", ctr.ID()) + } + }() + + execErr := <-errChan + + if execErr != nil && errors.Cause(execErr) != io.EOF { + fmt.Printf("ExecContainer err: %s\n", execErr.Error()) + return call.ReplyErrorOccurred(execErr.Error()) + } + + if err = virtwriter.HangUp(writer); err != nil { + fmt.Printf("ExecContainer hangup err: %s\n", err.Error()) + logrus.Errorf("ExecContainer failed to HANG-UP on %s: %s", ctr.ID(), err.Error()) + } + return call.Writer.Flush() +} diff --git a/pkg/varlinkapi/virtwriter/virtwriter.go b/pkg/varlinkapi/virtwriter/virtwriter.go index 5e88914b2..0da2a91fc 100644 --- a/pkg/varlinkapi/virtwriter/virtwriter.go +++ b/pkg/varlinkapi/virtwriter/virtwriter.go @@ -4,8 +4,9 @@ import ( "bufio" "encoding/binary" "encoding/json" - "errors" "io" + + "github.com/pkg/errors" "k8s.io/client-go/tools/remotecommand" ) @@ -95,7 +96,7 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer for { n, err := io.ReadFull(r, headerBytes) if err != nil { - return err + return errors.Wrapf(err, "Virtual Read failed, %d", n) } if n < 8 { return errors.New("short read and no full header read") @@ -151,3 +152,19 @@ func Reader(r *bufio.Reader, output io.Writer, errput io.Writer, input io.Writer } } } + +// HangUp sends message to peer to close connection +func HangUp(writer *bufio.Writer) (err error) { + n := 0 + msg := []byte("HANG-UP") + + writeQuit := NewVirtWriteCloser(writer, Quit) + if n, err = writeQuit.Write(msg); err != nil { + return + } + + if n != len(msg) { + return errors.Errorf("Failed to send complete %s message", string(msg)) + } + return +} -- cgit v1.2.3-54-g00ecf