diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/api/handlers/compat/containers.go | 27 | ||||
-rw-r--r-- | pkg/bindings/containers/containers.go | 7 | ||||
-rw-r--r-- | pkg/bindings/containers/logs.go | 77 | ||||
-rw-r--r-- | pkg/bindings/test/containers_test.go | 4 | ||||
-rw-r--r-- | pkg/domain/infra/tunnel/containers.go | 35 |
5 files changed, 73 insertions, 77 deletions
diff --git a/pkg/api/handlers/compat/containers.go b/pkg/api/handlers/compat/containers.go index 239e41af4..cea4bd0f6 100644 --- a/pkg/api/handlers/compat/containers.go +++ b/pkg/api/handlers/compat/containers.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "io" "net/http" "strconv" "strings" @@ -295,7 +296,9 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { }() w.WriteHeader(http.StatusOK) - var builder strings.Builder + + var frame strings.Builder + header := make([]byte, 8) for ok := true; ok; ok = query.Follow { for line := range logChannel { if _, found := r.URL.Query()["until"]; found { @@ -304,10 +307,8 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { } } - // Reset variables we're ready to loop again - builder.Reset() - header := [8]byte{} - + // Reset buffer we're ready to loop again + frame.Reset() switch line.Device { case "stdout": if !query.Stdout { @@ -327,17 +328,17 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { } if query.Timestamps { - builder.WriteString(line.Time.Format(time.RFC3339)) - builder.WriteRune(' ') + frame.WriteString(line.Time.Format(time.RFC3339)) + frame.WriteString(" ") } - builder.WriteString(line.Msg) - // Build header and output entry - binary.BigEndian.PutUint32(header[4:], uint32(len(header)+builder.Len())) - if _, err := w.Write(header[:]); err != nil { + frame.WriteString(line.Msg) + + binary.BigEndian.PutUint32(header[4:], uint32(frame.Len())) + if _, err := w.Write(header[0:8]); err != nil { log.Errorf("unable to write log output header: %q", err) } - if _, err := fmt.Fprint(w, builder.String()); err != nil { - log.Errorf("unable to write builder string: %q", err) + if _, err := io.WriteString(w, frame.String()); err != nil { + log.Errorf("unable to write frame string: %q", err) } if flusher, ok := w.(http.Flusher); ok { flusher.Flush() diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go index 1ed4919e0..39a077f36 100644 --- a/pkg/bindings/containers/containers.go +++ b/pkg/bindings/containers/containers.go @@ -23,7 +23,7 @@ import ( ) var ( - ErrLostSync = errors.New("lost synchronization with attach multiplexed result") + ErrLostSync = errors.New("lost synchronization with multiplexed stream") ) // List obtains a list of containers in local storage. All parameters to this method are optional. @@ -485,7 +485,7 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre return err } case fd == 3: - return fmt.Errorf("error from daemon in stream: %s", frame) + return errors.New("error from service in stream: " + string(frame)) default: return fmt.Errorf("unrecognized input header: %d", fd) } @@ -507,7 +507,7 @@ func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { fd = int(buffer[0]) if fd < 0 || fd > 3 { - err = ErrLostSync + err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd)) return } @@ -528,7 +528,6 @@ func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error err = io.ErrUnexpectedEOF return } - return buffer[0:length], nil } diff --git a/pkg/bindings/containers/logs.go b/pkg/bindings/containers/logs.go index b7ecb3c7e..20c8b4292 100644 --- a/pkg/bindings/containers/logs.go +++ b/pkg/bindings/containers/logs.go @@ -1,8 +1,9 @@ package containers import ( + "bytes" "context" - "encoding/binary" + "fmt" "io" "net/http" "net/url" @@ -49,68 +50,34 @@ func Logs(ctx context.Context, nameOrID string, opts LogOptions, stdoutChan, std if err != nil { return err } + defer response.Body.Close() - // read 8 bytes - // first byte determines stderr=2|stdout=1 - // bytes 4-7 len(msg) in uint32 + buffer := make([]byte, 1024) for { - stream, msgSize, err := readHeader(response.Body) + fd, l, err := DemuxHeader(response.Body, buffer) if err != nil { - // In case the server side closes up shop because !follow - if err == io.EOF { - break + if errors.Is(err, io.EOF) { + return nil } - return errors.Wrap(err, "unable to read log header") + return err } - msg, err := readMsg(response.Body, msgSize) + frame, err := DemuxFrame(response.Body, buffer, l) if err != nil { - return errors.Wrap(err, "unable to read log message") + return err } - if stream == 1 { - stdoutChan <- msg - } else { - stderrChan <- msg - } - } - return nil -} + frame = bytes.Replace(frame[0:l], []byte{13}, []byte{10}, -1) -func readMsg(r io.Reader, msgSize int) (string, error) { - var msg []byte - size := msgSize - for { - b := make([]byte, size) - _, err := r.Read(b) - if err != nil { - return "", err - } - msg = append(msg, b...) - if len(msg) == msgSize { - break - } - size = msgSize - len(msg) - } - return string(msg), nil -} - -func readHeader(r io.Reader) (byte, int, error) { - var ( - header []byte - size = 8 - ) - for { - b := make([]byte, size) - _, err := r.Read(b) - if err != nil { - return 0, 0, err - } - header = append(header, b...) - if len(header) == 8 { - break + switch fd { + case 0: + stdoutChan <- string(frame) + case 1: + stdoutChan <- string(frame) + case 2: + stderrChan <- string(frame) + case 3: + return errors.New("error from service in stream: " + string(frame)) + default: + return fmt.Errorf("unrecognized input header: %d", fd) } - size = 8 - len(header) } - stream := header[0] - msgSize := int(binary.BigEndian.Uint32(header[4:]) - 8) - return stream, msgSize, nil } diff --git a/pkg/bindings/test/containers_test.go b/pkg/bindings/test/containers_test.go index f725d1cf2..3b94b10eb 100644 --- a/pkg/bindings/test/containers_test.go +++ b/pkg/bindings/test/containers_test.go @@ -378,9 +378,9 @@ var _ = Describe("Podman containers ", func() { containers.Logs(bt.conn, r.ID, opts, stdoutChan, nil) }() o := <-stdoutChan - o = strings.ReplaceAll(o, "\r", "") + o = strings.TrimSpace(o) _, err = time.Parse(time.RFC1123Z, o) - Expect(err).To(BeNil()) + Expect(err).ShouldNot(HaveOccurred()) }) It("podman top", func() { diff --git a/pkg/domain/infra/tunnel/containers.go b/pkg/domain/infra/tunnel/containers.go index 30c4a8359..beba55c2b 100644 --- a/pkg/domain/infra/tunnel/containers.go +++ b/pkg/domain/infra/tunnel/containers.go @@ -4,7 +4,9 @@ import ( "context" "io" "os" + "strconv" "strings" + "time" "github.com/containers/common/pkg/config" "github.com/containers/image/v5/docker/reference" @@ -336,9 +338,36 @@ func (ic *ContainerEngine) ContainerCreate(ctx context.Context, s *specgen.SpecG return &entities.ContainerCreateReport{Id: response.ID}, nil } -func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []string, options entities.ContainerLogsOptions) error { - // The endpoint is not ready yet and requires some more work. - return errors.New("not implemented yet") +func (ic *ContainerEngine) ContainerLogs(_ context.Context, nameOrIds []string, options entities.ContainerLogsOptions) error { + since := options.Since.Format(time.RFC3339) + tail := strconv.FormatInt(options.Tail, 10) + stdout := options.Writer != nil + opts := containers.LogOptions{ + Follow: &options.Follow, + Since: &since, + Stderr: &stdout, + Stdout: &stdout, + Tail: &tail, + Timestamps: &options.Timestamps, + Until: nil, + } + + var err error + outCh := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err = containers.Logs(ic.ClientCxt, nameOrIds[0], opts, outCh, outCh) + cancel() + }() + + for { + select { + case <-ctx.Done(): + return err + case line := <-outCh: + _, _ = io.WriteString(options.Writer, line) + } + } } func (ic *ContainerEngine) ContainerAttach(ctx context.Context, nameOrId string, options entities.AttachOptions) error { |