diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/api/handlers/compat/containers.go | 27 | ||||
-rw-r--r-- | pkg/bindings/containers/containers.go | 7 | ||||
-rw-r--r-- | pkg/bindings/containers/logs.go | 77 | ||||
-rw-r--r-- | pkg/bindings/test/containers_test.go | 4 | ||||
-rw-r--r-- | pkg/domain/infra/abi/system.go | 38 | ||||
-rw-r--r-- | pkg/domain/infra/abi/system_novalink.go | 14 | ||||
-rw-r--r-- | pkg/domain/infra/abi/system_varlink.go | 49 | ||||
-rw-r--r-- | pkg/domain/infra/tunnel/containers.go | 35 | ||||
-rw-r--r-- | pkg/specgen/generate/config_linux_nocgo.go | 3 |
9 files changed, 138 insertions, 116 deletions
diff --git a/pkg/api/handlers/compat/containers.go b/pkg/api/handlers/compat/containers.go index 239e41af4..cea4bd0f6 100644 --- a/pkg/api/handlers/compat/containers.go +++ b/pkg/api/handlers/compat/containers.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "encoding/json" "fmt" + "io" "net/http" "strconv" "strings" @@ -295,7 +296,9 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { }() w.WriteHeader(http.StatusOK) - var builder strings.Builder + + var frame strings.Builder + header := make([]byte, 8) for ok := true; ok; ok = query.Follow { for line := range logChannel { if _, found := r.URL.Query()["until"]; found { @@ -304,10 +307,8 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { } } - // Reset variables we're ready to loop again - builder.Reset() - header := [8]byte{} - + // Reset buffer we're ready to loop again + frame.Reset() switch line.Device { case "stdout": if !query.Stdout { @@ -327,17 +328,17 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { } if query.Timestamps { - builder.WriteString(line.Time.Format(time.RFC3339)) - builder.WriteRune(' ') + frame.WriteString(line.Time.Format(time.RFC3339)) + frame.WriteString(" ") } - builder.WriteString(line.Msg) - // Build header and output entry - binary.BigEndian.PutUint32(header[4:], uint32(len(header)+builder.Len())) - if _, err := w.Write(header[:]); err != nil { + frame.WriteString(line.Msg) + + binary.BigEndian.PutUint32(header[4:], uint32(frame.Len())) + if _, err := w.Write(header[0:8]); err != nil { log.Errorf("unable to write log output header: %q", err) } - if _, err := fmt.Fprint(w, builder.String()); err != nil { - log.Errorf("unable to write builder string: %q", err) + if _, err := io.WriteString(w, frame.String()); err != nil { + log.Errorf("unable to write frame string: %q", err) } if flusher, ok := w.(http.Flusher); ok { flusher.Flush() diff --git a/pkg/bindings/containers/containers.go b/pkg/bindings/containers/containers.go index 1ed4919e0..39a077f36 100644 --- a/pkg/bindings/containers/containers.go +++ b/pkg/bindings/containers/containers.go @@ -23,7 +23,7 @@ import ( ) var ( - ErrLostSync = errors.New("lost synchronization with attach multiplexed result") + ErrLostSync = errors.New("lost synchronization with multiplexed stream") ) // List obtains a list of containers in local storage. All parameters to this method are optional. @@ -485,7 +485,7 @@ func Attach(ctx context.Context, nameOrId string, detachKeys *string, logs, stre return err } case fd == 3: - return fmt.Errorf("error from daemon in stream: %s", frame) + return errors.New("error from service in stream: " + string(frame)) default: return fmt.Errorf("unrecognized input header: %d", fd) } @@ -507,7 +507,7 @@ func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { fd = int(buffer[0]) if fd < 0 || fd > 3 { - err = ErrLostSync + err = errors.Wrapf(ErrLostSync, fmt.Sprintf(`channel "%d" found, 0-3 supported`, fd)) return } @@ -528,7 +528,6 @@ func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error err = io.ErrUnexpectedEOF return } - return buffer[0:length], nil } diff --git a/pkg/bindings/containers/logs.go b/pkg/bindings/containers/logs.go index b7ecb3c7e..20c8b4292 100644 --- a/pkg/bindings/containers/logs.go +++ b/pkg/bindings/containers/logs.go @@ -1,8 +1,9 @@ package containers import ( + "bytes" "context" - "encoding/binary" + "fmt" "io" "net/http" "net/url" @@ -49,68 +50,34 @@ func Logs(ctx context.Context, nameOrID string, opts LogOptions, stdoutChan, std if err != nil { return err } + defer response.Body.Close() - // read 8 bytes - // first byte determines stderr=2|stdout=1 - // bytes 4-7 len(msg) in uint32 + buffer := make([]byte, 1024) for { - stream, msgSize, err := readHeader(response.Body) + fd, l, err := DemuxHeader(response.Body, buffer) if err != nil { - // In case the server side closes up shop because !follow - if err == io.EOF { - break + if errors.Is(err, io.EOF) { + return nil } - return errors.Wrap(err, "unable to read log header") + return err } - msg, err := readMsg(response.Body, msgSize) + frame, err := DemuxFrame(response.Body, buffer, l) if err != nil { - return errors.Wrap(err, "unable to read log message") + return err } - if stream == 1 { - stdoutChan <- msg - } else { - stderrChan <- msg - } - } - return nil -} + frame = bytes.Replace(frame[0:l], []byte{13}, []byte{10}, -1) -func readMsg(r io.Reader, msgSize int) (string, error) { - var msg []byte - size := msgSize - for { - b := make([]byte, size) - _, err := r.Read(b) - if err != nil { - return "", err - } - msg = append(msg, b...) - if len(msg) == msgSize { - break - } - size = msgSize - len(msg) - } - return string(msg), nil -} - -func readHeader(r io.Reader) (byte, int, error) { - var ( - header []byte - size = 8 - ) - for { - b := make([]byte, size) - _, err := r.Read(b) - if err != nil { - return 0, 0, err - } - header = append(header, b...) - if len(header) == 8 { - break + switch fd { + case 0: + stdoutChan <- string(frame) + case 1: + stdoutChan <- string(frame) + case 2: + stderrChan <- string(frame) + case 3: + return errors.New("error from service in stream: " + string(frame)) + default: + return fmt.Errorf("unrecognized input header: %d", fd) } - size = 8 - len(header) } - stream := header[0] - msgSize := int(binary.BigEndian.Uint32(header[4:]) - 8) - return stream, msgSize, nil } diff --git a/pkg/bindings/test/containers_test.go b/pkg/bindings/test/containers_test.go index f725d1cf2..3b94b10eb 100644 --- a/pkg/bindings/test/containers_test.go +++ b/pkg/bindings/test/containers_test.go @@ -378,9 +378,9 @@ var _ = Describe("Podman containers ", func() { containers.Logs(bt.conn, r.ID, opts, stdoutChan, nil) }() o := <-stdoutChan - o = strings.ReplaceAll(o, "\r", "") + o = strings.TrimSpace(o) _, err = time.Parse(time.RFC1123Z, o) - Expect(err).To(BeNil()) + Expect(err).ShouldNot(HaveOccurred()) }) It("podman top", func() { diff --git a/pkg/domain/infra/abi/system.go b/pkg/domain/infra/abi/system.go index af2ec5f7b..52dfaba7d 100644 --- a/pkg/domain/infra/abi/system.go +++ b/pkg/domain/infra/abi/system.go @@ -16,56 +16,18 @@ import ( "github.com/containers/libpod/pkg/domain/entities" "github.com/containers/libpod/pkg/rootless" "github.com/containers/libpod/pkg/util" - iopodman "github.com/containers/libpod/pkg/varlink" - iopodmanAPI "github.com/containers/libpod/pkg/varlinkapi" "github.com/containers/libpod/utils" - "github.com/containers/libpod/version" "github.com/docker/distribution/reference" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" - "github.com/varlink/go/varlink" ) func (ic *ContainerEngine) Info(ctx context.Context) (*define.Info, error) { return ic.Libpod.Info() } -func (ic *ContainerEngine) VarlinkService(_ context.Context, opts entities.ServiceOptions) error { - var varlinkInterfaces = []*iopodman.VarlinkInterface{ - iopodmanAPI.New(opts.Command, ic.Libpod), - } - - service, err := varlink.NewService( - "Atomic", - "podman", - version.Version, - "https://github.com/containers/libpod", - ) - if err != nil { - return errors.Wrapf(err, "unable to create new varlink service") - } - - for _, i := range varlinkInterfaces { - if err := service.RegisterInterface(i); err != nil { - return errors.Errorf("unable to register varlink interface %v", i) - } - } - - // Run the varlink server at the given address - if err = service.Listen(opts.URI, opts.Timeout); err != nil { - switch err.(type) { - case varlink.ServiceTimeoutError: - logrus.Infof("varlink service expired (use --timeout to increase session time beyond %s ms, 0 means never timeout)", opts.Timeout.String()) - return nil - default: - return errors.Wrapf(err, "unable to start varlink service") - } - } - return nil -} - func (ic *ContainerEngine) SetupRootless(_ context.Context, cmd *cobra.Command) error { // do it only after podman has already re-execed and running with uid==0. if os.Geteuid() == 0 { diff --git a/pkg/domain/infra/abi/system_novalink.go b/pkg/domain/infra/abi/system_novalink.go new file mode 100644 index 000000000..a71b0170a --- /dev/null +++ b/pkg/domain/infra/abi/system_novalink.go @@ -0,0 +1,14 @@ +// +build !varlink + +package abi + +import ( + "context" + + "github.com/containers/libpod/pkg/domain/entities" + "github.com/pkg/errors" +) + +func (ic *ContainerEngine) VarlinkService(_ context.Context, opts entities.ServiceOptions) error { + return errors.Errorf("varlink is not supported") +} diff --git a/pkg/domain/infra/abi/system_varlink.go b/pkg/domain/infra/abi/system_varlink.go new file mode 100644 index 000000000..c0144babe --- /dev/null +++ b/pkg/domain/infra/abi/system_varlink.go @@ -0,0 +1,49 @@ +// +build varlink + +package abi + +import ( + "context" + + "github.com/containers/libpod/pkg/domain/entities" + iopodman "github.com/containers/libpod/pkg/varlink" + iopodmanAPI "github.com/containers/libpod/pkg/varlinkapi" + "github.com/containers/libpod/version" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/varlink/go/varlink" +) + +func (ic *ContainerEngine) VarlinkService(_ context.Context, opts entities.ServiceOptions) error { + var varlinkInterfaces = []*iopodman.VarlinkInterface{ + iopodmanAPI.New(opts.Command, ic.Libpod), + } + + service, err := varlink.NewService( + "Atomic", + "podman", + version.Version, + "https://github.com/containers/libpod", + ) + if err != nil { + return errors.Wrapf(err, "unable to create new varlink service") + } + + for _, i := range varlinkInterfaces { + if err := service.RegisterInterface(i); err != nil { + return errors.Errorf("unable to register varlink interface %v", i) + } + } + + // Run the varlink server at the given address + if err = service.Listen(opts.URI, opts.Timeout); err != nil { + switch err.(type) { + case varlink.ServiceTimeoutError: + logrus.Infof("varlink service expired (use --timeout to increase session time beyond %s ms, 0 means never timeout)", opts.Timeout.String()) + return nil + default: + return errors.Wrapf(err, "unable to start varlink service") + } + } + return nil +} diff --git a/pkg/domain/infra/tunnel/containers.go b/pkg/domain/infra/tunnel/containers.go index 30c4a8359..beba55c2b 100644 --- a/pkg/domain/infra/tunnel/containers.go +++ b/pkg/domain/infra/tunnel/containers.go @@ -4,7 +4,9 @@ import ( "context" "io" "os" + "strconv" "strings" + "time" "github.com/containers/common/pkg/config" "github.com/containers/image/v5/docker/reference" @@ -336,9 +338,36 @@ func (ic *ContainerEngine) ContainerCreate(ctx context.Context, s *specgen.SpecG return &entities.ContainerCreateReport{Id: response.ID}, nil } -func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []string, options entities.ContainerLogsOptions) error { - // The endpoint is not ready yet and requires some more work. - return errors.New("not implemented yet") +func (ic *ContainerEngine) ContainerLogs(_ context.Context, nameOrIds []string, options entities.ContainerLogsOptions) error { + since := options.Since.Format(time.RFC3339) + tail := strconv.FormatInt(options.Tail, 10) + stdout := options.Writer != nil + opts := containers.LogOptions{ + Follow: &options.Follow, + Since: &since, + Stderr: &stdout, + Stdout: &stdout, + Tail: &tail, + Timestamps: &options.Timestamps, + Until: nil, + } + + var err error + outCh := make(chan string) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err = containers.Logs(ic.ClientCxt, nameOrIds[0], opts, outCh, outCh) + cancel() + }() + + for { + select { + case <-ctx.Done(): + return err + case line := <-outCh: + _, _ = io.WriteString(options.Writer, line) + } + } } func (ic *ContainerEngine) ContainerAttach(ctx context.Context, nameOrId string, options entities.AttachOptions) error { diff --git a/pkg/specgen/generate/config_linux_nocgo.go b/pkg/specgen/generate/config_linux_nocgo.go index fc8ed206d..81d1c7011 100644 --- a/pkg/specgen/generate/config_linux_nocgo.go +++ b/pkg/specgen/generate/config_linux_nocgo.go @@ -5,10 +5,11 @@ package generate import ( "errors" + "github.com/containers/libpod/libpod/image" "github.com/containers/libpod/pkg/specgen" spec "github.com/opencontainers/runtime-spec/specs-go" ) -func (s *specgen.SpecGenerator) getSeccompConfig(configSpec *spec.Spec) (*spec.LinuxSeccomp, error) { +func getSeccompConfig(s *specgen.SpecGenerator, configSpec *spec.Spec, img *image.Image) (*spec.LinuxSeccomp, error) { return nil, errors.New("not implemented") } |