From 09dc77aedfb786505bc526536be18d457fb7c68b Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Thu, 9 Jul 2020 13:32:20 +0200 Subject: log API: add context to allow for cancelling Add a `context.Context` to the log APIs to allow for cancelling streaming (e.g., via `podman logs -f`). This fixes issues for the remote API where some go routines of the server will continue writing and produce nothing but heat and waste CPU cycles. Signed-off-by: Valentin Rothberg --- libpod/container_api.go | 2 +- libpod/container_log.go | 23 ++++++--- libpod/container_log_linux.go | 16 ++++++- libpod/container_log_unsupported.go | 4 +- pkg/api/handlers/compat/containers_logs.go | 76 +++++++++++++++--------------- pkg/domain/infra/abi/containers.go | 2 +- pkg/varlinkapi/containers.go | 2 +- 7 files changed, 73 insertions(+), 52 deletions(-) diff --git a/libpod/container_api.go b/libpod/container_api.go index b37b05ff2..c7df9d66c 100644 --- a/libpod/container_api.go +++ b/libpod/container_api.go @@ -353,7 +353,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre logOpts.WaitGroup.Wait() close(logChan) }() - if err := c.ReadLog(logOpts, logChan); err != nil { + if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil { return err } logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize) diff --git a/libpod/container_log.go b/libpod/container_log.go index 97936c683..80f8e6e50 100644 --- a/libpod/container_log.go +++ b/libpod/container_log.go @@ -1,6 +1,7 @@ package libpod import ( + "context" "fmt" "os" "time" @@ -13,9 +14,9 @@ import ( ) // Log is a runtime function that can read one or more container logs. -func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error { +func (r *Runtime) Log(ctx context.Context, containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error { for _, ctr := range containers { - if err := ctr.ReadLog(options, logChannel); err != nil { + if err := ctr.ReadLog(ctx, options, logChannel); err != nil { return err } } @@ -23,25 +24,25 @@ func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChan } // ReadLog reads a containers log based on the input options and returns loglines over a channel. -func (c *Container) ReadLog(options *logs.LogOptions, logChannel chan *logs.LogLine) error { +func (c *Container) ReadLog(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { switch c.LogDriver() { case define.NoLogging: return errors.Wrapf(define.ErrNoLogs, "this container is using the 'none' log driver, cannot read logs") case define.JournaldLogging: // TODO Skip sending logs until journald logs can be read - return c.readFromJournal(options, logChannel) + return c.readFromJournal(ctx, options, logChannel) case define.JSONLogging: // TODO provide a separate implementation of this when Conmon // has support. fallthrough case define.KubernetesLogging, "": - return c.readFromLogFile(options, logChannel) + return c.readFromLogFile(ctx, options, logChannel) default: return errors.Wrapf(define.ErrInternal, "unrecognized log driver %q, cannot read logs", c.LogDriver()) } } -func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *logs.LogLine) error { +func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { t, tailLog, err := logs.GetLogFile(c.LogPath(), options) if err != nil { // If the log file does not exist, this is not fatal. @@ -62,8 +63,17 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l } go func() { + defer options.WaitGroup.Done() + var partial string for line := range t.Lines { + select { + case <-ctx.Done(): + // the consumer has cancelled + return + default: + // fallthrough + } nll, err := logs.NewLogLine(line.Text) if err != nil { logrus.Error(err) @@ -82,7 +92,6 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l logChannel <- nll } } - options.WaitGroup.Done() }() // Check if container is still running or paused if options.Follow { diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go index fad3bf87c..00b2039a9 100644 --- a/libpod/container_log_linux.go +++ b/libpod/container_log_linux.go @@ -4,6 +4,7 @@ package libpod import ( + "context" "fmt" "io" "math" @@ -29,7 +30,7 @@ const ( bufLen = 16384 ) -func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error { +func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { var config journal.JournalReaderConfig if options.Tail < 0 { config.NumFromTail = math.MaxUint64 @@ -65,13 +66,24 @@ func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *l if options.Follow { go func() { + done := make(chan bool) + until := make(chan time.Time) + go func() { + select { + case <-ctx.Done(): + until <- time.Time{} + case <-done: + // nothing to do anymore + } + }() follower := FollowBuffer{logChannel} - err := r.Follow(nil, follower) + err := r.Follow(until, follower) if err != nil { logrus.Debugf(err.Error()) } r.Close() options.WaitGroup.Done() + done <- true return }() return nil diff --git a/libpod/container_log_unsupported.go b/libpod/container_log_unsupported.go index 18882720a..f3b36619e 100644 --- a/libpod/container_log_unsupported.go +++ b/libpod/container_log_unsupported.go @@ -3,11 +3,13 @@ package libpod import ( + "context" + "github.com/containers/libpod/v2/libpod/define" "github.com/containers/libpod/v2/libpod/logs" "github.com/pkg/errors" ) -func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error { +func (c *Container) readFromJournal(_ context.Context, _ *logs.LogOptions, _ chan *logs.LogLine) error { return errors.Wrapf(define.ErrOSNotSupported, "Journald logging only enabled with systemd on linux") } diff --git a/pkg/api/handlers/compat/containers_logs.go b/pkg/api/handlers/compat/containers_logs.go index 8147f4d38..30ee030e8 100644 --- a/pkg/api/handlers/compat/containers_logs.go +++ b/pkg/api/handlers/compat/containers_logs.go @@ -92,7 +92,7 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { options.WaitGroup = &wg logChannel := make(chan *logs.LogLine, tail+1) - if err := runtime.Log([]*libpod.Container{ctnr}, options, logChannel); err != nil { + if err := runtime.Log(r.Context(), []*libpod.Container{ctnr}, options, logChannel); err != nil { utils.InternalServerError(w, errors.Wrapf(err, "Failed to obtain logs for Container '%s'", name)) return } @@ -105,50 +105,48 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) { var frame strings.Builder header := make([]byte, 8) - for ok := true; ok; ok = query.Follow { - for line := range logChannel { - if _, found := r.URL.Query()["until"]; found { - if line.Time.After(until) { - break - } + for line := range logChannel { + if _, found := r.URL.Query()["until"]; found { + if line.Time.After(until) { + break } + } - // Reset buffer we're ready to loop again - frame.Reset() - switch line.Device { - case "stdout": - if !query.Stdout { - continue - } - header[0] = 1 - case "stderr": - if !query.Stderr { - continue - } - header[0] = 2 - default: - // Logging and moving on is the best we can do here. We may have already sent - // a Status and Content-Type to client therefore we can no longer report an error. - log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID()) + // Reset buffer we're ready to loop again + frame.Reset() + switch line.Device { + case "stdout": + if !query.Stdout { continue } - - if query.Timestamps { - frame.WriteString(line.Time.Format(time.RFC3339)) - frame.WriteString(" ") + header[0] = 1 + case "stderr": + if !query.Stderr { + continue } - frame.WriteString(line.Msg) + header[0] = 2 + default: + // Logging and moving on is the best we can do here. We may have already sent + // a Status and Content-Type to client therefore we can no longer report an error. + log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID()) + continue + } - binary.BigEndian.PutUint32(header[4:], uint32(frame.Len())) - if _, err := w.Write(header[0:8]); err != nil { - log.Errorf("unable to write log output header: %q", err) - } - if _, err := io.WriteString(w, frame.String()); err != nil { - log.Errorf("unable to write frame string: %q", err) - } - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } + if query.Timestamps { + frame.WriteString(line.Time.Format(time.RFC3339)) + frame.WriteString(" ") + } + frame.WriteString(line.Msg) + + binary.BigEndian.PutUint32(header[4:], uint32(frame.Len())) + if _, err := w.Write(header[0:8]); err != nil { + log.Errorf("unable to write log output header: %q", err) + } + if _, err := io.WriteString(w, frame.String()); err != nil { + log.Errorf("unable to write frame string: %q", err) + } + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() } } } diff --git a/pkg/domain/infra/abi/containers.go b/pkg/domain/infra/abi/containers.go index 596fc2cc1..8909f831d 100644 --- a/pkg/domain/infra/abi/containers.go +++ b/pkg/domain/infra/abi/containers.go @@ -924,7 +924,7 @@ func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []strin } logChannel := make(chan *logs.LogLine, chSize) - if err := ic.Libpod.Log(ctrs, logOpts, logChannel); err != nil { + if err := ic.Libpod.Log(ctx, ctrs, logOpts, logChannel); err != nil { return err } diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go index 8650ba000..07b492331 100644 --- a/pkg/varlinkapi/containers.go +++ b/pkg/varlinkapi/containers.go @@ -754,7 +754,7 @@ func (i *VarlinkAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string if err != nil { return call.ReplyErrorOccurred(err.Error()) } - if err := i.Runtime.Log(containers, &options, logChannel); err != nil { + if err := i.Runtime.Log(getContext(), containers, &options, logChannel); err != nil { return err } go func() { -- cgit v1.2.3-54-g00ecf