diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2021-05-26 22:38:37 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-26 22:38:37 +0200 |
commit | 5b4ffc7ba79d0c3ad59cce17500c5a98ea686577 (patch) | |
tree | 4e01832920140ff2a5f069528a5b4b5241e93282 /libpod | |
parent | ac94be37e996fdebf44e5ace83be5219b9488ec4 (diff) | |
parent | 10569c988f8ad3bfa796e52c61d7f4ef5266c193 (diff) | |
download | podman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.tar.gz podman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.tar.bz2 podman-5b4ffc7ba79d0c3ad59cce17500c5a98ea686577.zip |
Merge pull request #10431 from vrothberg/journald-logs
journald logger: fix race condition
Diffstat (limited to 'libpod')
-rw-r--r-- | libpod/container_log_linux.go | 276 |
1 files changed, 162 insertions, 114 deletions
diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go index ec4fa9724..892ee34e3 100644 --- a/libpod/container_log_linux.go +++ b/libpod/container_log_linux.go @@ -6,14 +6,12 @@ package libpod import ( "context" "fmt" - "io" - "math" "strings" "time" - "github.com/containers/podman/v3/libpod/define" + "github.com/containers/podman/v3/libpod/events" "github.com/containers/podman/v3/libpod/logs" - journal "github.com/coreos/go-systemd/v22/sdjournal" + "github.com/coreos/go-systemd/v22/sdjournal" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -24,122 +22,187 @@ const ( // journaldLogErr is the journald priority signifying stderr journaldLogErr = "3" - - // bufLen is the length of the buffer to read from a k8s-file - // formatted log line - // let's set it as 2k just to be safe if k8s-file format ever changes - bufLen = 16384 ) func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { - var config journal.JournalReaderConfig - if options.Tail < 0 { - config.NumFromTail = 0 - } else if options.Tail == 0 { - config.NumFromTail = math.MaxUint64 - } else { - config.NumFromTail = uint64(options.Tail) + journal, err := sdjournal.NewJournal() + if err != nil { + return err } - if options.Multi { - config.Formatter = journalFormatterWithID - } else { - config.Formatter = journalFormatter - } - defaultTime := time.Time{} - if options.Since != defaultTime { - // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future - // return nothing instead of falsely printing - if time.Now().Before(options.Since) { - return nil - } - // coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past - config.Since = -time.Since(options.Since) + // While logs are written to the `logChannel`, we inspect each event + // and stop once the container has died. Having logs and events in one + // stream prevents a race condition that we faced in #10323. + + // Add the filters for events. + match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) + } + match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) } - config.Matches = append(config.Matches, journal.Match{ - Field: "CONTAINER_ID_FULL", - Value: c.ID(), - }) - options.WaitGroup.Add(1) - r, err := journal.NewJournalReader(config) - if err != nil { + // Add the filter for logs. Note the disjunction so that we match + // either the events or the logs. + if err := journal.AddDisjunction(); err != nil { + return errors.Wrap(err, "adding filter disjunction to journald logger") + } + match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) + } + + if err := journal.SeekHead(); err != nil { return err } - if r == nil { - return errors.Errorf("journal reader creation failed") + // API requires Next() immediately after SeekHead(). + if _, err := journal.Next(); err != nil { + return errors.Wrap(err, "initial journal cursor") } - if options.Tail == math.MaxInt64 { - r.Rewind() + + // API requires a next|prev before getting a cursor. + if _, err := journal.Previous(); err != nil { + return errors.Wrap(err, "initial journal cursor") } - state, err := c.State() - if err != nil { - return err + + // Note that the initial cursor may not yet be ready, so we'll do an + // exponential backoff. + var cursor string + var cursorError error + for i := 1; i <= 3; i++ { + cursor, cursorError = journal.GetCursor() + if err != nil { + continue + } + time.Sleep(time.Duration(i*100) * time.Millisecond) + break + } + if cursorError != nil { + return errors.Wrap(cursorError, "inital journal cursor") + } + + // We need the container's events in the same journal to guarantee + // consistency, see #10323. + if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" { + return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger) } - if options.Follow && state == define.ContainerStateRunning { - go func() { - done := make(chan bool) - until := make(chan time.Time) - go func() { - select { - case <-ctx.Done(): - until <- time.Time{} - case <-done: - // nothing to do anymore + options.WaitGroup.Add(1) + go func() { + defer func() { + options.WaitGroup.Done() + if err := journal.Close(); err != nil { + logrus.Errorf("Unable to close journal: %v", err) + } + }() + + afterTimeStamp := false // needed for options.Since + tailQueue := []*logs.LogLine{} // needed for options.Tail + doTail := options.Tail > 0 + for { + select { + case <-ctx.Done(): + // Remote client may have closed/lost the connection. + return + default: + // Fallthrough + } + + if _, err := journal.Next(); err != nil { + logrus.Errorf("Failed to move journal cursor to next entry: %v", err) + return + } + latestCursor, err := journal.GetCursor() + if err != nil { + logrus.Errorf("Failed to get journal cursor: %v", err) + return + } + + // Hit the end of the journal. + if cursor == latestCursor { + if doTail { + // Flush *once* we hit the end of the journal. + startIndex := int64(len(tailQueue)-1) - options.Tail + if startIndex < 0 { + startIndex = 0 + } + for i := startIndex; i < int64(len(tailQueue)); i++ { + logChannel <- tailQueue[i] + } + tailQueue = nil + doTail = false + } + // Unless we follow, quit. + if !options.Follow { + return } - }() - go func() { - // FIXME (#10323): we are facing a terrible - // race condition here. At the time the - // container dies and `c.Wait()` has returned, - // we may not have received all journald logs. - // So far there is no other way than waiting - // for a second. Ultimately, `r.Follow` is - // racy and we may have to implement our custom - // logic here. - c.Wait(ctx) - time.Sleep(time.Second) - until <- time.Time{} - }() - follower := journaldFollowBuffer{logChannel, options.Multi} - err := r.Follow(until, follower) + // Sleep until something's happening on the journal. + journal.Wait(sdjournal.IndefiniteWait) + continue + } + cursor = latestCursor + + entry, err := journal.GetEntry() if err != nil { - logrus.Debugf(err.Error()) + logrus.Errorf("Failed to get journal entry: %v", err) + return } - r.Close() - options.WaitGroup.Done() - done <- true - return - }() - return nil - } - go func() { - bytes := make([]byte, bufLen) - // /me complains about no do-while in go - ec, err := r.Read(bytes) - for ec != 0 && err == nil { - // because we are reusing bytes, we need to make - // sure the old data doesn't get into the new line - bytestr := string(bytes[:ec]) - logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi) - if err2 != nil { - logrus.Error(err2) + if !afterTimeStamp { + entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond)) + if entryTime.Before(options.Since) { + continue + } + afterTimeStamp = true + } + + // If we're reading an event and the container exited/died, + // then we're done and can return. + event, ok := entry.Fields["PODMAN_EVENT"] + if ok { + status, err := events.StringToStatus(event) + if err != nil { + logrus.Errorf("Failed to translate event: %v", err) + return + } + if status == events.Exited { + return + } + continue + } + + var message string + var formatError error + + if options.Multi { + message, formatError = journalFormatterWithID(entry) + } else { + message, formatError = journalFormatter(entry) + } + + if formatError != nil { + logrus.Errorf("Failed to parse journald log entry: %v", err) + return + } + + logLine, err := logs.NewJournaldLogLine(message, options.Multi) + if err != nil { + logrus.Errorf("Failed parse log line: %v", err) + return + } + if doTail { + tailQueue = append(tailQueue, logLine) continue } logChannel <- logLine - ec, err = r.Read(bytes) } - if err != nil && err != io.EOF { - logrus.Error(err) - } - r.Close() - options.WaitGroup.Done() }() + return nil } -func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { +func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err @@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { return output, nil } -func journalFormatter(entry *journal.JournalEntry) (string, error) { +func journalFormatter(entry *sdjournal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err @@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) { return output, nil } -func formatterPrefix(entry *journal.JournalEntry) (string, error) { +func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) { usec := entry.RealtimeTimestamp tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat) output := fmt.Sprintf("%s ", tsString) @@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) { return output, nil } -func formatterMessage(entry *journal.JournalEntry) (string, error) { +func formatterMessage(entry *sdjournal.JournalEntry) (string, error) { // Finally, append the message msg, ok := entry.Fields["MESSAGE"] if !ok { @@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) { msg = strings.TrimSuffix(msg, "\n") return msg, nil } - -type journaldFollowBuffer struct { - logChannel chan *logs.LogLine - withID bool -} - -func (f journaldFollowBuffer) Write(p []byte) (int, error) { - bytestr := string(p) - logLine, err := logs.NewJournaldLogLine(bytestr, f.withID) - if err != nil { - return -1, err - } - f.logChannel <- logLine - return len(p), nil -} |