summaryrefslogtreecommitdiff
path: root/libpod/container_log_linux.go
diff options
context:
space:
mode:
Diffstat (limited to 'libpod/container_log_linux.go')
-rw-r--r--libpod/container_log_linux.go143
1 files changed, 143 insertions, 0 deletions
diff --git a/libpod/container_log_linux.go b/libpod/container_log_linux.go
new file mode 100644
index 000000000..e549673a6
--- /dev/null
+++ b/libpod/container_log_linux.go
@@ -0,0 +1,143 @@
+//+build linux
+//+build systemd
+
+package libpod
+
+import (
+ "fmt"
+ "io"
+ "strings"
+ "time"
+
+ journal "github.com/coreos/go-systemd/sdjournal"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+)
+
+const (
+ // journaldLogOut is the journald priority signifying stdout
+ journaldLogOut = "6"
+
+ // journaldLogErr is the journald priority signifying stderr
+ journaldLogErr = "3"
+
+ // bufLen is the length of the buffer to read from a k8s-file
+ // formatted log line
+ // let's set it as 2k just to be safe if k8s-file format ever changes
+ bufLen = 16384
+)
+
+func (c *Container) readFromJournal(options *LogOptions, logChannel chan *LogLine) error {
+ var config journal.JournalReaderConfig
+ config.NumFromTail = options.Tail
+ config.Formatter = journalFormatter
+ defaultTime := time.Time{}
+ if options.Since != defaultTime {
+ // coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
+ // return nothing instead of fasely printing
+ if time.Now().Before(options.Since) {
+ return nil
+ }
+ config.Since = time.Since(options.Since)
+ }
+ config.Matches = append(config.Matches, journal.Match{
+ Field: "CONTAINER_ID_FULL",
+ Value: c.ID(),
+ })
+ options.WaitGroup.Add(1)
+
+ r, err := journal.NewJournalReader(config)
+ if err != nil {
+ return err
+ }
+ if r == nil {
+ return errors.Errorf("journal reader creation failed")
+ }
+ if options.Tail == 0 {
+ r.Rewind()
+ }
+
+ if options.Follow {
+ go func() {
+ follower := FollowBuffer{logChannel}
+ err := r.Follow(nil, follower)
+ if err != nil {
+ logrus.Debugf(err.Error())
+ }
+ r.Close()
+ options.WaitGroup.Done()
+ return
+ }()
+ return nil
+ }
+
+ go func() {
+ bytes := make([]byte, bufLen)
+ // /me complains about no do-while in go
+ ec, err := r.Read(bytes)
+ for ec != 0 && err == nil {
+ // because we are reusing bytes, we need to make
+ // sure the old data doesn't get into the new line
+ bytestr := string(bytes[:ec])
+ logLine, err2 := newLogLine(bytestr)
+ if err2 != nil {
+ logrus.Error(err2)
+ continue
+ }
+ logChannel <- logLine
+ ec, err = r.Read(bytes)
+ }
+ if err != nil && err != io.EOF {
+ logrus.Error(err)
+ }
+ r.Close()
+ options.WaitGroup.Done()
+ }()
+ return nil
+}
+
+func journalFormatter(entry *journal.JournalEntry) (string, error) {
+ usec := entry.RealtimeTimestamp
+ tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logTimeFormat)
+ output := fmt.Sprintf("%s ", tsString)
+ priority, ok := entry.Fields["PRIORITY"]
+ if !ok {
+ return "", errors.Errorf("no PRIORITY field present in journal entry")
+ }
+ if priority == journaldLogOut {
+ output += "stdout "
+ } else if priority == journaldLogErr {
+ output += "stderr "
+ } else {
+ return "", errors.Errorf("unexpected PRIORITY field in journal entry")
+ }
+
+ // if CONTAINER_PARTIAL_MESSAGE is defined, the log type is "P"
+ if _, ok := entry.Fields["CONTAINER_PARTIAL_MESSAGE"]; ok {
+ output += fmt.Sprintf("%s ", partialLogType)
+ } else {
+ output += fmt.Sprintf("%s ", fullLogType)
+ }
+
+ // Finally, append the message
+ msg, ok := entry.Fields["MESSAGE"]
+ if !ok {
+ return "", fmt.Errorf("no MESSAGE field present in journal entry")
+ }
+ output += strings.TrimSpace(msg)
+ return output, nil
+}
+
+type FollowBuffer struct {
+ logChannel chan *LogLine
+}
+
+func (f FollowBuffer) Write(p []byte) (int, error) {
+ bytestr := string(p)
+ logLine, err := newLogLine(bytestr)
+ if err != nil {
+ return -1, err
+ }
+ f.logChannel <- logLine
+ return len(p), nil
+}