summaryrefslogtreecommitdiff
path: root/pkg/bindings
diff options
context:
space:
mode:
authorJhon Honce <jhonce@redhat.com>2020-05-08 08:33:44 -0700
committerJhon Honce <jhonce@redhat.com>2020-05-13 11:49:17 -0700
commitb6113e2b9ea8f397e345a09335c26f953994c6f4 (patch)
tree5dfe310f20bc1b0ad5cadb84ef8416253df438ad /pkg/bindings
parentd147b3ee027580dd7afdeb0fa04d990ae1d2ee91 (diff)
downloadpodman-b6113e2b9ea8f397e345a09335c26f953994c6f4.tar.gz
podman-b6113e2b9ea8f397e345a09335c26f953994c6f4.tar.bz2
podman-b6113e2b9ea8f397e345a09335c26f953994c6f4.zip
WIP V2 attach bindings and test
* Add ErrLostSync to report lost of sync when de-mux'ing stream * Add logus.SetLevel(logrus.DebugLevel) when `go test -v` given * Add context to debugging messages Signed-off-by: Jhon Honce <jhonce@redhat.com>
Diffstat (limited to 'pkg/bindings')
-rw-r--r--pkg/bindings/containers/containers.go130
-rw-r--r--pkg/bindings/test/attach_test.go63
-rw-r--r--pkg/bindings/test/common_test.go2
-rw-r--r--pkg/bindings/test/containers_test.go2
-rw-r--r--pkg/bindings/test/test_suite_test.go5
5 files changed, 200 insertions, 2 deletions
diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go
index e74a256c7..de7b792b4 100644
--- a/pkg/bindings/containers/containers.go
+++ b/pkg/bindings/containers/containers.go
@@ -2,6 +2,8 @@ package containers
import (
"context"
+ "encoding/binary"
+ "fmt"
"io"
"net/http"
"net/url"
@@ -15,6 +17,10 @@ import (
"github.com/pkg/errors"
)
+var (
+ ErrLostSync = errors.New("lost synchronization with attach multiplexed result")
+)
+
// List obtains a list of containers in local storage. All parameters to this method are optional.
// The filters are used to determine which containers are listed. The last parameter indicates to only return
// the most recent number of containers. The pod and size booleans indicate that pod information and rootfs
@@ -247,7 +253,7 @@ func Unpause(ctx context.Context, nameOrID string) error {
// Wait blocks until the given container reaches a condition. If not provided, the condition will
// default to stopped. If the condition is stopped, an exit code for the container will be provided. The
// nameOrID can be a container name or a partial/full ID.
-func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { //nolint
+func Wait(ctx context.Context, nameOrID string, condition *define.ContainerStatus) (int32, error) { // nolint
var exitCode int32
conn, err := bindings.GetClient(ctx)
if err != nil {
@@ -333,3 +339,125 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
}
return response.Process(nil)
}
+
+// Attach attaches to a running container
+func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error {
+ conn, err := bindings.GetClient(ctx)
+ if err != nil {
+ return err
+ }
+
+ params := url.Values{}
+ if detachKeys != nil {
+ params.Add("detachKeys", *detachKeys)
+ }
+ if logs != nil {
+ params.Add("logs", fmt.Sprintf("%t", *logs))
+ }
+ if stream != nil {
+ params.Add("stream", fmt.Sprintf("%t", *stream))
+ }
+ if stdin != nil && *stdin {
+ params.Add("stdin", "true")
+ }
+ if stdout != nil {
+ params.Add("stdout", "true")
+ }
+ if stderr != nil {
+ params.Add("stderr", "true")
+ }
+
+ response, err := conn.DoRequest(nil, http.MethodPost, "/containers/%s/attach", params, nameOrId)
+ if err != nil {
+ return err
+ }
+ defer response.Body.Close()
+
+ ctype := response.Header.Get("Content-Type")
+ upgrade := response.Header.Get("Connection")
+
+ buffer := make([]byte, 1024)
+ if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" {
+ for {
+ // Read multiplexed channels and write to appropriate stream
+ fd, l, err := DemuxHeader(response.Body, buffer)
+ if err != nil {
+ switch {
+ case errors.Is(err, io.EOF):
+ return nil
+ case errors.Is(err, io.ErrUnexpectedEOF):
+ continue
+ }
+ return err
+ }
+ frame, err := DemuxFrame(response.Body, buffer, l)
+ if err != nil {
+ return err
+ }
+
+ switch {
+ case fd == 0 && stdin != nil && *stdin:
+ stdout.Write(frame)
+ case fd == 1 && stdout != nil:
+ stdout.Write(frame)
+ case fd == 2 && stderr != nil:
+ stderr.Write(frame)
+ case fd == 3:
+ return fmt.Errorf("error from daemon in stream: %s", frame)
+ default:
+ return fmt.Errorf("unrecognized input header: %d", fd)
+ }
+ }
+ } else {
+ // If not multiplex'ed from server just dump stream to stdout
+ for {
+ _, err := response.Body.Read(buffer)
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return err
+ }
+ break
+ }
+ stdout.Write(buffer)
+ }
+ }
+ return err
+}
+
+// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
+func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
+ n, err := io.ReadFull(r, buffer[0:8])
+ if err != nil {
+ return
+ }
+ if n < 8 {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+
+ fd = int(buffer[0])
+ if fd < 0 || fd > 3 {
+ err = ErrLostSync
+ return
+ }
+
+ sz = int(binary.BigEndian.Uint32(buffer[4:8]))
+ return
+}
+
+// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
+func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
+ if len(buffer) < length {
+ buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
+ }
+ n, err := io.ReadFull(r, buffer[0:length])
+ if err != nil {
+ return nil, nil
+ }
+ if n < length {
+ err = io.ErrUnexpectedEOF
+ return
+ }
+
+ return buffer[0:length], nil
+}
diff --git a/pkg/bindings/test/attach_test.go b/pkg/bindings/test/attach_test.go
new file mode 100644
index 000000000..8e89ff8ff
--- /dev/null
+++ b/pkg/bindings/test/attach_test.go
@@ -0,0 +1,63 @@
+package test_bindings
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/containers/libpod/pkg/bindings"
+ "github.com/containers/libpod/pkg/bindings/containers"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gexec"
+)
+
+var _ = Describe("Podman containers attach", func() {
+ var (
+ bt *bindingTest
+ s *gexec.Session
+ )
+
+ BeforeEach(func() {
+ bt = newBindingTest()
+ bt.RestoreImagesFromCache()
+ s = bt.startAPIService()
+ time.Sleep(1 * time.Second)
+ err := bt.NewConnection()
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ s.Kill()
+ bt.cleanup()
+ })
+
+ It("attach", func() {
+ name := "TopAttachTest"
+ id, err := bt.RunTopContainer(&name, nil, nil)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tickTock := time.NewTimer(2 * time.Second)
+ go func() {
+ <-tickTock.C
+ timeout := uint(5)
+ err := containers.Stop(bt.conn, id, &timeout)
+ if err != nil {
+ GinkgoWriter.Write([]byte(err.Error()))
+ }
+ }()
+
+ stdout := &bytes.Buffer{}
+ stderr := &bytes.Buffer{}
+ go func() {
+ defer GinkgoRecover()
+
+ err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr)
+ Expect(err).ShouldNot(HaveOccurred())
+ }()
+
+ time.Sleep(5 * time.Second)
+
+ // First character/First line of top output
+ Expect(stdout.String()).Should(ContainSubstring("Mem: "))
+ })
+})
diff --git a/pkg/bindings/test/common_test.go b/pkg/bindings/test/common_test.go
index f33e42440..a86e6f2e3 100644
--- a/pkg/bindings/test/common_test.go
+++ b/pkg/bindings/test/common_test.go
@@ -191,7 +191,7 @@ func (b *bindingTest) restoreImageFromCache(i testImage) {
func (b *bindingTest) RunTopContainer(containerName *string, insidePod *bool, podName *string) (string, error) {
s := specgen.NewSpecGenerator(alpine.name, false)
s.Terminal = false
- s.Command = []string{"top"}
+ s.Command = []string{"/usr/bin/top"}
if containerName != nil {
s.Name = *containerName
}
diff --git a/pkg/bindings/test/containers_test.go b/pkg/bindings/test/containers_test.go
index 328691df2..d130c146a 100644
--- a/pkg/bindings/test/containers_test.go
+++ b/pkg/bindings/test/containers_test.go
@@ -302,6 +302,8 @@ var _ = Describe("Podman containers ", func() {
errChan = make(chan error)
go func() {
+ defer GinkgoRecover()
+
_, waitErr := containers.Wait(bt.conn, name, &running)
errChan <- waitErr
close(errChan)
diff --git a/pkg/bindings/test/test_suite_test.go b/pkg/bindings/test/test_suite_test.go
index dc2b49b88..d2c2c7838 100644
--- a/pkg/bindings/test/test_suite_test.go
+++ b/pkg/bindings/test/test_suite_test.go
@@ -5,9 +5,14 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/sirupsen/logrus"
)
func TestTest(t *testing.T) {
+ if testing.Verbose() {
+ logrus.SetLevel(logrus.DebugLevel)
+ }
+
RegisterFailHandler(Fail)
RunSpecs(t, "Test Suite")
}