summaryrefslogtreecommitdiff
path: root/libpod/container_log_linux.go
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2021-05-26 22:38:37 +0200
committerGitHub <noreply@github.com>2021-05-26 22:38:37 +0200
commit5b4ffc7ba79d0c3ad59cce17500c5a98ea686577 (patch)
tree4e01832920140ff2a5f069528a5b4b5241e93282 /libpod/container_log_linux.go
parentac94be37e996fdebf44e5ace83be5219b9488ec4 (diff)
parent10569c988f8ad3bfa796e52c61d7f4ef5266c193 (diff)
downloadpodman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.tar.gz
podman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.tar.bz2
podman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.zip
Merge pull request #10431 from vrothberg/journald-logs
journald logger: fix race condition
Diffstat (limited to 'libpod/container_log_linux.go')
-rw-r--r--libpod/container_log_linux.go276
1 files changed, 162 insertions, 114 deletions
diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go
index ec4fa9724..892ee34e3 100644
--- a/libpod/container_log_linux.go
+++ b/libpod/container_log_linux.go
@@ -6,14 +6,12 @@ package libpod
import (
"context"
"fmt"
- "io"
- "math"
"strings"
"time"
- "github.com/containers/podman/v3/libpod/define"
+ "github.com/containers/podman/v3/libpod/events"
"github.com/containers/podman/v3/libpod/logs"
- journal "github.com/coreos/go-systemd/v22/sdjournal"
+ "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@@ -24,122 +22,187 @@ const (
// journaldLogErr is the journald priority signifying stderr
journaldLogErr = "3"
-
- // bufLen is the length of the buffer to read from a k8s-file
- // formatted log line
- // let's set it as 2k just to be safe if k8s-file format ever changes
- bufLen = 16384
)
func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
- var config journal.JournalReaderConfig
- if options.Tail < 0 {
- config.NumFromTail = 0
- } else if options.Tail == 0 {
- config.NumFromTail = math.MaxUint64
- } else {
- config.NumFromTail = uint64(options.Tail)
+ journal, err := sdjournal.NewJournal()
+ if err != nil {
+ return err
}
- if options.Multi {
- config.Formatter = journalFormatterWithID
- } else {
- config.Formatter = journalFormatter
- }
- defaultTime := time.Time{}
- if options.Since != defaultTime {
- // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
- // return nothing instead of falsely printing
- if time.Now().Before(options.Since) {
- return nil
- }
- // coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
- config.Since = -time.Since(options.Since)
+ // While logs are written to the `logChannel`, we inspect each event
+ // and stop once the container has died. Having logs and events in one
+ // stream prevents a race condition that we faced in #10323.
+
+ // Add the filters for events.
+ match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
+ if err := journal.AddMatch(match.String()); err != nil {
+ return errors.Wrapf(err, "adding filter to journald logger: %v", match)
+ }
+ match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
+ if err := journal.AddMatch(match.String()); err != nil {
+ return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
- config.Matches = append(config.Matches, journal.Match{
- Field: "CONTAINER_ID_FULL",
- Value: c.ID(),
- })
- options.WaitGroup.Add(1)
- r, err := journal.NewJournalReader(config)
- if err != nil {
+ // Add the filter for logs. Note the disjunction so that we match
+ // either the events or the logs.
+ if err := journal.AddDisjunction(); err != nil {
+ return errors.Wrap(err, "adding filter disjunction to journald logger")
+ }
+ match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
+ if err := journal.AddMatch(match.String()); err != nil {
+ return errors.Wrapf(err, "adding filter to journald logger: %v", match)
+ }
+
+ if err := journal.SeekHead(); err != nil {
return err
}
- if r == nil {
- return errors.Errorf("journal reader creation failed")
+ // API requires Next() immediately after SeekHead().
+ if _, err := journal.Next(); err != nil {
+ return errors.Wrap(err, "initial journal cursor")
}
- if options.Tail == math.MaxInt64 {
- r.Rewind()
+
+ // API requires a next|prev before getting a cursor.
+ if _, err := journal.Previous(); err != nil {
+ return errors.Wrap(err, "initial journal cursor")
}
- state, err := c.State()
- if err != nil {
- return err
+
+ // Note that the initial cursor may not yet be ready, so we'll do an
+ // exponential backoff.
+ var cursor string
+ var cursorError error
+ for i := 1; i <= 3; i++ {
+ cursor, cursorError = journal.GetCursor()
+ if err != nil {
+ continue
+ }
+ time.Sleep(time.Duration(i*100) * time.Millisecond)
+ break
+ }
+ if cursorError != nil {
+ return errors.Wrap(cursorError, "inital journal cursor")
+ }
+
+ // We need the container's events in the same journal to guarantee
+ // consistency, see #10323.
+ if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
+ return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
}
- if options.Follow && state == define.ContainerStateRunning {
- go func() {
- done := make(chan bool)
- until := make(chan time.Time)
- go func() {
- select {
- case <-ctx.Done():
- until <- time.Time{}
- case <-done:
- // nothing to do anymore
+ options.WaitGroup.Add(1)
+ go func() {
+ defer func() {
+ options.WaitGroup.Done()
+ if err := journal.Close(); err != nil {
+ logrus.Errorf("Unable to close journal: %v", err)
+ }
+ }()
+
+ afterTimeStamp := false // needed for options.Since
+ tailQueue := []*logs.LogLine{} // needed for options.Tail
+ doTail := options.Tail > 0
+ for {
+ select {
+ case <-ctx.Done():
+ // Remote client may have closed/lost the connection.
+ return
+ default:
+ // Fallthrough
+ }
+
+ if _, err := journal.Next(); err != nil {
+ logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
+ return
+ }
+ latestCursor, err := journal.GetCursor()
+ if err != nil {
+ logrus.Errorf("Failed to get journal cursor: %v", err)
+ return
+ }
+
+ // Hit the end of the journal.
+ if cursor == latestCursor {
+ if doTail {
+ // Flush *once* we hit the end of the journal.
+ startIndex := int64(len(tailQueue)-1) - options.Tail
+ if startIndex < 0 {
+ startIndex = 0
+ }
+ for i := startIndex; i < int64(len(tailQueue)); i++ {
+ logChannel <- tailQueue[i]
+ }
+ tailQueue = nil
+ doTail = false
+ }
+ // Unless we follow, quit.
+ if !options.Follow {
+ return
}
- }()
- go func() {
- // FIXME (#10323): we are facing a terrible
- // race condition here. At the time the
- // container dies and `c.Wait()` has returned,
- // we may not have received all journald logs.
- // So far there is no other way than waiting
- // for a second. Ultimately, `r.Follow` is
- // racy and we may have to implement our custom
- // logic here.
- c.Wait(ctx)
- time.Sleep(time.Second)
- until <- time.Time{}
- }()
- follower := journaldFollowBuffer{logChannel, options.Multi}
- err := r.Follow(until, follower)
+ // Sleep until something's happening on the journal.
+ journal.Wait(sdjournal.IndefiniteWait)
+ continue
+ }
+ cursor = latestCursor
+
+ entry, err := journal.GetEntry()
if err != nil {
- logrus.Debugf(err.Error())
+ logrus.Errorf("Failed to get journal entry: %v", err)
+ return
}
- r.Close()
- options.WaitGroup.Done()
- done <- true
- return
- }()
- return nil
- }
- go func() {
- bytes := make([]byte, bufLen)
- // /me complains about no do-while in go
- ec, err := r.Read(bytes)
- for ec != 0 && err == nil {
- // because we are reusing bytes, we need to make
- // sure the old data doesn't get into the new line
- bytestr := string(bytes[:ec])
- logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
- if err2 != nil {
- logrus.Error(err2)
+ if !afterTimeStamp {
+ entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
+ if entryTime.Before(options.Since) {
+ continue
+ }
+ afterTimeStamp = true
+ }
+
+ // If we're reading an event and the container exited/died,
+ // then we're done and can return.
+ event, ok := entry.Fields["PODMAN_EVENT"]
+ if ok {
+ status, err := events.StringToStatus(event)
+ if err != nil {
+ logrus.Errorf("Failed to translate event: %v", err)
+ return
+ }
+ if status == events.Exited {
+ return
+ }
+ continue
+ }
+
+ var message string
+ var formatError error
+
+ if options.Multi {
+ message, formatError = journalFormatterWithID(entry)
+ } else {
+ message, formatError = journalFormatter(entry)
+ }
+
+ if formatError != nil {
+ logrus.Errorf("Failed to parse journald log entry: %v", err)
+ return
+ }
+
+ logLine, err := logs.NewJournaldLogLine(message, options.Multi)
+ if err != nil {
+ logrus.Errorf("Failed parse log line: %v", err)
+ return
+ }
+ if doTail {
+ tailQueue = append(tailQueue, logLine)
continue
}
logChannel <- logLine
- ec, err = r.Read(bytes)
}
- if err != nil && err != io.EOF {
- logrus.Error(err)
- }
- r.Close()
- options.WaitGroup.Done()
}()
+
return nil
}
-func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
+func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
@@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
return output, nil
}
-func journalFormatter(entry *journal.JournalEntry) (string, error) {
+func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
@@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) {
return output, nil
}
-func formatterPrefix(entry *journal.JournalEntry) (string, error) {
+func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
usec := entry.RealtimeTimestamp
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
output := fmt.Sprintf("%s ", tsString)
@@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) {
return output, nil
}
-func formatterMessage(entry *journal.JournalEntry) (string, error) {
+func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
// Finally, append the message
msg, ok := entry.Fields["MESSAGE"]
if !ok {
@@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) {
msg = strings.TrimSuffix(msg, "\n")
return msg, nil
}
-
-type journaldFollowBuffer struct {
- logChannel chan *logs.LogLine
- withID bool
-}
-
-func (f journaldFollowBuffer) Write(p []byte) (int, error) {
- bytestr := string(p)
- logLine, err := logs.NewJournaldLogLine(bytestr, f.withID)
- if err != nil {
- return -1, err
- }
- f.logChannel <- logLine
- return len(p), nil
-}