From 10569c988f8ad3bfa796e52c61d7f4ef5266c193 Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Thu, 20 May 2021 11:07:28 +0200 Subject: journald logger: fix race condition Fix a race in journald driver. Following the logs implies streaming until the container is dead. Streaming happened in one goroutine, waiting for the container to exit/die and signaling that event happened in another goroutine. The nature of having two goroutines running simultaneously is pretty much the core of the race condition. When the streaming goroutines received the signal that the container has exitted, the routine may not have read and written all of the container's logs. Fix this race by reading both, the logs and the events, of the container and stop streaming when the died/exited event has been read. The died event is guaranteed to be after all logs in the journal which guarantees not only consistencty but also a deterministic behavior. Note that the journald log driver now requires the journald event backend to be set. Fixes: #10323 Signed-off-by: Valentin Rothberg --- libpod/container_log_linux.go | 276 +++++++++++++++++++++++++----------------- test/e2e/logs_test.go | 2 +- test/system/035-logs.bats | 52 ++++++++ test/system/130-kill.bats | 3 +- 4 files changed, 216 insertions(+), 117 deletions(-) diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go index ec4fa9724..892ee34e3 100644 --- a/libpod/container_log_linux.go +++ b/libpod/container_log_linux.go @@ -6,14 +6,12 @@ package libpod import ( "context" "fmt" - "io" - "math" "strings" "time" - "github.com/containers/podman/v3/libpod/define" + "github.com/containers/podman/v3/libpod/events" "github.com/containers/podman/v3/libpod/logs" - journal "github.com/coreos/go-systemd/v22/sdjournal" + "github.com/coreos/go-systemd/v22/sdjournal" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -24,122 +22,187 @@ const ( // journaldLogErr is the journald priority signifying stderr journaldLogErr = "3" - - // bufLen is the length of the buffer to read from a k8s-file - // formatted log line - // let's set it as 2k just to be safe if k8s-file format ever changes - bufLen = 16384 ) func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { - var config journal.JournalReaderConfig - if options.Tail < 0 { - config.NumFromTail = 0 - } else if options.Tail == 0 { - config.NumFromTail = math.MaxUint64 - } else { - config.NumFromTail = uint64(options.Tail) + journal, err := sdjournal.NewJournal() + if err != nil { + return err } - if options.Multi { - config.Formatter = journalFormatterWithID - } else { - config.Formatter = journalFormatter - } - defaultTime := time.Time{} - if options.Since != defaultTime { - // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future - // return nothing instead of falsely printing - if time.Now().Before(options.Since) { - return nil - } - // coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past - config.Since = -time.Since(options.Since) + // While logs are written to the `logChannel`, we inspect each event + // and stop once the container has died. Having logs and events in one + // stream prevents a race condition that we faced in #10323. + + // Add the filters for events. + match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) + } + match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) } - config.Matches = append(config.Matches, journal.Match{ - Field: "CONTAINER_ID_FULL", - Value: c.ID(), - }) - options.WaitGroup.Add(1) - r, err := journal.NewJournalReader(config) - if err != nil { + // Add the filter for logs. Note the disjunction so that we match + // either the events or the logs. + if err := journal.AddDisjunction(); err != nil { + return errors.Wrap(err, "adding filter disjunction to journald logger") + } + match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()} + if err := journal.AddMatch(match.String()); err != nil { + return errors.Wrapf(err, "adding filter to journald logger: %v", match) + } + + if err := journal.SeekHead(); err != nil { return err } - if r == nil { - return errors.Errorf("journal reader creation failed") + // API requires Next() immediately after SeekHead(). + if _, err := journal.Next(); err != nil { + return errors.Wrap(err, "initial journal cursor") } - if options.Tail == math.MaxInt64 { - r.Rewind() + + // API requires a next|prev before getting a cursor. + if _, err := journal.Previous(); err != nil { + return errors.Wrap(err, "initial journal cursor") } - state, err := c.State() - if err != nil { - return err + + // Note that the initial cursor may not yet be ready, so we'll do an + // exponential backoff. + var cursor string + var cursorError error + for i := 1; i <= 3; i++ { + cursor, cursorError = journal.GetCursor() + if err != nil { + continue + } + time.Sleep(time.Duration(i*100) * time.Millisecond) + break + } + if cursorError != nil { + return errors.Wrap(cursorError, "inital journal cursor") + } + + // We need the container's events in the same journal to guarantee + // consistency, see #10323. + if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" { + return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger) } - if options.Follow && state == define.ContainerStateRunning { - go func() { - done := make(chan bool) - until := make(chan time.Time) - go func() { - select { - case <-ctx.Done(): - until <- time.Time{} - case <-done: - // nothing to do anymore + options.WaitGroup.Add(1) + go func() { + defer func() { + options.WaitGroup.Done() + if err := journal.Close(); err != nil { + logrus.Errorf("Unable to close journal: %v", err) + } + }() + + afterTimeStamp := false // needed for options.Since + tailQueue := []*logs.LogLine{} // needed for options.Tail + doTail := options.Tail > 0 + for { + select { + case <-ctx.Done(): + // Remote client may have closed/lost the connection. + return + default: + // Fallthrough + } + + if _, err := journal.Next(); err != nil { + logrus.Errorf("Failed to move journal cursor to next entry: %v", err) + return + } + latestCursor, err := journal.GetCursor() + if err != nil { + logrus.Errorf("Failed to get journal cursor: %v", err) + return + } + + // Hit the end of the journal. + if cursor == latestCursor { + if doTail { + // Flush *once* we hit the end of the journal. + startIndex := int64(len(tailQueue)-1) - options.Tail + if startIndex < 0 { + startIndex = 0 + } + for i := startIndex; i < int64(len(tailQueue)); i++ { + logChannel <- tailQueue[i] + } + tailQueue = nil + doTail = false + } + // Unless we follow, quit. + if !options.Follow { + return } - }() - go func() { - // FIXME (#10323): we are facing a terrible - // race condition here. At the time the - // container dies and `c.Wait()` has returned, - // we may not have received all journald logs. - // So far there is no other way than waiting - // for a second. Ultimately, `r.Follow` is - // racy and we may have to implement our custom - // logic here. - c.Wait(ctx) - time.Sleep(time.Second) - until <- time.Time{} - }() - follower := journaldFollowBuffer{logChannel, options.Multi} - err := r.Follow(until, follower) + // Sleep until something's happening on the journal. + journal.Wait(sdjournal.IndefiniteWait) + continue + } + cursor = latestCursor + + entry, err := journal.GetEntry() if err != nil { - logrus.Debugf(err.Error()) + logrus.Errorf("Failed to get journal entry: %v", err) + return } - r.Close() - options.WaitGroup.Done() - done <- true - return - }() - return nil - } - go func() { - bytes := make([]byte, bufLen) - // /me complains about no do-while in go - ec, err := r.Read(bytes) - for ec != 0 && err == nil { - // because we are reusing bytes, we need to make - // sure the old data doesn't get into the new line - bytestr := string(bytes[:ec]) - logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi) - if err2 != nil { - logrus.Error(err2) + if !afterTimeStamp { + entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond)) + if entryTime.Before(options.Since) { + continue + } + afterTimeStamp = true + } + + // If we're reading an event and the container exited/died, + // then we're done and can return. + event, ok := entry.Fields["PODMAN_EVENT"] + if ok { + status, err := events.StringToStatus(event) + if err != nil { + logrus.Errorf("Failed to translate event: %v", err) + return + } + if status == events.Exited { + return + } + continue + } + + var message string + var formatError error + + if options.Multi { + message, formatError = journalFormatterWithID(entry) + } else { + message, formatError = journalFormatter(entry) + } + + if formatError != nil { + logrus.Errorf("Failed to parse journald log entry: %v", err) + return + } + + logLine, err := logs.NewJournaldLogLine(message, options.Multi) + if err != nil { + logrus.Errorf("Failed parse log line: %v", err) + return + } + if doTail { + tailQueue = append(tailQueue, logLine) continue } logChannel <- logLine - ec, err = r.Read(bytes) } - if err != nil && err != io.EOF { - logrus.Error(err) - } - r.Close() - options.WaitGroup.Done() }() + return nil } -func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { +func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err @@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { return output, nil } -func journalFormatter(entry *journal.JournalEntry) (string, error) { +func journalFormatter(entry *sdjournal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err @@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) { return output, nil } -func formatterPrefix(entry *journal.JournalEntry) (string, error) { +func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) { usec := entry.RealtimeTimestamp tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat) output := fmt.Sprintf("%s ", tsString) @@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) { return output, nil } -func formatterMessage(entry *journal.JournalEntry) (string, error) { +func formatterMessage(entry *sdjournal.JournalEntry) (string, error) { // Finally, append the message msg, ok := entry.Fields["MESSAGE"] if !ok { @@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) { msg = strings.TrimSuffix(msg, "\n") return msg, nil } - -type journaldFollowBuffer struct { - logChannel chan *logs.LogLine - withID bool -} - -func (f journaldFollowBuffer) Write(p []byte) (int, error) { - bytestr := string(p) - logLine, err := logs.NewJournaldLogLine(bytestr, f.withID) - if err != nil { - return -1, err - } - f.logChannel <- logLine - return len(p), nil -} diff --git a/test/e2e/logs_test.go b/test/e2e/logs_test.go index 3051031a5..4d9cbb48b 100644 --- a/test/e2e/logs_test.go +++ b/test/e2e/logs_test.go @@ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() { }) It("podman logs on a created container should result in 0 exit code: "+log, func() { - session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE}) + session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE}) session.WaitWithDefaultTimeout() Expect(session).To(Exit(0)) diff --git a/test/system/035-logs.bats b/test/system/035-logs.bats index 3dd88e5eb..ccf83df14 100644 --- a/test/system/035-logs.bats +++ b/test/system/035-logs.bats @@ -73,4 +73,56 @@ ${cid[0]} d" "Sequential output from logs" _log_test_multi journald } +@test "podman logs - journald log driver requires journald events backend" { + skip_if_remote "remote does not support --events-backend" + # We can't use journald on RHEL as rootless: rhbz#1895105 + skip_if_journald_unavailable + + run_podman --events-backend=file run --log-driver=journald -d --name test --replace $IMAGE ls / + run_podman --events-backend=file logs test + run_podman 125 --events-backend=file logs --follow test + is "$output" "Error: using --follow with the journald --log-driver but without the journald --events-backend (file) is not supported" "journald logger requires journald eventer" +} + +function _log_test_since() { + local driver=$1 + + s_before="before_$(random_string)_${driver}" + s_after="after_$(random_string)_${driver}" + + before=$(date --iso-8601=seconds) + run_podman run --log-driver=$driver -d --name test $IMAGE sh -c \ + "echo $s_before; trap 'echo $s_after; exit' SIGTERM; while :; do sleep 1; done" + + # sleep a second to make sure the date is after the first echo + sleep 1 + after=$(date --iso-8601=seconds) + run_podman stop test + + run_podman logs test + is "$output" \ + "$s_before +$s_after" + + run_podman logs --since $before test + is "$output" \ + "$s_before +$s_after" + + run_podman logs --since $after test + is "$output" "$s_after" + run_podman rm -f test +} + +@test "podman logs - since k8s-file" { + _log_test_since k8s-file +} + +@test "podman logs - since journald" { + # We can't use journald on RHEL as rootless: rhbz#1895105 + skip_if_journald_unavailable + + _log_test_since journald +} + # vim: filetype=sh diff --git a/test/system/130-kill.bats b/test/system/130-kill.bats index 1b02b4976..3770eac27 100644 --- a/test/system/130-kill.bats +++ b/test/system/130-kill.bats @@ -8,8 +8,7 @@ load helpers @test "podman kill - test signal handling in containers" { # Start a container that will handle all signals by emitting 'got: N' local -a signals=(1 2 3 4 5 6 8 10 12 13 14 15 16 20 21 22 23 24 25 26 64) - # Force the k8s-file driver until #10323 is fixed. - run_podman run --log-driver=k8s-file -d $IMAGE sh -c \ + run_podman run -d $IMAGE sh -c \ "for i in ${signals[*]}; do trap \"echo got: \$i\" \$i; done; echo READY; while ! test -e /stop; do sleep 0.05; done; -- cgit v1.2.3-54-g00ecf