summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libpod/container_api.go2
-rw-r--r--libpod/container_log.go23
-rw-r--r--libpod/container_log_linux.go16
-rw-r--r--libpod/container_log_unsupported.go4
-rw-r--r--pkg/api/handlers/compat/containers_logs.go76
-rw-r--r--pkg/domain/infra/abi/containers.go2
-rw-r--r--pkg/varlinkapi/containers.go2
7 files changed, 73 insertions, 52 deletions
diff --git a/libpod/container_api.go b/libpod/container_api.go
index b37b05ff2..c7df9d66c 100644
--- a/libpod/container_api.go
+++ b/libpod/container_api.go
@@ -353,7 +353,7 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre
logOpts.WaitGroup.Wait()
close(logChan)
}()
- if err := c.ReadLog(logOpts, logChan); err != nil {
+ if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil {
return err
}
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
diff --git a/libpod/container_log.go b/libpod/container_log.go
index 97936c683..80f8e6e50 100644
--- a/libpod/container_log.go
+++ b/libpod/container_log.go
@@ -1,6 +1,7 @@
package libpod
import (
+ "context"
"fmt"
"os"
"time"
@@ -13,9 +14,9 @@ import (
)
// Log is a runtime function that can read one or more container logs.
-func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
+func (r *Runtime) Log(ctx context.Context, containers []*Container, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
for _, ctr := range containers {
- if err := ctr.ReadLog(options, logChannel); err != nil {
+ if err := ctr.ReadLog(ctx, options, logChannel); err != nil {
return err
}
}
@@ -23,25 +24,25 @@ func (r *Runtime) Log(containers []*Container, options *logs.LogOptions, logChan
}
// ReadLog reads a containers log based on the input options and returns loglines over a channel.
-func (c *Container) ReadLog(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
+func (c *Container) ReadLog(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
switch c.LogDriver() {
case define.NoLogging:
return errors.Wrapf(define.ErrNoLogs, "this container is using the 'none' log driver, cannot read logs")
case define.JournaldLogging:
// TODO Skip sending logs until journald logs can be read
- return c.readFromJournal(options, logChannel)
+ return c.readFromJournal(ctx, options, logChannel)
case define.JSONLogging:
// TODO provide a separate implementation of this when Conmon
// has support.
fallthrough
case define.KubernetesLogging, "":
- return c.readFromLogFile(options, logChannel)
+ return c.readFromLogFile(ctx, options, logChannel)
default:
return errors.Wrapf(define.ErrInternal, "unrecognized log driver %q, cannot read logs", c.LogDriver())
}
}
-func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
+func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
t, tailLog, err := logs.GetLogFile(c.LogPath(), options)
if err != nil {
// If the log file does not exist, this is not fatal.
@@ -62,8 +63,17 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
}
go func() {
+ defer options.WaitGroup.Done()
+
var partial string
for line := range t.Lines {
+ select {
+ case <-ctx.Done():
+ // the consumer has cancelled
+ return
+ default:
+ // fallthrough
+ }
nll, err := logs.NewLogLine(line.Text)
if err != nil {
logrus.Error(err)
@@ -82,7 +92,6 @@ func (c *Container) readFromLogFile(options *logs.LogOptions, logChannel chan *l
logChannel <- nll
}
}
- options.WaitGroup.Done()
}()
// Check if container is still running or paused
if options.Follow {
diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go
index fad3bf87c..00b2039a9 100644
--- a/libpod/container_log_linux.go
+++ b/libpod/container_log_linux.go
@@ -4,6 +4,7 @@
package libpod
import (
+ "context"
"fmt"
"io"
"math"
@@ -29,7 +30,7 @@ const (
bufLen = 16384
)
-func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
+func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = math.MaxUint64
@@ -65,13 +66,24 @@ func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *l
if options.Follow {
go func() {
+ done := make(chan bool)
+ until := make(chan time.Time)
+ go func() {
+ select {
+ case <-ctx.Done():
+ until <- time.Time{}
+ case <-done:
+ // nothing to do anymore
+ }
+ }()
follower := FollowBuffer{logChannel}
- err := r.Follow(nil, follower)
+ err := r.Follow(until, follower)
if err != nil {
logrus.Debugf(err.Error())
}
r.Close()
options.WaitGroup.Done()
+ done <- true
return
}()
return nil
diff --git a/libpod/container_log_unsupported.go b/libpod/container_log_unsupported.go
index 18882720a..f3b36619e 100644
--- a/libpod/container_log_unsupported.go
+++ b/libpod/container_log_unsupported.go
@@ -3,11 +3,13 @@
package libpod
import (
+ "context"
+
"github.com/containers/libpod/v2/libpod/define"
"github.com/containers/libpod/v2/libpod/logs"
"github.com/pkg/errors"
)
-func (c *Container) readFromJournal(options *logs.LogOptions, logChannel chan *logs.LogLine) error {
+func (c *Container) readFromJournal(_ context.Context, _ *logs.LogOptions, _ chan *logs.LogLine) error {
return errors.Wrapf(define.ErrOSNotSupported, "Journald logging only enabled with systemd on linux")
}
diff --git a/pkg/api/handlers/compat/containers_logs.go b/pkg/api/handlers/compat/containers_logs.go
index 8147f4d38..30ee030e8 100644
--- a/pkg/api/handlers/compat/containers_logs.go
+++ b/pkg/api/handlers/compat/containers_logs.go
@@ -92,7 +92,7 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
options.WaitGroup = &wg
logChannel := make(chan *logs.LogLine, tail+1)
- if err := runtime.Log([]*libpod.Container{ctnr}, options, logChannel); err != nil {
+ if err := runtime.Log(r.Context(), []*libpod.Container{ctnr}, options, logChannel); err != nil {
utils.InternalServerError(w, errors.Wrapf(err, "Failed to obtain logs for Container '%s'", name))
return
}
@@ -105,50 +105,48 @@ func LogsFromContainer(w http.ResponseWriter, r *http.Request) {
var frame strings.Builder
header := make([]byte, 8)
- for ok := true; ok; ok = query.Follow {
- for line := range logChannel {
- if _, found := r.URL.Query()["until"]; found {
- if line.Time.After(until) {
- break
- }
+ for line := range logChannel {
+ if _, found := r.URL.Query()["until"]; found {
+ if line.Time.After(until) {
+ break
}
+ }
- // Reset buffer we're ready to loop again
- frame.Reset()
- switch line.Device {
- case "stdout":
- if !query.Stdout {
- continue
- }
- header[0] = 1
- case "stderr":
- if !query.Stderr {
- continue
- }
- header[0] = 2
- default:
- // Logging and moving on is the best we can do here. We may have already sent
- // a Status and Content-Type to client therefore we can no longer report an error.
- log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
+ // Reset buffer we're ready to loop again
+ frame.Reset()
+ switch line.Device {
+ case "stdout":
+ if !query.Stdout {
continue
}
-
- if query.Timestamps {
- frame.WriteString(line.Time.Format(time.RFC3339))
- frame.WriteString(" ")
+ header[0] = 1
+ case "stderr":
+ if !query.Stderr {
+ continue
}
- frame.WriteString(line.Msg)
+ header[0] = 2
+ default:
+ // Logging and moving on is the best we can do here. We may have already sent
+ // a Status and Content-Type to client therefore we can no longer report an error.
+ log.Infof("unknown Device type '%s' in log file from Container %s", line.Device, ctnr.ID())
+ continue
+ }
- binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
- if _, err := w.Write(header[0:8]); err != nil {
- log.Errorf("unable to write log output header: %q", err)
- }
- if _, err := io.WriteString(w, frame.String()); err != nil {
- log.Errorf("unable to write frame string: %q", err)
- }
- if flusher, ok := w.(http.Flusher); ok {
- flusher.Flush()
- }
+ if query.Timestamps {
+ frame.WriteString(line.Time.Format(time.RFC3339))
+ frame.WriteString(" ")
+ }
+ frame.WriteString(line.Msg)
+
+ binary.BigEndian.PutUint32(header[4:], uint32(frame.Len()))
+ if _, err := w.Write(header[0:8]); err != nil {
+ log.Errorf("unable to write log output header: %q", err)
+ }
+ if _, err := io.WriteString(w, frame.String()); err != nil {
+ log.Errorf("unable to write frame string: %q", err)
+ }
+ if flusher, ok := w.(http.Flusher); ok {
+ flusher.Flush()
}
}
}
diff --git a/pkg/domain/infra/abi/containers.go b/pkg/domain/infra/abi/containers.go
index 596fc2cc1..8909f831d 100644
--- a/pkg/domain/infra/abi/containers.go
+++ b/pkg/domain/infra/abi/containers.go
@@ -924,7 +924,7 @@ func (ic *ContainerEngine) ContainerLogs(ctx context.Context, containers []strin
}
logChannel := make(chan *logs.LogLine, chSize)
- if err := ic.Libpod.Log(ctrs, logOpts, logChannel); err != nil {
+ if err := ic.Libpod.Log(ctx, ctrs, logOpts, logChannel); err != nil {
return err
}
diff --git a/pkg/varlinkapi/containers.go b/pkg/varlinkapi/containers.go
index 8650ba000..07b492331 100644
--- a/pkg/varlinkapi/containers.go
+++ b/pkg/varlinkapi/containers.go
@@ -754,7 +754,7 @@ func (i *VarlinkAPI) GetContainersLogs(call iopodman.VarlinkCall, names []string
if err != nil {
return call.ReplyErrorOccurred(err.Error())
}
- if err := i.Runtime.Log(containers, &options, logChannel); err != nil {
+ if err := i.Runtime.Log(getContext(), containers, &options, logChannel); err != nil {
return err
}
go func() {