From 45a7e7266e14f2b759806f05eeb526d0524cf648 Mon Sep 17 00:00:00 2001 From: Matthew Heon Date: Wed, 27 May 2020 17:01:12 -0400 Subject: Add bindings for exec and enable attached remote This adds bindings for starting exec sessions, and then uses them to wire up detached exec. Code is heavily based on Attach code for containers, slightly modified to handle exec sessions. Bindings are presently attached-only, detached is pending on a Conmon update landing in CI. I'll probably get to that next. Signed-off-by: Matthew Heon --- pkg/api/handlers/compat/resize.go | 4 +- pkg/api/server/register_exec.go | 2 +- pkg/bindings/connection.go | 60 ++--- pkg/bindings/containers/attach.go | 481 ++++++++++++++++++++++++++++++++++ pkg/bindings/containers/containers.go | 253 ------------------ pkg/domain/infra/tunnel/containers.go | 36 ++- 6 files changed, 541 insertions(+), 295 deletions(-) create mode 100644 pkg/bindings/containers/attach.go (limited to 'pkg') diff --git a/pkg/api/handlers/compat/resize.go b/pkg/api/handlers/compat/resize.go index 231b53175..478a8fab4 100644 --- a/pkg/api/handlers/compat/resize.go +++ b/pkg/api/handlers/compat/resize.go @@ -8,6 +8,7 @@ import ( "github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod/define" "github.com/containers/libpod/pkg/api/handlers/utils" + "github.com/gorilla/mux" "github.com/gorilla/schema" "github.com/pkg/errors" "k8s.io/client-go/tools/remotecommand" @@ -37,9 +38,9 @@ func ResizeTTY(w http.ResponseWriter, r *http.Request) { } var status int - name := utils.GetName(r) switch { case strings.Contains(r.URL.Path, "/containers/"): + name := utils.GetName(r) ctnr, err := runtime.LookupContainer(name) if err != nil { utils.ContainerNotFound(w, name, err) @@ -61,6 +62,7 @@ func ResizeTTY(w http.ResponseWriter, r *http.Request) { // reasons. status = http.StatusOK case strings.Contains(r.URL.Path, "/exec/"): + name := mux.Vars(r)["id"] ctnr, err := runtime.GetExecSessionContainer(name) if err != nil { utils.SessionNotFound(w, name, err) diff --git a/pkg/api/server/register_exec.go b/pkg/api/server/register_exec.go index 1533edba9..17181d286 100644 --- a/pkg/api/server/register_exec.go +++ b/pkg/api/server/register_exec.go @@ -310,7 +310,7 @@ func (s *APIServer) registerExecHandlers(r *mux.Router) error { // $ref: "#/responses/NoSuchExecInstance" // 500: // $ref: "#/responses/InternalError" - r.Handle(VersionedPath("/libpod/exec/{id}/resize"), s.APIHandler(compat.UnsupportedHandler)).Methods(http.MethodPost) + r.Handle(VersionedPath("/libpod/exec/{id}/resize"), s.APIHandler(compat.ResizeTTY)).Methods(http.MethodPost) // swagger:operation GET /libpod/exec/{id}/json libpod libpodInspectExec // --- // tags: diff --git a/pkg/bindings/connection.go b/pkg/bindings/connection.go index e9032f083..c26093a7f 100644 --- a/pkg/bindings/connection.go +++ b/pkg/bindings/connection.go @@ -24,7 +24,7 @@ import ( ) var ( - basePath = &url.URL{ + BasePath = &url.URL{ Scheme: "http", Host: "d", Path: "/v" + APIVersion.String() + "/libpod", @@ -37,15 +37,14 @@ type APIResponse struct { } type Connection struct { - _url *url.URL - client *http.Client - conn *net.Conn + Uri *url.URL + Client *http.Client } type valueKey string const ( - clientKey = valueKey("client") + clientKey = valueKey("Client") ) // GetClient from context build by NewConnection() @@ -59,7 +58,7 @@ func GetClient(ctx context.Context) (*Connection, error) { // JoinURL elements with '/' func JoinURL(elements ...string) string { - return strings.Join(elements, "/") + return "/" + strings.Join(elements, "/") } // NewConnection takes a URI as a string and returns a context with the @@ -88,7 +87,7 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context return nil, errors.Wrapf(err, "Value of PODMAN_HOST is not a valid url: %s", uri) } - // Now we setup the http client to use the connection above + // Now we setup the http Client to use the connection above var connection Connection switch _url.Scheme { case "ssh": @@ -125,16 +124,12 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context func tcpClient(_url *url.URL) (Connection, error) { connection := Connection{ - _url: _url, + Uri: _url, } - connection.client = &http.Client{ + connection.Client = &http.Client{ Transport: &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { - conn, err := net.Dial("tcp", _url.Host) - if c, ok := ctx.Value(clientKey).(*Connection); ok { - c.conn = &conn - } - return conn, err + return net.Dial("tcp", _url.Host) }, DisableCompression: true, }, @@ -167,11 +162,11 @@ func pingNewConnection(ctx context.Context) error { } switch APIVersion.Compare(versionSrv) { - case 1, 0: - // Server's job when client version is equal or older + case -1, 0: + // Server's job when Client version is equal or older return nil - case -1: - return errors.Errorf("server API version is too old. client %q server %q", APIVersion.String(), versionSrv.String()) + case 1: + return errors.Errorf("server API version is too old. Client %q server %q", APIVersion.String(), versionSrv.String()) } } return errors.Errorf("ping response was %q", response.StatusCode) @@ -217,31 +212,22 @@ func sshClient(_url *url.URL, identity string, secure bool) (Connection, error) return Connection{}, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String()) } - connection := Connection{_url: _url} - connection.client = &http.Client{ + connection := Connection{Uri: _url} + connection.Client = &http.Client{ Transport: &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { - conn, err := bastion.Dial("unix", _url.Path) - if c, ok := ctx.Value(clientKey).(*Connection); ok { - c.conn = &conn - } - return conn, err + return bastion.Dial("unix", _url.Path) }, }} return connection, nil } func unixClient(_url *url.URL) (Connection, error) { - connection := Connection{_url: _url} - connection.client = &http.Client{ + connection := Connection{Uri: _url} + connection.Client = &http.Client{ Transport: &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { - d := net.Dialer{} - conn, err := d.DialContext(ctx, "unix", _url.Path) - if c, ok := ctx.Value(clientKey).(*Connection); ok { - c.conn = &conn - } - return conn, err + return (&net.Dialer{}).DialContext(ctx, "unix", _url.Path) }, DisableCompression: true, }, @@ -263,7 +249,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string, // Lets eventually use URL for this which might lead to safer // usage safeEndpoint := fmt.Sprintf(endpoint, safePathValues...) - e := basePath.String() + safeEndpoint + e := BasePath.String() + safeEndpoint req, err := http.NewRequest(httpMethod, e, httpBody) if err != nil { return nil, err @@ -277,7 +263,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string, req = req.WithContext(context.WithValue(context.Background(), clientKey, c)) // Give the Do three chances in the case of a comm/service hiccup for i := 0; i < 3; i++ { - response, err = c.client.Do(req) // nolint + response, err = c.Client.Do(req) // nolint if err == nil { break } @@ -286,10 +272,6 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string, return &APIResponse{response, req}, err } -func (c *Connection) Write(b []byte) (int, error) { - return (*c.conn).Write(b) -} - // FiltersToString converts our typical filter format of a // map[string][]string to a query/html safe string. func FiltersToString(filters map[string][]string) (string, error) { diff --git a/pkg/bindings/containers/attach.go b/pkg/bindings/containers/attach.go new file mode 100644 index 000000000..b7f35c30d --- /dev/null +++ b/pkg/bindings/containers/attach.go @@ -0,0 +1,481 @@ +package containers + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "reflect" + "strconv" + "time" + + "github.com/containers/libpod/libpod/define" + "github.com/containers/libpod/pkg/bindings" + sig "github.com/containers/libpod/pkg/signal" + "github.com/containers/libpod/utils" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh/terminal" +) + +// Attach attaches to a running container +func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer, attachReady chan bool) error { + isSet := struct { + stdin bool + stdout bool + stderr bool + }{ + stdin: !(stdin == nil || reflect.ValueOf(stdin).IsNil()), + stdout: !(stdout == nil || reflect.ValueOf(stdout).IsNil()), + stderr: !(stderr == nil || reflect.ValueOf(stderr).IsNil()), + } + // Ensure golang can determine that interfaces are "really" nil + if !isSet.stdin { + stdin = (io.Reader)(nil) + } + if !isSet.stdout { + stdout = (io.Writer)(nil) + } + if !isSet.stderr { + stderr = (io.Writer)(nil) + } + + conn, err := bindings.GetClient(ctx) + if err != nil { + return err + } + + // Do we need to wire in stdin? + ctnr, err := Inspect(ctx, nameOrId, bindings.PFalse) + if err != nil { + return err + } + + params := url.Values{} + if detachKeys != nil { + params.Add("detachKeys", *detachKeys) + } + if logs != nil { + params.Add("logs", fmt.Sprintf("%t", *logs)) + } + if stream != nil { + params.Add("stream", fmt.Sprintf("%t", *stream)) + } + if isSet.stdin { + params.Add("stdin", "true") + } + if isSet.stdout { + params.Add("stdout", "true") + } + if isSet.stderr { + params.Add("stderr", "true") + } + + // Unless all requirements are met, don't use "stdin" is a terminal + file, ok := stdin.(*os.File) + needTTY := ok && terminal.IsTerminal(int(file.Fd())) && ctnr.Config.Tty + if needTTY { + state, err := setRawTerminal(file) + if err != nil { + return err + } + defer func() { + if err := terminal.Restore(int(file.Fd()), state); err != nil { + logrus.Errorf("unable to restore terminal: %q", err) + } + logrus.SetFormatter(&logrus.TextFormatter{}) + }() + } + + headers := make(map[string]string) + headers["Connection"] = "Upgrade" + headers["Upgrade"] = "tcp" + + var socket net.Conn + socketSet := false + dialContext := conn.Client.Transport.(*http.Transport).DialContext + t := &http.Transport{ + DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + c, err := dialContext(ctx, network, address) + if err != nil { + return nil, err + } + if !socketSet { + socket = c + socketSet = true + } + return c, err + }, + IdleConnTimeout: time.Duration(0), + } + conn.Client.Transport = t + response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, headers, nameOrId) + if err != nil { + return err + } + if !(response.IsSuccess() || response.IsInformational()) { + return response.Process(nil) + } + + if needTTY { + winChange := make(chan os.Signal, 1) + signal.Notify(winChange, sig.SIGWINCH) + winCtx, winCancel := context.WithCancel(ctx) + defer winCancel() + + go attachHandleResize(ctx, winCtx, winChange, false, nameOrId, file) + } + + // If we are attaching around a start, we need to "signal" + // back that we are in fact attached so that started does + // not execute before we can attach. + if attachReady != nil { + attachReady <- true + } + + if isSet.stdin { + go func() { + logrus.Debugf("Copying STDIN to socket") + _, err := utils.CopyDetachable(socket, stdin, []byte{}) + if err != nil { + logrus.Error("failed to write input to service: " + err.Error()) + } + }() + } + + buffer := make([]byte, 1024) + if ctnr.Config.Tty { + logrus.Debugf("Copying STDOUT of container in terminal mode") + + if !isSet.stdout { + return fmt.Errorf("container %q requires stdout to be set", ctnr.ID) + } + // If not multiplex'ed, read from server and write to stdout + _, err := io.Copy(stdout, socket) + if err != nil { + return err + } + } else { + logrus.Debugf("Copying standard streams of container in non-terminal mode") + for { + // Read multiplexed channels and write to appropriate stream + fd, l, err := DemuxHeader(socket, buffer) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + frame, err := DemuxFrame(socket, buffer, l) + if err != nil { + return err + } + + switch { + case fd == 0 && isSet.stdout: + _, err := stdout.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 1 && isSet.stdout: + _, err := stdout.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 2 && isSet.stderr: + _, err := stderr.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 3: + return fmt.Errorf("error from service from stream: %s", frame) + default: + return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd) + } + } + } + return nil +} + +// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { + n, err := io.ReadFull(r, buffer[0:8]) + if err != nil { + return + } + if n < 8 { + err = io.ErrUnexpectedEOF + return + } + + fd = int(buffer[0]) + if fd < 0 || fd > 3 { + err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd)) + return + } + + sz = int(binary.BigEndian.Uint32(buffer[4:8])) + return +} + +// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) { + if len(buffer) < length { + buffer = append(buffer, make([]byte, length-len(buffer)+1)...) + } + + n, err := io.ReadFull(r, buffer[0:length]) + if err != nil { + return nil, nil + } + if n < length { + err = io.ErrUnexpectedEOF + return + } + + return buffer[0:length], nil +} + +// ResizeContainerTTY sets container's TTY height and width in characters +func ResizeContainerTTY(ctx context.Context, nameOrId string, height *int, width *int) error { + return resizeTTY(ctx, bindings.JoinURL("containers", nameOrId, "resize"), height, width) +} + +// ResizeExecTTY sets session's TTY height and width in characters +func ResizeExecTTY(ctx context.Context, nameOrId string, height *int, width *int) error { + return resizeTTY(ctx, bindings.JoinURL("exec", nameOrId, "resize"), height, width) +} + +// resizeTTY set size of TTY of container +func resizeTTY(ctx context.Context, endpoint string, height *int, width *int) error { + conn, err := bindings.GetClient(ctx) + if err != nil { + return err + } + + params := url.Values{} + if height != nil { + params.Set("h", strconv.Itoa(*height)) + } + if width != nil { + params.Set("w", strconv.Itoa(*width)) + } + rsp, err := conn.DoRequest(nil, http.MethodPost, endpoint, params, nil) + if err != nil { + return err + } + return rsp.Process(nil) +} + +type rawFormatter struct { + logrus.TextFormatter +} + +func (f *rawFormatter) Format(entry *logrus.Entry) ([]byte, error) { + buffer, err := f.TextFormatter.Format(entry) + if err != nil { + return buffer, err + } + return append(buffer, '\r'), nil +} + +// This is intended to be run as a goroutine, handling resizing for a container +// or exec session. +func attachHandleResize(ctx, winCtx context.Context, winChange chan os.Signal, isExec bool, id string, file *os.File) { + // Prime the pump, we need one reset to ensure everything is ready + winChange <- sig.SIGWINCH + for { + select { + case <-winCtx.Done(): + return + case <-winChange: + h, w, err := terminal.GetSize(int(file.Fd())) + if err != nil { + logrus.Warnf("failed to obtain TTY size: " + err.Error()) + } + + var resizeErr error + if isExec { + resizeErr = ResizeExecTTY(ctx, id, &h, &w) + } else { + resizeErr = ResizeContainerTTY(ctx, id, &h, &w) + } + if resizeErr != nil { + logrus.Warnf("failed to resize TTY: " + resizeErr.Error()) + } + } + } +} + +// Configure the given terminal for raw mode +func setRawTerminal(file *os.File) (*terminal.State, error) { + state, err := terminal.MakeRaw(int(file.Fd())) + if err != nil { + return nil, err + } + + logrus.SetFormatter(&rawFormatter{}) + + return state, err +} + +// ExecStartAndAttach starts and attaches to a given exec session. +func ExecStartAndAttach(ctx context.Context, sessionID string, streams *define.AttachStreams) error { + conn, err := bindings.GetClient(ctx) + if err != nil { + return err + } + + // TODO: Make this configurable (can't use streams' InputStream as it's + // buffered) + terminalFile := os.Stdin + + logrus.Debugf("Starting & Attaching to exec session ID %q", sessionID) + + // We need to inspect the exec session first to determine whether to use + // -t. + resp, err := conn.DoRequest(nil, http.MethodGet, "/exec/%s/json", nil, nil, sessionID) + if err != nil { + return err + } + + respStruct := new(define.InspectExecSession) + if err := resp.Process(respStruct); err != nil { + return err + } + isTerm := true + if respStruct.ProcessConfig != nil { + isTerm = respStruct.ProcessConfig.Tty + } + + // If we are in TTY mode, we need to set raw mode for the terminal. + // TODO: Share all of this with Attach() for containers. + needTTY := terminalFile != nil && terminal.IsTerminal(int(terminalFile.Fd())) && isTerm + if needTTY { + state, err := setRawTerminal(terminalFile) + if err != nil { + return err + } + defer func() { + if err := terminal.Restore(int(terminalFile.Fd()), state); err != nil { + logrus.Errorf("unable to restore terminal: %q", err) + } + logrus.SetFormatter(&logrus.TextFormatter{}) + }() + } + + body := struct { + Detach bool `json:"Detach"` + }{ + Detach: false, + } + bodyJSON, err := json.Marshal(body) + if err != nil { + return err + } + + var socket net.Conn + socketSet := false + dialContext := conn.Client.Transport.(*http.Transport).DialContext + t := &http.Transport{ + DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + c, err := dialContext(ctx, network, address) + if err != nil { + return nil, err + } + if !socketSet { + socket = c + socketSet = true + } + return c, err + }, + IdleConnTimeout: time.Duration(0), + } + conn.Client.Transport = t + response, err := conn.DoRequest(bytes.NewReader(bodyJSON), http.MethodPost, "/exec/%s/start", nil, nil, sessionID) + if err != nil { + return err + } + if !(response.IsSuccess() || response.IsInformational()) { + return response.Process(nil) + } + + if needTTY { + winChange := make(chan os.Signal, 1) + signal.Notify(winChange, sig.SIGWINCH) + winCtx, winCancel := context.WithCancel(ctx) + defer winCancel() + + go attachHandleResize(ctx, winCtx, winChange, true, sessionID, terminalFile) + } + + if streams.AttachInput { + go func() { + logrus.Debugf("Copying STDIN to socket") + _, err := utils.CopyDetachable(socket, streams.InputStream, []byte{}) + if err != nil { + logrus.Error("failed to write input to service: " + err.Error()) + } + }() + } + + buffer := make([]byte, 1024) + if isTerm { + logrus.Debugf("Handling terminal attach to exec") + if !streams.AttachOutput { + return fmt.Errorf("exec session %s has a terminal and must have STDOUT enabled", sessionID) + } + // If not multiplex'ed, read from server and write to stdout + _, err := utils.CopyDetachable(streams.OutputStream, socket, []byte{}) + if err != nil { + return err + } + } else { + logrus.Debugf("Handling non-terminal attach to exec") + for { + // Read multiplexed channels and write to appropriate stream + fd, l, err := DemuxHeader(socket, buffer) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + frame, err := DemuxFrame(socket, buffer, l) + if err != nil { + return err + } + + switch { + case fd == 0 && streams.AttachOutput: + _, err := streams.OutputStream.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 1 && streams.AttachInput: + // Write STDIN to STDOUT (echoing characters + // typed by another attach session) + _, err := streams.OutputStream.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 2 && streams.AttachError: + _, err := streams.ErrorStream.Write(frame[0:l]) + if err != nil { + return err + } + case fd == 3: + return fmt.Errorf("error from service from stream: %s", frame) + default: + return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd) + } + } + } + return nil +} diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go index 516f3d282..929b6bbd5 100644 --- a/pkg/bindings/containers/containers.go +++ b/pkg/bindings/containers/containers.go @@ -2,14 +2,9 @@ package containers import ( "context" - "encoding/binary" - "fmt" "io" "net/http" "net/url" - "os" - "os/signal" - "reflect" "strconv" "strings" @@ -17,10 +12,7 @@ import ( "github.com/containers/libpod/pkg/api/handlers" "github.com/containers/libpod/pkg/bindings" "github.com/containers/libpod/pkg/domain/entities" - sig "github.com/containers/libpod/pkg/signal" "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "golang.org/x/crypto/ssh/terminal" ) var ( @@ -345,248 +337,3 @@ func ContainerInit(ctx context.Context, nameOrID string) error { } return response.Process(nil) } - -// Attach attaches to a running container -func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer, attachReady chan bool) error { - isSet := struct { - stdin bool - stdout bool - stderr bool - }{ - stdin: !(stdin == nil || reflect.ValueOf(stdin).IsNil()), - stdout: !(stdout == nil || reflect.ValueOf(stdout).IsNil()), - stderr: !(stderr == nil || reflect.ValueOf(stderr).IsNil()), - } - // Ensure golang can determine that interfaces are "really" nil - if !isSet.stdin { - stdin = (io.Reader)(nil) - } - if !isSet.stdout { - stdout = (io.Writer)(nil) - } - if !isSet.stderr { - stderr = (io.Writer)(nil) - } - - conn, err := bindings.GetClient(ctx) - if err != nil { - return err - } - - // Do we need to wire in stdin? - ctnr, err := Inspect(ctx, nameOrId, bindings.PFalse) - if err != nil { - return err - } - - params := url.Values{} - if detachKeys != nil { - params.Add("detachKeys", *detachKeys) - } - if logs != nil { - params.Add("logs", fmt.Sprintf("%t", *logs)) - } - if stream != nil { - params.Add("stream", fmt.Sprintf("%t", *stream)) - } - if isSet.stdin { - params.Add("stdin", "true") - } - if isSet.stdout { - params.Add("stdout", "true") - } - if isSet.stderr { - params.Add("stderr", "true") - } - - // Unless all requirements are met, don't use "stdin" is a terminal - file, ok := stdin.(*os.File) - needTTY := ok && terminal.IsTerminal(int(file.Fd())) && ctnr.Config.Tty - if needTTY { - state, err := terminal.MakeRaw(int(file.Fd())) - if err != nil { - return err - } - - logrus.SetFormatter(&rawFormatter{}) - - defer func() { - if err := terminal.Restore(int(file.Fd()), state); err != nil { - logrus.Errorf("unable to restore terminal: %q", err) - } - logrus.SetFormatter(&logrus.TextFormatter{}) - }() - - winChange := make(chan os.Signal, 1) - signal.Notify(winChange, sig.SIGWINCH) - winCtx, winCancel := context.WithCancel(ctx) - defer winCancel() - - go func() { - // Prime the pump, we need one reset to ensure everything is ready - winChange <- sig.SIGWINCH - for { - select { - case <-winCtx.Done(): - return - case <-winChange: - h, w, err := terminal.GetSize(int(file.Fd())) - if err != nil { - logrus.Warnf("failed to obtain TTY size: " + err.Error()) - } - - if err := ResizeContainerTTY(ctx, nameOrId, &h, &w); err != nil { - logrus.Warnf("failed to resize TTY: " + err.Error()) - } - } - } - }() - } - - response, err := conn.DoRequest(stdin, http.MethodPost, "/containers/%s/attach", params, nil, nameOrId) - if err != nil { - return err - } - if !(response.IsSuccess() || response.IsInformational()) { - return response.Process(nil) - } - - // If we are attaching around a start, we need to "signal" - // back that we are in fact attached so that started does - // not execute before we can attach. - if attachReady != nil { - attachReady <- true - } - - buffer := make([]byte, 1024) - if ctnr.Config.Tty { - if !isSet.stdout { - return fmt.Errorf("container %q requires stdout to be set", ctnr.ID) - } - // If not multiplex'ed, read from server and write to stdout - _, err := io.Copy(stdout, response.Body) - if err != nil { - return err - } - } else { - for { - // Read multiplexed channels and write to appropriate stream - fd, l, err := DemuxHeader(response.Body, buffer) - if err != nil { - if errors.Is(err, io.EOF) { - return nil - } - return err - } - frame, err := DemuxFrame(response.Body, buffer, l) - if err != nil { - return err - } - - switch { - case fd == 0 && isSet.stdout: - _, err := stdout.Write(frame[0:l]) - if err != nil { - return err - } - case fd == 1 && isSet.stdout: - _, err := stdout.Write(frame[0:l]) - if err != nil { - return err - } - case fd == 2 && isSet.stderr: - _, err := stderr.Write(frame[0:l]) - if err != nil { - return err - } - case fd == 3: - return fmt.Errorf("error from service from stream: %s", frame) - default: - return fmt.Errorf("unrecognized channel in header: %d, 0-3 supported", fd) - } - } - } - return nil -} - -// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel -func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { - n, err := io.ReadFull(r, buffer[0:8]) - if err != nil { - return - } - if n < 8 { - err = io.ErrUnexpectedEOF - return - } - - fd = int(buffer[0]) - if fd < 0 || fd > 3 { - err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd)) - return - } - - sz = int(binary.BigEndian.Uint32(buffer[4:8])) - return -} - -// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel -func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) { - if len(buffer) < length { - buffer = append(buffer, make([]byte, length-len(buffer)+1)...) - } - - n, err := io.ReadFull(r, buffer[0:length]) - if err != nil { - return nil, nil - } - if n < length { - err = io.ErrUnexpectedEOF - return - } - - return buffer[0:length], nil -} - -// ResizeContainerTTY sets container's TTY height and width in characters -func ResizeContainerTTY(ctx context.Context, nameOrId string, height *int, width *int) error { - return resizeTTY(ctx, bindings.JoinURL("containers", nameOrId, "resize"), height, width) -} - -// ResizeExecTTY sets session's TTY height and width in characters -func ResizeExecTTY(ctx context.Context, nameOrId string, height *int, width *int) error { - return resizeTTY(ctx, bindings.JoinURL("exec", nameOrId, "resize"), height, width) -} - -// resizeTTY set size of TTY of container -func resizeTTY(ctx context.Context, endpoint string, height *int, width *int) error { - conn, err := bindings.GetClient(ctx) - if err != nil { - return err - } - - params := url.Values{} - if height != nil { - params.Set("h", strconv.Itoa(*height)) - } - if width != nil { - params.Set("w", strconv.Itoa(*width)) - } - rsp, err := conn.DoRequest(nil, http.MethodPost, endpoint, params, nil) - if err != nil { - return err - } - return rsp.Process(nil) -} - -type rawFormatter struct { - logrus.TextFormatter -} - -func (f *rawFormatter) Format(entry *logrus.Entry) ([]byte, error) { - buffer, err := f.TextFormatter.Format(entry) - if err != nil { - return buffer, err - } - return append(buffer, '\r'), nil -} diff --git a/pkg/domain/infra/tunnel/containers.go b/pkg/domain/infra/tunnel/containers.go index beba55c2b..e1c859e7c 100644 --- a/pkg/domain/infra/tunnel/containers.go +++ b/pkg/domain/infra/tunnel/containers.go @@ -2,6 +2,7 @@ package tunnel import ( "context" + "fmt" "io" "os" "strconv" @@ -11,6 +12,7 @@ import ( "github.com/containers/common/pkg/config" "github.com/containers/image/v5/docker/reference" "github.com/containers/libpod/libpod/define" + "github.com/containers/libpod/pkg/api/handlers" "github.com/containers/libpod/pkg/bindings" "github.com/containers/libpod/pkg/bindings/containers" "github.com/containers/libpod/pkg/domain/entities" @@ -375,7 +377,39 @@ func (ic *ContainerEngine) ContainerAttach(ctx context.Context, nameOrId string, } func (ic *ContainerEngine) ContainerExec(ctx context.Context, nameOrId string, options entities.ExecOptions, streams define.AttachStreams) (int, error) { - return 125, errors.New("not implemented") + env := []string{} + for k, v := range options.Envs { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + createConfig := new(handlers.ExecCreateConfig) + createConfig.User = options.User + createConfig.Privileged = options.Privileged + createConfig.Tty = options.Tty + createConfig.AttachStdin = options.Interactive + createConfig.AttachStdout = true + createConfig.AttachStderr = true + createConfig.Detach = false + createConfig.DetachKeys = options.DetachKeys + createConfig.Env = env + createConfig.WorkingDir = options.WorkDir + createConfig.Cmd = options.Cmd + + sessionID, err := containers.ExecCreate(ic.ClientCxt, nameOrId, createConfig) + if err != nil { + return 125, err + } + + if err := containers.ExecStartAndAttach(ic.ClientCxt, sessionID, &streams); err != nil { + return 125, err + } + + inspectOut, err := containers.ExecInspect(ic.ClientCxt, sessionID) + if err != nil { + return 125, err + } + + return inspectOut.ExitCode, nil } func (ic *ContainerEngine) ContainerExecDetached(ctx context.Context, nameOrID string, options entities.ExecOptions) (string, error) { -- cgit v1.2.3-54-g00ecf