aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libpod/container_api.go98
-rw-r--r--libpod/oci_conmon_linux.go38
-rw-r--r--libpod/util.go31
-rw-r--r--pkg/api/handlers/compat/containers_attach.go35
-rw-r--r--pkg/api/server/register_containers.go8
5 files changed, 157 insertions, 53 deletions
diff --git a/libpod/container_api.go b/libpod/container_api.go
index 55c79fa74..b31079b26 100644
--- a/libpod/container_api.go
+++ b/libpod/container_api.go
@@ -6,10 +6,13 @@ import (
"io/ioutil"
"net"
"os"
+ "strings"
+ "sync"
"time"
"github.com/containers/libpod/libpod/define"
"github.com/containers/libpod/libpod/events"
+ "github.com/containers/libpod/libpod/logs"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -244,15 +247,28 @@ func (c *Container) Attach(streams *define.AttachStreams, keys string, resize <-
// forwarded to the client.
// This function returns when the attach finishes. It does not hold the lock for
// the duration of its runtime, only using it at the beginning to verify state.
-func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error {
+// The streamLogs parameter indicates that all the container's logs until present
+// will be streamed at the beginning of the attach.
+// The streamAttach parameter indicates that the attach itself will be streamed
+// over the socket; if this is not set, but streamLogs is, only the logs will be
+// sent.
+// At least one of streamAttach and streamLogs must be set.
+func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool) (deferredErr error) {
+ isTerminal := false
+ if c.config.Spec.Process != nil {
+ isTerminal = c.config.Spec.Process.Terminal
+ }
+ // Ensure our contract of writing errors to and closing the HTTP conn is
+ // honored.
+ defer func() {
+ hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
+ }()
+
if !c.batched {
c.lock.Lock()
if err := c.syncContainer(); err != nil {
c.lock.Unlock()
- // Write any errors to the HTTP buffer before we close.
- hijackWriteErrorAndClose(err, c.ID(), httpCon, httpBuf)
-
return err
}
// We are NOT holding the lock for the duration of the function.
@@ -260,16 +276,80 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
}
if !c.ensureState(define.ContainerStateCreated, define.ContainerStateRunning) {
- toReturn := errors.Wrapf(define.ErrCtrStateInvalid, "can only attach to created or running containers")
-
- // Write any errors to the HTTP buffer before we close.
- hijackWriteErrorAndClose(toReturn, c.ID(), httpCon, httpBuf)
+ return errors.Wrapf(define.ErrCtrStateInvalid, "can only attach to created or running containers")
+ }
- return toReturn
+ if !streamAttach && !streamLogs {
+ return errors.Wrapf(define.ErrInvalidArg, "must specify at least one of stream or logs")
}
logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())
+ if streamLogs {
+ // Get all logs for the container
+ logChan := make(chan *logs.LogLine)
+ logOpts := new(logs.LogOptions)
+ logOpts.Tail = -1
+ logOpts.WaitGroup = new(sync.WaitGroup)
+ errChan := make(chan error)
+ go func() {
+ var err error
+ // In non-terminal mode we need to prepend with the
+ // stream header.
+ logrus.Debugf("Writing logs for container %s to HTTP attach", c.ID())
+ for logLine := range logChan {
+ if !isTerminal {
+ device := logLine.Device
+ var header []byte
+ headerLen := uint32(len(logLine.Msg))
+
+ switch strings.ToLower(device) {
+ case "stdin":
+ header = makeHTTPAttachHeader(0, headerLen)
+ case "stdout":
+ header = makeHTTPAttachHeader(1, headerLen)
+ case "stderr":
+ header = makeHTTPAttachHeader(2, headerLen)
+ default:
+ logrus.Errorf("Unknown device for log line: %s", device)
+ header = makeHTTPAttachHeader(1, headerLen)
+ }
+ _, err = httpBuf.Write(header)
+ if err != nil {
+ break
+ }
+ }
+ _, err = httpBuf.Write([]byte(logLine.Msg))
+ if err != nil {
+ break
+ }
+ _, err = httpBuf.Write([]byte("\n"))
+ if err != nil {
+ break
+ }
+ err = httpBuf.Flush()
+ if err != nil {
+ break
+ }
+ }
+ errChan <- err
+ }()
+ go func() {
+ logOpts.WaitGroup.Wait()
+ close(logChan)
+ }()
+ if err := c.ReadLog(logOpts, logChan); err != nil {
+ return err
+ }
+ logrus.Debugf("Done reading logs for container %s", c.ID())
+ if err := <-errChan; err != nil {
+ return err
+ }
+ }
+ if !streamAttach {
+ return nil
+ }
+
c.newContainerEvent(events.Attach)
return c.ociRuntime.HTTPAttach(c, httpCon, httpBuf, streams, detachKeys, cancel)
}
diff --git a/libpod/oci_conmon_linux.go b/libpod/oci_conmon_linux.go
index c20e3f0b4..18b438792 100644
--- a/libpod/oci_conmon_linux.go
+++ b/libpod/oci_conmon_linux.go
@@ -5,7 +5,6 @@ package libpod
import (
"bufio"
"bytes"
- "encoding/binary"
"fmt"
"io"
"io/ioutil"
@@ -480,8 +479,7 @@ func (r *ConmonOCIRuntime) UnpauseContainer(ctr *Container) error {
}
// HTTPAttach performs an attach for the HTTP API.
-// This will consume, and automatically close, the hijacked HTTP session.
-// It is not necessary to close it independently.
+// The caller must handle closing the HTTP connection after this returns.
// The cancel channel is not closed; it is up to the caller to do so after
// this function returns.
// If this is a container with a terminal, we will stream raw. If it is not, we
@@ -492,13 +490,7 @@ func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf
isTerminal = ctr.config.Spec.Process.Terminal
}
- // Ensure that our contract of closing the HTTP connection is honored.
- defer hijackWriteErrorAndClose(deferredErr, ctr.ID(), httpConn, httpBuf)
-
if streams != nil {
- if isTerminal {
- return errors.Wrapf(define.ErrInvalidArg, "cannot specify which streams to attach as container %s has a terminal", ctr.ID())
- }
if !streams.Stdin && !streams.Stdout && !streams.Stderr {
return errors.Wrapf(define.ErrInvalidArg, "must specify at least one stream to attach to")
}
@@ -547,8 +539,16 @@ func (r *ConmonOCIRuntime) HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf
go func() {
var err error
if isTerminal {
+ // Hack: return immediately if attachStdout not set to
+ // emulate Docker.
+ // Basically, when terminal is set, STDERR goes nowhere.
+ // Everything does over STDOUT.
+ // Therefore, if not attaching STDOUT - we'll never copy
+ // anything from here.
logrus.Debugf("Performing terminal HTTP attach for container %s", ctr.ID())
- err = httpAttachTerminalCopy(conn, httpBuf, ctr.ID())
+ if attachStdout {
+ err = httpAttachTerminalCopy(conn, httpBuf, ctr.ID())
+ }
} else {
logrus.Debugf("Performing non-terminal HTTP attach for container %s", ctr.ID())
err = httpAttachNonTerminalCopy(conn, httpBuf, ctr.ID(), attachStdin, attachStdout, attachStderr)
@@ -1725,13 +1725,16 @@ func httpAttachNonTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter,
for {
numR, err := container.Read(buf)
if numR > 0 {
- headerBuf := []byte{0, 0, 0, 0}
+ var headerBuf []byte
+ // Subtract 1 because we strip the first byte (used for
+ // multiplexing by Conmon).
+ headerLen := uint32(numR - 1)
// Practically speaking, we could make this buf[0] - 1,
// but we need to validate it anyways...
switch buf[0] {
case AttachPipeStdin:
- headerBuf[0] = 0
+ headerBuf = makeHTTPAttachHeader(0, headerLen)
if !stdin {
continue
}
@@ -1739,24 +1742,17 @@ func httpAttachNonTerminalCopy(container *net.UnixConn, http *bufio.ReadWriter,
if !stdout {
continue
}
- headerBuf[0] = 1
+ headerBuf = makeHTTPAttachHeader(1, headerLen)
case AttachPipeStderr:
if !stderr {
continue
}
- headerBuf[0] = 2
+ headerBuf = makeHTTPAttachHeader(2, headerLen)
default:
logrus.Errorf("Received unexpected attach type %+d, discarding %d bytes", buf[0], numR)
continue
}
- // Get big-endian length and append.
- // Subtract 1 because we strip the first byte (used for
- // multiplexing by Conmon).
- lenBuf := []byte{0, 0, 0, 0}
- binary.BigEndian.PutUint32(lenBuf, uint32(numR-1))
- headerBuf = append(headerBuf, lenBuf...)
-
numH, err2 := http.Write(headerBuf)
if err2 != nil {
if err != nil {
diff --git a/libpod/util.go b/libpod/util.go
index e9d234bbe..6457dac1c 100644
--- a/libpod/util.go
+++ b/libpod/util.go
@@ -2,6 +2,7 @@ package libpod
import (
"bufio"
+ "encoding/binary"
"fmt"
"io"
"os"
@@ -239,11 +240,22 @@ func checkDependencyContainer(depCtr, ctr *Container) error {
// hijackWriteErrorAndClose writes an error to a hijacked HTTP session and
// closes it. Intended to HTTPAttach function.
// If error is nil, it will not be written; we'll only close the connection.
-func hijackWriteErrorAndClose(toWrite error, cid string, httpCon io.Closer, httpBuf *bufio.ReadWriter) {
+func hijackWriteErrorAndClose(toWrite error, cid string, terminal bool, httpCon io.Closer, httpBuf *bufio.ReadWriter) {
if toWrite != nil {
- if _, err := httpBuf.Write([]byte(toWrite.Error())); err != nil {
- logrus.Errorf("Error writing error %q to container %s HTTP attach connection: %v", toWrite, cid, err)
- } else if err := httpBuf.Flush(); err != nil {
+ errString := []byte(fmt.Sprintf("%v\n", toWrite))
+ if !terminal {
+ // We need a header.
+ header := makeHTTPAttachHeader(2, uint32(len(errString)))
+ if _, err := httpBuf.Write(header); err != nil {
+ logrus.Errorf("Error writing header for container %s attach connection error: %v", cid, err)
+ }
+ // TODO: May want to return immediately here to avoid
+ // writing garbage to the socket?
+ }
+ if _, err := httpBuf.Write(errString); err != nil {
+ logrus.Errorf("Error writing error to container %s HTTP attach connection: %v", cid, err)
+ }
+ if err := httpBuf.Flush(); err != nil {
logrus.Errorf("Error flushing HTTP buffer for container %s HTTP attach connection: %v", cid, err)
}
}
@@ -252,3 +264,14 @@ func hijackWriteErrorAndClose(toWrite error, cid string, httpCon io.Closer, http
logrus.Errorf("Error closing container %s HTTP attach connection: %v", cid, err)
}
}
+
+// makeHTTPAttachHeader makes an 8-byte HTTP header for a buffer of the given
+// length and stream. Accepts an integer indicating which stream we are sending
+// to (STDIN = 0, STDOUT = 1, STDERR = 2).
+func makeHTTPAttachHeader(stream byte, length uint32) []byte {
+ headerBuf := []byte{stream, 0, 0, 0}
+ lenBuf := []byte{0, 0, 0, 0}
+ binary.BigEndian.PutUint32(lenBuf, length)
+ headerBuf = append(headerBuf, lenBuf...)
+ return headerBuf
+}
diff --git a/pkg/api/handlers/compat/containers_attach.go b/pkg/api/handlers/compat/containers_attach.go
index da7b5bb0c..80ad52aee 100644
--- a/pkg/api/handlers/compat/containers_attach.go
+++ b/pkg/api/handlers/compat/containers_attach.go
@@ -1,6 +1,7 @@
package compat
import (
+ "fmt"
"net/http"
"github.com/containers/libpod/libpod"
@@ -23,7 +24,9 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
Stdin bool `schema:"stdin"`
Stdout bool `schema:"stdout"`
Stderr bool `schema:"stderr"`
- }{}
+ }{
+ Stream: true,
+ }
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Error parsing parameters", http.StatusBadRequest, err)
return
@@ -61,16 +64,9 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
return
}
- // TODO: Investigate supporting these.
- // Logs replays container logs over the attach socket.
- // Stream seems to break things up somehow? Not 100% clear.
- if query.Logs {
- utils.Error(w, "Unsupported parameter", http.StatusBadRequest, errors.Errorf("the logs parameter to attach is not presently supported"))
- return
- }
- // We only support stream=true or unset
- if _, found := r.URL.Query()["stream"]; found && query.Stream {
- utils.Error(w, "Unsupported parameter", http.StatusBadRequest, errors.Errorf("the stream parameter to attach is not presently supported"))
+ // At least one of these must be set
+ if !query.Stream && !query.Logs {
+ utils.Error(w, "Unsupported parameter", http.StatusBadRequest, errors.Errorf("at least one of Logs or Stream must be set"))
return
}
@@ -86,7 +82,13 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
utils.InternalServerError(w, err)
return
}
- if !(state == define.ContainerStateCreated || state == define.ContainerStateRunning) {
+ // For Docker compatibility, we need to re-initialize containers in these states.
+ if state == define.ContainerStateConfigured || state == define.ContainerStateExited {
+ if err := ctr.Init(r.Context()); err != nil {
+ utils.InternalServerError(w, errors.Wrapf(err, "error preparing container %s for attach", ctr.ID()))
+ return
+ }
+ } else if !(state == define.ContainerStateCreated || state == define.ContainerStateRunning) {
utils.InternalServerError(w, errors.Wrapf(define.ErrCtrStateInvalid, "can only attach to created or running containers"))
return
}
@@ -98,20 +100,23 @@ func AttachContainer(w http.ResponseWriter, r *http.Request) {
return
}
- w.WriteHeader(http.StatusSwitchingProtocols)
-
connection, buffer, err := hijacker.Hijack()
if err != nil {
utils.InternalServerError(w, errors.Wrapf(err, "error hijacking connection"))
return
}
+ // This header string sourced from Docker:
+ // https://raw.githubusercontent.com/moby/moby/b95fad8e51bd064be4f4e58a996924f343846c85/api/server/router/container/container_routes.go
+ // Using literally to ensure compatability with existing clients.
+ fmt.Fprintf(connection, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
+
logrus.Debugf("Hijack for attach of container %s successful", ctr.ID())
// Perform HTTP attach.
// HTTPAttach will handle everything about the connection from here on
// (including closing it and writing errors to it).
- if err := ctr.HTTPAttach(connection, buffer, streams, detachKeys, nil); err != nil {
+ if err := ctr.HTTPAttach(connection, buffer, streams, detachKeys, nil, query.Stream, query.Logs); err != nil {
// We can't really do anything about errors anymore. HTTPAttach
// should be writing them to the connection.
logrus.Errorf("Error attaching to container %s: %v", ctr.ID(), err)
diff --git a/pkg/api/server/register_containers.go b/pkg/api/server/register_containers.go
index f126112d0..9ee900a11 100644
--- a/pkg/api/server/register_containers.go
+++ b/pkg/api/server/register_containers.go
@@ -517,13 +517,13 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// name: logs
// required: false
// type: boolean
- // description: Not yet supported
+ // description: Stream all logs from the container across the connection. Happens before streaming attach (if requested). At least one of logs or stream must be set
// - in: query
// name: stream
// required: false
// type: boolean
// default: true
- // description: If passed, must be set to true; stream=false is not yet supported
+ // description: Attach to the container. If unset, and logs is set, only the container's logs will be sent. At least one of stream or logs must be set
// - in: query
// name: stdout
// required: false
@@ -1194,13 +1194,13 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// name: logs
// required: false
// type: boolean
- // description: Not yet supported
+ // description: Stream all logs from the container across the connection. Happens before streaming attach (if requested). At least one of logs or stream must be set
// - in: query
// name: stream
// required: false
// type: boolean
// default: true
- // description: If passed, must be set to true; stream=false is not yet supported
+ // description: Attach to the container. If unset, and logs is set, only the container's logs will be sent. At least one of stream or logs must be set
// - in: query
// name: stdout
// required: false