aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2020-05-14 11:41:09 -0700
committerGitHub <noreply@github.com>2020-05-14 11:41:09 -0700
commit77dbfc753097d4b3a07c1382cb388e79515ffdb0 (patch)
treeb120842f7d53edf98e82d1cd7b8ad06e4e30f9c2
parent7e9ed37c0997d6dd2d6fc6ed6476039ee954286c (diff)
parentd34e5a142a57a8341fdc928b9f653392a27800e9 (diff)
downloadpodman-77dbfc753097d4b3a07c1382cb388e79515ffdb0.tar.gz
podman-77dbfc753097d4b3a07c1382cb388e79515ffdb0.tar.bz2
podman-77dbfc753097d4b3a07c1382cb388e79515ffdb0.zip
Merge pull request #6220 from jwhonce/wip/attach
V2 Update attach bindings to use Readers/Writers vs chan
-rw-r--r--pkg/bindings/connection.go73
-rw-r--r--pkg/bindings/containers/containers.go58
-rw-r--r--pkg/bindings/test/attach_test.go52
3 files changed, 136 insertions, 47 deletions
diff --git a/pkg/bindings/connection.go b/pkg/bindings/connection.go
index da3755fc8..d83c0482c 100644
--- a/pkg/bindings/connection.go
+++ b/pkg/bindings/connection.go
@@ -39,6 +39,7 @@ type APIResponse struct {
type Connection struct {
_url *url.URL
client *http.Client
+ conn *net.Conn
}
type valueKey string
@@ -88,26 +89,26 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
}
// Now we setup the http client to use the connection above
- var client *http.Client
+ var connection Connection
switch _url.Scheme {
case "ssh":
secure, err = strconv.ParseBool(_url.Query().Get("secure"))
if err != nil {
secure = false
}
- client, err = sshClient(_url, identity[0], secure)
+ connection, err = sshClient(_url, identity[0], secure)
case "unix":
if !strings.HasPrefix(uri, "unix:///") {
// autofix unix://path_element vs unix:///path_element
_url.Path = JoinURL(_url.Host, _url.Path)
_url.Host = ""
}
- client, err = unixClient(_url)
+ connection, err = unixClient(_url)
case "tcp":
if !strings.HasPrefix(uri, "tcp://") {
return nil, errors.New("tcp URIs should begin with tcp://")
}
- client, err = tcpClient(_url)
+ connection, err = tcpClient(_url)
default:
return nil, errors.Errorf("'%s' is not a supported schema", _url.Scheme)
}
@@ -115,22 +116,30 @@ func NewConnection(ctx context.Context, uri string, identity ...string) (context
return nil, errors.Wrapf(err, "Failed to create %sClient", _url.Scheme)
}
- ctx = context.WithValue(ctx, clientKey, &Connection{_url, client})
+ ctx = context.WithValue(ctx, clientKey, &connection)
if err := pingNewConnection(ctx); err != nil {
return nil, err
}
return ctx, nil
}
-func tcpClient(_url *url.URL) (*http.Client, error) {
- return &http.Client{
+func tcpClient(_url *url.URL) (Connection, error) {
+ connection := Connection{
+ _url: _url,
+ }
+ connection.client = &http.Client{
Transport: &http.Transport{
- DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
- return net.Dial("tcp", _url.Host)
+ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
+ conn, err := net.Dial("tcp", _url.Host)
+ if c, ok := ctx.Value(clientKey).(*Connection); ok {
+ c.conn = &conn
+ }
+ return conn, err
},
DisableCompression: true,
},
- }, nil
+ }
+ return connection, nil
}
// pingNewConnection pings to make sure the RESTFUL service is up
@@ -151,10 +160,10 @@ func pingNewConnection(ctx context.Context) error {
return errors.Errorf("ping response was %q", response.StatusCode)
}
-func sshClient(_url *url.URL, identity string, secure bool) (*http.Client, error) {
+func sshClient(_url *url.URL, identity string, secure bool) (Connection, error) {
auth, err := publicKey(identity)
if err != nil {
- return nil, errors.Wrapf(err, "Failed to parse identity %s: %v\n", _url.String(), identity)
+ return Connection{}, errors.Wrapf(err, "Failed to parse identity %s: %v\n", _url.String(), identity)
}
callback := ssh.InsecureIgnoreHostKey()
@@ -188,26 +197,39 @@ func sshClient(_url *url.URL, identity string, secure bool) (*http.Client, error
},
)
if err != nil {
- return nil, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String())
+ return Connection{}, errors.Wrapf(err, "Connection to bastion host (%s) failed.", _url.String())
}
- return &http.Client{
+
+ connection := Connection{_url: _url}
+ connection.client = &http.Client{
Transport: &http.Transport{
- DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
- return bastion.Dial("unix", _url.Path)
+ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
+ conn, err := bastion.Dial("unix", _url.Path)
+ if c, ok := ctx.Value(clientKey).(*Connection); ok {
+ c.conn = &conn
+ }
+ return conn, err
},
- }}, nil
+ }}
+ return connection, nil
}
-func unixClient(_url *url.URL) (*http.Client, error) {
- return &http.Client{
+func unixClient(_url *url.URL) (Connection, error) {
+ connection := Connection{_url: _url}
+ connection.client = &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
d := net.Dialer{}
- return d.DialContext(ctx, "unix", _url.Path)
+ conn, err := d.DialContext(ctx, "unix", _url.Path)
+ if c, ok := ctx.Value(clientKey).(*Connection); ok {
+ c.conn = &conn
+ }
+ return conn, err
},
DisableCompression: true,
},
- }, nil
+ }
+ return connection, nil
}
// DoRequest assembles the http request and returns the response
@@ -232,6 +254,7 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
if len(queryParams) > 0 {
req.URL.RawQuery = queryParams.Encode()
}
+ req = req.WithContext(context.WithValue(context.Background(), clientKey, c))
// Give the Do three chances in the case of a comm/service hiccup
for i := 0; i < 3; i++ {
response, err = c.client.Do(req) // nolint
@@ -243,6 +266,10 @@ func (c *Connection) DoRequest(httpBody io.Reader, httpMethod, endpoint string,
return &APIResponse{response, req}, err
}
+func (c *Connection) Write(b []byte) (int, error) {
+ return (*c.conn).Write(b)
+}
+
// FiltersToString converts our typical filter format of a
// map[string][]string to a query/html safe string.
func FiltersToString(filters map[string][]string) (string, error) {
@@ -295,8 +322,8 @@ func publicKey(path string) (ssh.AuthMethod, error) {
func hostKey(host string) ssh.PublicKey {
// parse OpenSSH known_hosts file
// ssh or use ssh-keyscan to get initial key
- known_hosts := filepath.Join(homedir.HomeDir(), ".ssh", "known_hosts")
- fd, err := os.Open(known_hosts)
+ knownHosts := filepath.Join(homedir.HomeDir(), ".ssh", "known_hosts")
+ fd, err := os.Open(knownHosts)
if err != nil {
logrus.Error(err)
return nil
diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go
index de7b792b4..c736709c6 100644
--- a/pkg/bindings/containers/containers.go
+++ b/pkg/bindings/containers/containers.go
@@ -15,6 +15,7 @@ import (
"github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/domain/entities"
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
)
var (
@@ -341,12 +342,18 @@ func ContainerInit(ctx context.Context, nameOrID string) error {
}
// Attach attaches to a running container
-func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin *bool, stdout io.Writer, stderr io.Writer) error {
+func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stream *bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
}
+ // Do we need to wire in stdin?
+ ctnr, err := Inspect(ctx, nameOrId, &bindings.PFalse)
+ if err != nil {
+ return err
+ }
+
params := url.Values{}
if detachKeys != nil {
params.Add("detachKeys", *detachKeys)
@@ -357,7 +364,7 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
if stream != nil {
params.Add("stream", fmt.Sprintf("%t", *stream))
}
- if stdin != nil && *stdin {
+ if stdin != nil {
params.Add("stdin", "true")
}
if stdout != nil {
@@ -373,11 +380,23 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
}
defer response.Body.Close()
- ctype := response.Header.Get("Content-Type")
- upgrade := response.Header.Get("Connection")
+ if stdin != nil {
+ go func() {
+ _, err := io.Copy(conn, stdin)
+ if err != nil {
+ logrus.Error("failed to write input to service: " + err.Error())
+ }
+ }()
+ }
buffer := make([]byte, 1024)
- if ctype == "application/vnd.docker.raw-stream" && upgrade == "Upgrade" {
+ if ctnr.Config.Tty {
+ // If not multiplex'ed, read from server and write to stdout
+ _, err := io.Copy(stdout, response.Body)
+ if err != nil {
+ return err
+ }
+ } else {
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(response.Body, buffer)
@@ -396,30 +415,27 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre
}
switch {
- case fd == 0 && stdin != nil && *stdin:
- stdout.Write(frame)
+ case fd == 0 && stdin != nil:
+ _, err := stdout.Write(frame[0:l])
+ if err != nil {
+ return err
+ }
case fd == 1 && stdout != nil:
- stdout.Write(frame)
+ _, err := stdout.Write(frame[0:l])
+ if err != nil {
+ return err
+ }
case fd == 2 && stderr != nil:
- stderr.Write(frame)
+ _, err := stderr.Write(frame[0:l])
+ if err != nil {
+ return err
+ }
case fd == 3:
return fmt.Errorf("error from daemon in stream: %s", frame)
default:
return fmt.Errorf("unrecognized input header: %d", fd)
}
}
- } else {
- // If not multiplex'ed from server just dump stream to stdout
- for {
- _, err := response.Body.Read(buffer)
- if err != nil {
- if !errors.Is(err, io.EOF) {
- return err
- }
- break
- }
- stdout.Write(buffer)
- }
}
return err
}
diff --git a/pkg/bindings/test/attach_test.go b/pkg/bindings/test/attach_test.go
index 8e89ff8ff..809e536ca 100644
--- a/pkg/bindings/test/attach_test.go
+++ b/pkg/bindings/test/attach_test.go
@@ -2,10 +2,13 @@ package test_bindings
import (
"bytes"
+ "fmt"
"time"
+ "github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/pkg/bindings"
"github.com/containers/libpod/pkg/bindings/containers"
+ "github.com/containers/libpod/pkg/specgen"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gexec"
@@ -31,7 +34,7 @@ var _ = Describe("Podman containers attach", func() {
bt.cleanup()
})
- It("attach", func() {
+ It("can run top in container", func() {
name := "TopAttachTest"
id, err := bt.RunTopContainer(&name, nil, nil)
Expect(err).ShouldNot(HaveOccurred())
@@ -51,13 +54,56 @@ var _ = Describe("Podman containers attach", func() {
go func() {
defer GinkgoRecover()
- err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, &bindings.PTrue, stdout, stderr)
+ err := containers.Attach(bt.conn, id, nil, &bindings.PTrue, &bindings.PTrue, nil, stdout, stderr)
Expect(err).ShouldNot(HaveOccurred())
}()
time.Sleep(5 * time.Second)
-
// First character/First line of top output
Expect(stdout.String()).Should(ContainSubstring("Mem: "))
})
+
+ It("can echo data via cat in container", func() {
+ s := specgen.NewSpecGenerator(alpine.name, false)
+ s.Name = "CatAttachTest"
+ s.Terminal = true
+ s.Command = []string{"/bin/cat"}
+ ctnr, err := containers.CreateWithSpec(bt.conn, s)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = containers.Start(bt.conn, ctnr.ID, nil)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ wait := define.ContainerStateRunning
+ _, err = containers.Wait(bt.conn, ctnr.ID, &wait)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tickTock := time.NewTimer(2 * time.Second)
+ go func() {
+ <-tickTock.C
+ timeout := uint(5)
+ err := containers.Stop(bt.conn, ctnr.ID, &timeout)
+ if err != nil {
+ GinkgoWriter.Write([]byte(err.Error()))
+ }
+ }()
+
+ msg := "Hello, World"
+ stdin := &bytes.Buffer{}
+ stdin.WriteString(msg + "\n")
+
+ stdout := &bytes.Buffer{}
+ stderr := &bytes.Buffer{}
+ go func() {
+ defer GinkgoRecover()
+
+ err := containers.Attach(bt.conn, ctnr.ID, nil, &bindings.PFalse, &bindings.PTrue, stdin, stdout, stderr)
+ Expect(err).ShouldNot(HaveOccurred())
+ }()
+
+ time.Sleep(5 * time.Second)
+ // Tty==true so we get echo'ed stdin + expected output
+ Expect(stdout.String()).Should(Equal(fmt.Sprintf("%[1]s\r\n%[1]s\r\n", msg)))
+ Expect(stderr.String()).Should(BeEmpty())
+ })
})