//+build linux //+build systemd package libpod import ( "context" "fmt" "io" "math" "time" "github.com/containers/podman/v3/libpod/define" "github.com/containers/podman/v3/libpod/logs" journal "github.com/coreos/go-systemd/v22/sdjournal" "github.com/hpcloud/tail/watch" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) const ( // journaldLogOut is the journald priority signifying stdout journaldLogOut = "6" // journaldLogErr is the journald priority signifying stderr journaldLogErr = "3" // bufLen is the length of the buffer to read from a k8s-file // formatted log line // let's set it as 2k just to be safe if k8s-file format ever changes bufLen = 16384 ) func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error { var config journal.JournalReaderConfig if options.Tail < 0 { config.NumFromTail = 0 } else if options.Tail == 0 { config.NumFromTail = math.MaxUint64 } else { config.NumFromTail = uint64(options.Tail) } if options.Multi { config.Formatter = journalFormatterWithID } else { config.Formatter = journalFormatter } defaultTime := time.Time{} if options.Since != defaultTime { // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future // return nothing instead of falsely printing if time.Now().Before(options.Since) { return nil } // coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past config.Since = -time.Since(options.Since) } config.Matches = append(config.Matches, journal.Match{ Field: "CONTAINER_ID_FULL", Value: c.ID(), }) options.WaitGroup.Add(1) r, err := journal.NewJournalReader(config) if err != nil { return err } if r == nil { return errors.Errorf("journal reader creation failed") } if options.Tail == math.MaxInt64 { r.Rewind() } state, err := c.State() if err != nil { return err } if options.Follow && state == define.ContainerStateRunning { go func() { done := make(chan bool) until := make(chan time.Time) go func() { select { case <-ctx.Done(): until <- time.Time{} case <-done: // nothing to do anymore } }() go func() { for { state, err := c.State() if err != nil { until <- time.Time{} logrus.Error(err) break } time.Sleep(watch.POLL_DURATION) if state != define.ContainerStateRunning && state != define.ContainerStatePaused { until <- time.Time{} break } } }() follower := FollowBuffer{logChannel} err := r.Follow(until, follower) if err != nil { logrus.Debugf(err.Error()) } r.Close() options.WaitGroup.Done() done <- true return }() return nil } go func() { bytes := make([]byte, bufLen) // /me complains about no do-while in go ec, err := r.Read(bytes) for ec != 0 && err == nil { // because we are reusing bytes, we need to make // sure the old data doesn't get into the new line bytestr := string(bytes[:ec]) logLine, err2 := logs.NewLogLine(bytestr) if err2 != nil { logrus.Error(err2) continue } logChannel <- logLine ec, err = r.Read(bytes) } if err != nil && err != io.EOF { logrus.Error(err) } r.Close() options.WaitGroup.Done() }() return nil } func journalFormatterWithID(entry *journal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err } id, ok := entry.Fields["CONTAINER_ID_FULL"] if !ok { return "", fmt.Errorf("no CONTAINER_ID_FULL field present in journal entry") } if len(id) > 12 { id = id[:12] } output += fmt.Sprintf("%s ", id) // Append message msg, err := formatterMessage(entry) if err != nil { return "", err } output += msg return output, nil } func journalFormatter(entry *journal.JournalEntry) (string, error) { output, err := formatterPrefix(entry) if err != nil { return "", err } // Append message msg, err := formatterMessage(entry) if err != nil { return "", err } output += msg return output, nil } func formatterPrefix(entry *journal.JournalEntry) (string, error) { usec := entry.RealtimeTimestamp tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat) output := fmt.Sprintf("%s ", tsString) priority, ok := entry.Fields["PRIORITY"] if !ok { return "", errors.Errorf("no PRIORITY field present in journal entry") } if priority == journaldLogOut { output += "stdout " } else if priority == journaldLogErr { output += "stderr " } else { return "", errors.Errorf("unexpected PRIORITY field in journal entry") } // if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P" if _, ok := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; ok { output += fmt.Sprintf("%s ", logs.PartialLogType) } else { output += fmt.Sprintf("%s ", logs.FullLogType) } return output, nil } func formatterMessage(entry *journal.JournalEntry) (string, error) { // Finally, append the message msg, ok := entry.Fields["MESSAGE"] if !ok { return "", fmt.Errorf("no MESSAGE field present in journal entry") } return msg, nil } type FollowBuffer struct { logChannel chan *logs.LogLine } func (f FollowBuffer) Write(p []byte) (int, error) { bytestr := string(p) logLine, err := logs.NewLogLine(bytestr) if err != nil { return -1, err } f.logChannel <- logLine return len(p), nil }