diff options
-rw-r--r-- | libpod/container_api.go | 5 | ||||
-rw-r--r-- | libpod/oci_conmon_linux.go | 2 | ||||
-rw-r--r-- | libpod/util.go | 9 | ||||
-rw-r--r-- | pkg/api/handlers/compat/containers_attach.go | 2 | ||||
-rw-r--r-- | pkg/bindings/containers/containers.go | 130 | ||||
-rw-r--r-- | pkg/bindings/test/attach_test.go | 63 | ||||
-rw-r--r-- | pkg/bindings/test/common_test.go | 2 | ||||
-rw-r--r-- | pkg/bindings/test/containers_test.go | 2 | ||||
-rw-r--r-- | pkg/bindings/test/test_suite_test.go | 5 |
9 files changed, 210 insertions, 10 deletions
diff --git a/libpod/container_api.go b/libpod/container_api.go index b31079b26..d366ffb84 100644 --- a/libpod/container_api.go +++ b/libpod/container_api.go @@ -285,6 +285,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID()) + logSize := 0 if streamLogs { // Get all logs for the container logChan := make(chan *logs.LogLine) @@ -302,7 +303,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre device := logLine.Device var header []byte headerLen := uint32(len(logLine.Msg)) - + logSize += len(logLine.Msg) switch strings.ToLower(device) { case "stdin": header = makeHTTPAttachHeader(0, headerLen) @@ -341,7 +342,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre if err := c.ReadLog(logOpts, logChan); err != nil { return err } - logrus.Debugf("Done reading logs for container %s", c.ID()) + logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize) if err := <-errChan; err != nil { return err } diff --git a/libpod/oci_conmon_linux.go b/libpod/oci_conmon_linux.go index d59ff18ca..d1c1a1fc2 100644 --- a/libpod/oci_conmon_linux.go +++ b/libpod/oci_conmon_linux.go @@ -1704,6 +1704,8 @@ func httpAttachTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter, cid buf := make([]byte, bufferSize) for { numR, err := container.Read(buf) + logrus.Debugf("Read fd(%d) %d/%d bytes for container %s", int(buf[0]), numR, len(buf), cid) + if numR > 0 { switch buf[0] { case AttachPipeStdout: diff --git a/libpod/util.go b/libpod/util.go index bdfd153ed..ba9f1fa05 100644 --- a/libpod/util.go +++ b/libpod/util.go @@ -249,9 +249,8 @@ func hijackWriteErrorAndClose(toWrite error, cid string, terminal bool, httpCon // length and stream. Accepts an integer indicating which stream we are sending // to (STDIN = 0, STDOUT = 1, STDERR = 2). func makeHTTPAttachHeader(stream byte, length uint32) []byte { - headerBuf := []byte{stream, 0, 0, 0} - lenBuf := []byte{0, 0, 0, 0} - binary.BigEndian.PutUint32(lenBuf, length) - headerBuf = append(headerBuf, lenBuf...) - return headerBuf + header := make([]byte, 8) + header[0] = stream + binary.BigEndian.PutUint32(header[4:], length) + return header } diff --git a/pkg/api/handlers/compat/containers_attach.go b/pkg/api/handlers/compat/containers_attach.go index 80ad52aee..52c851b8c 100644 --- a/pkg/api/handlers/compat/containers_attach.go +++ b/pkg/api/handlers/compat/containers_attach.go @@ -108,7 +108,7 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) { // This header string sourced from Docker: // https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go - // Using literally to ensure compatability with existing clients. + // Using literally to ensure compatibility with existing clients. fmt.Fprintf(connection, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n") logrus.Debugf("Hijack for attach of container %s successful", ctr.ID()) diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go index e74a256c7..de7b792b4 100644 --- a/pkg/bindings/containers/containers.go +++ b/pkg/bindings/containers/containers.go @@ -2,6 +2,8 @@ package containers import ( "context" + "encoding/binary" + "fmt" "io" "net/http" "net/url" @@ -15,6 +17,10 @@ import ( "github.com/pkg/errors" ) +var ( + ErrLostSync = errors.New("lost synchronization with attach multiplexed result") +) + // List obtains a list of containers in local storage. All parameters to this method are optional. // The filters are used to determine which containers are listed. The last parameter indicates to only return // the most recent number of containers. The pod and size booleans indicate that pod information and rootfs @@ -247,7 +253,7 @@ func Unpause(ctx context.Context, nameOrID string) error { // Wait blocks until the given container reaches a condition. If not provided, the condition will // default to stopped. If the condition is stopped, an exit code for the container will be provided. The // nameOrID can be a container name or a partial/full ID. -func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { //nolint +func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { // nolint var exitCode int32 conn, err := bindings.GetClient(ctx) if err != nil { @@ -333,3 +339,125 @@ func ContainerInit(ctx context.Context, nameOrID string) error { } return response.Process(nil) } + +// Attach attaches to a running container +func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error { + conn, err := bindings.GetClient(ctx) + if err != nil { + return err + } + + params := url.Values{} + if detachKeys != nil { + params.Add("detachKeys", *detachKeys) + } + if logs != nil { + params.Add("logs", fmt.Sprintf("%t", *logs)) + } + if stream != nil { + params.Add("stream", fmt.Sprintf("%t", *stream)) + } + if stdin != nil && *stdin { + params.Add("stdin", "true") + } + if stdout != nil { + params.Add("stdout", "true") + } + if stderr != nil { + params.Add("stderr", "true") + } + + response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, nameOrId) + if err != nil { + return err + } + defer response.Body.Close() + + ctype := response.Header.Get("Content-Type") + upgrade := response.Header.Get("Connection") + + buffer := make([]byte, 1024) + if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" { + for { + // Read multiplexed channels and write to appropriate stream + fd, l, err := DemuxHeader(response.Body, buffer) + if err != nil { + switch { + case errors.Is(err, io.EOF): + return nil + case errors.Is(err, io.ErrUnexpectedEOF): + continue + } + return err + } + frame, err := DemuxFrame(response.Body, buffer, l) + if err != nil { + return err + } + + switch { + case fd == 0 && stdin != nil && *stdin: + stdout.Write(frame) + case fd == 1 && stdout != nil: + stdout.Write(frame) + case fd == 2 && stderr != nil: + stderr.Write(frame) + case fd == 3: + return fmt.Errorf("error from daemon in stream: %s", frame) + default: + return fmt.Errorf("unrecognized input header: %d", fd) + } + } + } else { + // If not multiplex'ed from server just dump stream to stdout + for { + _, err := response.Body.Read(buffer) + if err != nil { + if !errors.Is(err, io.EOF) { + return err + } + break + } + stdout.Write(buffer) + } + } + return err +} + +// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { + n, err := io.ReadFull(r, buffer[0:8]) + if err != nil { + return + } + if n < 8 { + err = io.ErrUnexpectedEOF + return + } + + fd = int(buffer[0]) + if fd < 0 || fd > 3 { + err = ErrLostSync + return + } + + sz = int(binary.BigEndian.Uint32(buffer[4:8])) + return +} + +// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) { + if len(buffer) < length { + buffer = append(buffer, make([]byte, length-len(buffer)+1)...) + } + n, err := io.ReadFull(r, buffer[0:length]) + if err != nil { + return nil, nil + } + if n < length { + err = io.ErrUnexpectedEOF + return + } + + return buffer[0:length], nil +} diff --git a/pkg/bindings/test/attach_test.go b/pkg/bindings/test/attach_test.go new file mode 100644 index 000000000..8e89ff8ff --- /dev/null +++ b/pkg/bindings/test/attach_test.go @@ -0,0 +1,63 @@ +package test_bindings + +import ( + "bytes" + "time" + + "github.com/containers/libpod/pkg/bindings" + "github.com/containers/libpod/pkg/bindings/containers" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gexec" +) + +var _ = Describe("Podman containers attach", func() { + var ( + bt *bindingTest + s *gexec.Session + ) + + BeforeEach(func() { + bt = newBindingTest() + bt.RestoreImagesFromCache() + s = bt.startAPIService() + time.Sleep(1 * time.Second) + err := bt.NewConnection() + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterEach(func() { + s.Kill() + bt.cleanup() + }) + + It("attach", func() { + name := "TopAttachTest" + id, err := bt.RunTopContainer(&name, nil, nil) + Expect(err).ShouldNot(HaveOccurred()) + + tickTock := time.NewTimer(2 * time.Second) + go func() { + <-tickTock.C + timeout := uint(5) + err := containers.Stop(bt.conn, id, &timeout) + if err != nil { + GinkgoWriter.Write([]byte(err.Error())) + } + }() + + stdout := &bytes.Buffer{} + stderr := &bytes.Buffer{} + go func() { + defer GinkgoRecover() + + err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr) + Expect(err).ShouldNot(HaveOccurred()) + }() + + time.Sleep(5 * time.Second) + + // First character/First line of top output + Expect(stdout.String()).Should(ContainSubstring("Mem: ")) + }) +}) diff --git a/pkg/bindings/test/common_test.go b/pkg/bindings/test/common_test.go index f33e42440..a86e6f2e3 100644 --- a/pkg/bindings/test/common_test.go +++ b/pkg/bindings/test/common_test.go @@ -191,7 +191,7 @@ func (b *bindingTest) restoreImageFromCache(i testImage) { func (b *bindingTest) RunTopContainer(containerName *string, insidePod *bool, podName *string) (string, error) { s := specgen.NewSpecGenerator(alpine.name, false) s.Terminal = false - s.Command = []string{"top"} + s.Command = []string{"/usr/bin/top"} if containerName != nil { s.Name = *containerName } diff --git a/pkg/bindings/test/containers_test.go b/pkg/bindings/test/containers_test.go index 328691df2..d130c146a 100644 --- a/pkg/bindings/test/containers_test.go +++ b/pkg/bindings/test/containers_test.go @@ -302,6 +302,8 @@ var _ = Describe("Podman containers ", func() { errChan = make(chan error) go func() { + defer GinkgoRecover() + _, waitErr := containers.Wait(bt.conn, name, &running) errChan <- waitErr close(errChan) diff --git a/pkg/bindings/test/test_suite_test.go b/pkg/bindings/test/test_suite_test.go index dc2b49b88..d2c2c7838 100644 --- a/pkg/bindings/test/test_suite_test.go +++ b/pkg/bindings/test/test_suite_test.go @@ -5,9 +5,14 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" ) func TestTest(t *testing.T) { + if testing.Verbose() { + logrus.SetLevel(logrus.DebugLevel) + } + RegisterFailHandler(Fail) RunSpecs(t, "Test Suite") } |