summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libpod/container_api.go5
-rw-r--r--libpod/oci_conmon_linux.go2
-rw-r--r--libpod/util.go9
-rw-r--r--pkg/api/handlers/compat/containers_attach.go2
-rw-r--r--pkg/bindings/containers/containers.go130
-rw-r--r--pkg/bindings/test/attach_test.go63
-rw-r--r--pkg/bindings/test/common_test.go2
-rw-r--r--pkg/bindings/test/containers_test.go2
-rw-r--r--pkg/bindings/test/test_suite_test.go5
9 files changed, 210 insertions, 10 deletions
diff --git a/libpod/container_api.go b/libpod/container_api.go
index b31079b26..d366ffb84 100644
--- a/libpod/container_api.go
+++ b/libpod/container_api.go
@@ -285,6 +285,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
+ logSize := 0
if streamLogs {
// Get all logs for the container
logChan := make(chan *logs.LogLine)
@@ -302,7 +303,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
device := logLine.Device
var header []byte
headerLen := uint32(len(logLine.Msg))
-
+ logSize += len(logLine.Msg)
switch strings.ToLower(device) {
case "stdin":
header = makeHTTPAttachHeader(0, headerLen)
@@ -341,7 +342,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
if err := c.ReadLog(logOpts, logChan); err != nil {
return err
}
- logrus.Debugf("Done reading logs for container %s", c.ID())
+ logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
if err := <-errChan; err != nil {
return err
}
diff --git a/libpod/oci_conmon_linux.go b/libpod/oci_conmon_linux.go
index d59ff18ca..d1c1a1fc2 100644
--- a/libpod/oci_conmon_linux.go
+++ b/libpod/oci_conmon_linux.go
@@ -1704,6 +1704,8 @@ func httpAttachTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter, cid
buf := make([]byte, bufferSize)
for {
numR, err := container.Read(buf)
+ logrus.Debugf("Read fd(%d) %d/%d bytes for container %s", int(buf[0]), numR, len(buf), cid)
+
if numR > 0 {
switch buf[0] {
case AttachPipeStdout:
diff --git a/libpod/util.go b/libpod/util.go
index bdfd153ed..ba9f1fa05 100644
--- a/libpod/util.go
+++ b/libpod/util.go
@@ -249,9 +249,8 @@ func hijackWriteErrorAndClose(toWrite error, cid string, terminal bool, httpCon
// length and stream. Accepts an integer indicating which stream we are sending
// to (STDIN = 0, STDOUT = 1, STDERR = 2).
func makeHTTPAttachHeader(stream byte, length uint32) []byte {
- headerBuf := []byte{stream, 0, 0, 0}
- lenBuf := []byte{0, 0, 0, 0}
- binary.BigEndian.PutUint32(lenBuf, length)
- headerBuf = append(headerBuf, lenBuf...)
- return headerBuf
+ header := make([]byte, 8)
+ header[0] = stream
+ binary.BigEndian.PutUint32(header[4:], length)
+ return header
}
diff --git a/pkg/api/handlers/compat/containers_attach.go b/pkg/api/handlers/compat/containers_attach.go
index 80ad52aee..52c851b8c 100644
--- a/pkg/api/handlers/compat/containers_attach.go
+++ b/pkg/api/handlers/compat/containers_attach.go
@@ -108,7 +108,7 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
// This header string sourced from Docker:
// https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go
- // Using literally to ensure compatability with existing clients.
+ // Using literally to ensure compatibility with existing clients.
fmt.Fprintf(connection, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
logrus.Debugf("Hijack for attach of container %s successful", ctr.ID())
diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go
index e74a256c7..de7b792b4 100644
--- a/pkg/bindings/containers/containers.go
+++ b/pkg/bindings/containers/containers.go
@@ -2,6 +2,8 @@ package containers
import (
"context"
+ "encoding/binary"
+ "fmt"
"io"
"net/http"
"net/url"
@@ -15,6 +17,10 @@ import (
"github.com/pkg/errors"
)
+var (
+ ErrLostSync = errors.New("lost synchronization with attach multiplexed result")
+)
+
// List obtains a list of containers in local storage. All parameters to this method are optional.
// The filters are used to determine which containers are listed. The last parameter indicates to only return
// the most recent number of containers. The pod and size booleans indicate that pod information and rootfs
@@ -247,7 +253,7 @@ func Unpause(ctx context.Context, nameOrID string) error {
// Wait blocks until the given container reaches a condition. If not provided, the condition will
// default to stopped. If the condition is stopped, an exit code for the container will be provided. The
// nameOrID can be a container name or a partial/full ID.
-func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { //nolint
+func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { // nolint
var exitCode int32
conn, err := bindings.GetClient(ctx)
if err != nil {
@@ -333,3 +339,125 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
}
return response.Process(nil)
}
+
+// Attach attaches to a running container
+func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error {
+ conn, err := bindings.GetClient(ctx)
+ if err != nil {
+ return err
+ }
+
+ params := url.Values{}
+ if detachKeys != nil {
+ params.Add("detachKeys", *detachKeys)
+ }
+ if logs != nil {
+ params.Add("logs", fmt.Sprintf("%t", *logs))
+ }
+ if stream != nil {
+ params.Add("stream", fmt.Sprintf("%t", *stream))
+ }
+ if stdin != nil && *stdin {
+ params.Add("stdin", "true")
+ }
+ if stdout != nil {
+ params.Add("stdout", "true")
+ }
+ if stderr != nil {
+ params.Add("stderr", "true")
+ }
+
+ response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, nameOrId)
+ if err != nil {
+ return err
+ }
+ defer response.Body.Close()
+
+ ctype := response.Header.Get("Content-Type")
+ upgrade := response.Header.Get("Connection")
+
+ buffer := make([]byte, 1024)
+ if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" {
+ for {
+ // Read multiplexed channels and write to appropriate stream
+ fd, l, err := DemuxHeader(response.Body, buffer)
+ if err != nil {
+ switch {
+ case errors.Is(err, io.EOF):
+ return nil
+ case errors.Is(err, io.ErrUnexpectedEOF):
+ continue
+ }
+ return err
+ }
+ frame, err := DemuxFrame(response.Body, buffer, l)
+ if err != nil {
+ return err
+ }
+
+ switch {
+ case fd == 0 && stdin != nil && *stdin:
+ stdout.Write(frame)
+ case fd == 1 && stdout != nil:
+ stdout.Write(frame)
+ case fd == 2 && stderr != nil:
+ stderr.Write(frame)
+ case fd == 3:
+ return fmt.Errorf("error from daemon in stream: %s", frame)
+ default:
+ return fmt.Errorf("unrecognized input header: %d", fd)
+ }
+ }
+ } else {
+ // If not multiplex'ed from server just dump stream to stdout
+ for {
+ _, err := response.Body.Read(buffer)
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return err
+ }
+ break
+ }
+ stdout.Write(buffer)
+ }
+ }
+ return err
+}
+
+// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
+func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
+ n, err := io.ReadFull(r, buffer[0:8])
+ if err != nil {
+ return
+ }
+ if n < 8 {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+
+ fd = int(buffer[0])
+ if fd < 0 || fd > 3 {
+ err = ErrLostSync
+ return
+ }
+
+ sz = int(binary.BigEndian.Uint32(buffer[4:8]))
+ return
+}
+
+// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
+func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
+ if len(buffer) < length {
+ buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
+ }
+ n, err := io.ReadFull(r, buffer[0:length])
+ if err != nil {
+ return nil, nil
+ }
+ if n < length {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+
+ return buffer[0:length], nil
+}
diff --git a/pkg/bindings/test/attach_test.go b/pkg/bindings/test/attach_test.go
new file mode 100644
index 000000000..8e89ff8ff
--- /dev/null
+++ b/pkg/bindings/test/attach_test.go
@@ -0,0 +1,63 @@
+package test_bindings
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/containers/libpod/pkg/bindings"
+ "github.com/containers/libpod/pkg/bindings/containers"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gexec"
+)
+
+var _ = Describe("Podman containers attach", func() {
+ var (
+ bt *bindingTest
+ s *gexec.Session
+ )
+
+ BeforeEach(func() {
+ bt = newBindingTest()
+ bt.RestoreImagesFromCache()
+ s = bt.startAPIService()
+ time.Sleep(1 * time.Second)
+ err := bt.NewConnection()
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ s.Kill()
+ bt.cleanup()
+ })
+
+ It("attach", func() {
+ name := "TopAttachTest"
+ id, err := bt.RunTopContainer(&name, nil, nil)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tickTock := time.NewTimer(2 * time.Second)
+ go func() {
+ <-tickTock.C
+ timeout := uint(5)
+ err := containers.Stop(bt.conn, id, &timeout)
+ if err != nil {
+ GinkgoWriter.Write([]byte(err.Error()))
+ }
+ }()
+
+ stdout := &bytes.Buffer{}
+ stderr := &bytes.Buffer{}
+ go func() {
+ defer GinkgoRecover()
+
+ err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr)
+ Expect(err).ShouldNot(HaveOccurred())
+ }()
+
+ time.Sleep(5 * time.Second)
+
+ // First character/First line of top output
+ Expect(stdout.String()).Should(ContainSubstring("Mem: "))
+ })
+})
diff --git a/pkg/bindings/test/common_test.go b/pkg/bindings/test/common_test.go
index f33e42440..a86e6f2e3 100644
--- a/pkg/bindings/test/common_test.go
+++ b/pkg/bindings/test/common_test.go
@@ -191,7 +191,7 @@ func (b *bindingTest) restoreImageFromCache(i testImage) {
func (b *bindingTest) RunTopContainer(containerName *string, insidePod *bool, podName *string) (string, error) {
s := specgen.NewSpecGenerator(alpine.name, false)
s.Terminal = false
- s.Command = []string{"top"}
+ s.Command = []string{"/usr/bin/top"}
if containerName != nil {
s.Name = *containerName
}
diff --git a/pkg/bindings/test/containers_test.go b/pkg/bindings/test/containers_test.go
index 328691df2..d130c146a 100644
--- a/pkg/bindings/test/containers_test.go
+++ b/pkg/bindings/test/containers_test.go
@@ -302,6 +302,8 @@ var _ = Describe("Podman containers ", func() {
errChan = make(chan error)
go func() {
+ defer GinkgoRecover()
+
_, waitErr := containers.Wait(bt.conn, name, &running)
errChan <- waitErr
close(errChan)
diff --git a/pkg/bindings/test/test_suite_test.go b/pkg/bindings/test/test_suite_test.go
index dc2b49b88..d2c2c7838 100644
--- a/pkg/bindings/test/test_suite_test.go
+++ b/pkg/bindings/test/test_suite_test.go
@@ -5,9 +5,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/sirupsen/logrus"
)
func TestTest(t *testing.T) {
+ if testing.Verbose() {
+ logrus.SetLevel(logrus.DebugLevel)
+ }
+
RegisterFailHandler(Fail)
RunSpecs(t, "Test Suite")
}