diff options
Diffstat (limited to 'libpod/container_log_linux.go')
-rw-r--r-- | libpod/container_log_linux.go | 143 |
1 files changed, 143 insertions, 0 deletions
diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go new file mode 100644 index 000000000..e549673a6 --- /dev/null +++ b/libpod/container_log_linux.go @@ -0,0 +1,143 @@ +//+build linux +//+build systemd + +package libpod + +import ( + "fmt" + "io" + "strings" + "time" + + journal "github.com/coreos/go-systemd/sdjournal" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + // journaldLogOut is the journald priority signifying stdout + journaldLogOut = "6" + + // journaldLogErr is the journald priority signifying stderr + journaldLogErr = "3" + + // bufLen is the length of the buffer to read from a k8s-file + // formatted log line + // let's set it as 2k just to be safe if k8s-file format ever changes + bufLen = 16384 +) + +func (c *Container) readFromJournal(options *LogOptions, logChannel chan *LogLine) error { + var config journal.JournalReaderConfig + config.NumFromTail = options.Tail + config.Formatter = journalFormatter + defaultTime := time.Time{} + if options.Since != defaultTime { + // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future + // return nothing instead of fasely printing + if time.Now().Before(options.Since) { + return nil + } + config.Since = time.Since(options.Since) + } + config.Matches = append(config.Matches, journal.Match{ + Field: "CONTAINER_ID_FULL", + Value: c.ID(), + }) + options.WaitGroup.Add(1) + + r, err := journal.NewJournalReader(config) + if err != nil { + return err + } + if r == nil { + return errors.Errorf("journal reader creation failed") + } + if options.Tail == 0 { + r.Rewind() + } + + if options.Follow { + go func() { + follower := FollowBuffer{logChannel} + err := r.Follow(nil, follower) + if err != nil { + logrus.Debugf(err.Error()) + } + r.Close() + options.WaitGroup.Done() + return + }() + return nil + } + + go func() { + bytes := make([]byte, bufLen) + // /me complains about no do-while in go + ec, err := r.Read(bytes) + for ec != 0 && err == nil { + // because we are reusing bytes, we need to make + // sure the old data doesn't get into the new line + bytestr := string(bytes[:ec]) + logLine, err2 := newLogLine(bytestr) + if err2 != nil { + logrus.Error(err2) + continue + } + logChannel <- logLine + ec, err = r.Read(bytes) + } + if err != nil && err != io.EOF { + logrus.Error(err) + } + r.Close() + options.WaitGroup.Done() + }() + return nil +} + +func journalFormatter(entry *journal.JournalEntry) (string, error) { + usec := entry.RealtimeTimestamp + tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logTimeFormat) + output := fmt.Sprintf("%s ", tsString) + priority, ok := entry.Fields["PRIORITY"] + if !ok { + return "", errors.Errorf("no PRIORITY field present in journal entry") + } + if priority == journaldLogOut { + output += "stdout " + } else if priority == journaldLogErr { + output += "stderr " + } else { + return "", errors.Errorf("unexpected PRIORITY field in journal entry") + } + + // if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P" + if _, ok := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; ok { + output += fmt.Sprintf("%s ", partialLogType) + } else { + output += fmt.Sprintf("%s ", fullLogType) + } + + // Finally, append the message + msg, ok := entry.Fields["MESSAGE"] + if !ok { + return "", fmt.Errorf("no MESSAGE field present in journal entry") + } + output += strings.TrimSpace(msg) + return output, nil +} + +type FollowBuffer struct { + logChannel chan *LogLine +} + +func (f FollowBuffer) Write(p []byte) (int, error) { + bytestr := string(p) + logLine, err := newLogLine(bytestr) + if err != nil { + return -1, err + } + f.logChannel <- logLine + return len(p), nil +} |