From 7ffb8a79007414e510ac14ac0d682e1bbfe30c7c Mon Sep 17 00:00:00 2001 From: umohnani8 Date: Thu, 30 Aug 2018 10:10:05 -0400 Subject: Add CRI logs parsing to podman logs Podman logs was not parsing CRI logs well, especially the F and P logs. Now using the same parsing code as in kube here. Signed-off-by: umohnani8 Closes: #1403 Approved by: rhatdan --- cmd/podman/logs.go | 161 ++----------------------- pkg/logs/logs.go | 344 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 352 insertions(+), 153 deletions(-) create mode 100644 pkg/logs/logs.go diff --git a/cmd/podman/logs.go b/cmd/podman/logs.go index 555c65fa5..34d062c56 100644 --- a/cmd/podman/logs.go +++ b/cmd/podman/logs.go @@ -1,28 +1,17 @@ package main import ( - "fmt" - "io" "os" - "strings" "time" - "bufio" "github.com/containers/libpod/cmd/podman/libpodruntime" "github.com/containers/libpod/libpod" + "github.com/containers/libpod/pkg/logs" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" ) -type logOptions struct { - details bool - follow bool - sinceTime time.Time - tail uint64 - showTimestamps bool -} - var ( logsFlags = []cli.Flag{ cli.BoolFlag{ @@ -90,12 +79,12 @@ func logsCmd(c *cli.Context) error { sinceTime = since } - opts := logOptions{ - details: c.Bool("details"), - follow: c.Bool("follow"), - sinceTime: sinceTime, - tail: c.Uint64("tail"), - showTimestamps: c.Bool("timestamps"), + opts := &logs.LogOptions{ + Details: c.Bool("details"), + Follow: c.Bool("follow"), + Since: sinceTime, + Tail: c.Uint64("tail"), + Timestamps: c.Bool("timestamps"), } if c.Bool("latest") { @@ -123,141 +112,7 @@ func logsCmd(c *cli.Context) error { return nil } } - - file, err := os.Open(logPath) - if err != nil { - return errors.Wrapf(err, "unable to read container log file") - } - defer file.Close() - reader := bufio.NewReader(file) - if opts.follow { - followLog(reader, opts, ctr) - } else { - dumpLog(reader, opts) - } - return err -} - -func followLog(reader *bufio.Reader, opts logOptions, ctr *libpod.Container) error { - var cacheOutput []string - firstPass := false - if opts.tail > 0 { - firstPass = true - } - // We need to read the entire file in here until we reach EOF - // and then dump it out in the case that the user also wants - // tail output - for { - line, err := reader.ReadString('\n') - if err == io.EOF && opts.follow { - if firstPass { - firstPass = false - cacheLen := int64(len(cacheOutput)) - start := int64(0) - if cacheLen > int64(opts.tail) { - start = cacheLen - int64(opts.tail) - } - for i := start; i < cacheLen; i++ { - printLine(cacheOutput[i], opts) - } - continue - } - time.Sleep(1 * time.Second) - // Check if container is still running or paused - state, err := ctr.State() - if err != nil { - return err - } - if state != libpod.ContainerStateRunning && state != libpod.ContainerStatePaused { - break - } - continue - } - // exits - if err != nil { - break - } - if firstPass { - cacheOutput = append(cacheOutput, line) - continue - } - printLine(line, opts) - } - return nil -} - -func dumpLog(reader *bufio.Reader, opts logOptions) error { - output := readLog(reader, opts) - for _, line := range output { - printLine(line, opts) - } - - return nil -} - -func readLog(reader *bufio.Reader, opts logOptions) []string { - var output []string - for { - line, err := reader.ReadString('\n') - if err != nil { - break - } - output = append(output, line) - } - start := 0 - if opts.tail > 0 { - if len(output) > int(opts.tail) { - start = len(output) - int(opts.tail) - } - } - return output[start:] -} - -func printLine(line string, opts logOptions) { - start := 3 - fields := strings.Fields(line) - if opts.showTimestamps || !isStringTimestamp(fields[0]) { - start = 0 - } - if opts.sinceTime.IsZero() || logSinceTime(opts.sinceTime, fields[0]) { - output := strings.Join(fields[start:], " ") - fmt.Printf("%s\n", output) - } -} - -func isStringTimestamp(t string) bool { - _, err := time.Parse("2006-01-02T15:04:05.999999999-07:00", t) - if err != nil { - return false - } - return true -} - -// returns true if the time stamps of the logs are equal to or after the -// timestamp comparing to -func logSinceTime(sinceTime time.Time, logStr string) bool { - timestamp := strings.Split(logStr, " ")[0] - logTime, err := time.Parse("2006-01-02T15:04:05.999999999-07:00", timestamp) - if err != nil { - return false - } - return logTime.After(sinceTime) || logTime.Equal(sinceTime) -} - -// secondSpaceIndex returns the index of the second space in a string -// In a line of the logs, the first two tokens are a timestamp and stdout/stderr, -// followed by the message itself. This allows us to get the index of the message -// and avoid sending the other information back to the caller of GetLogs() -func secondSpaceIndex(line string) int { - index := strings.Index(line, " ") - if index == -1 { - return 0 - } - index = strings.Index(line[index:], " ") - if index == -1 { - return 0 - } - return index + return logs.ReadLogs(logPath, ctr, opts) } // parseInputTime takes the users input and to determine if it is valid and diff --git a/pkg/logs/logs.go b/pkg/logs/logs.go new file mode 100644 index 000000000..b104c592b --- /dev/null +++ b/pkg/logs/logs.go @@ -0,0 +1,344 @@ +/* +This package picks up CRI parsing and writer for the logs from the kubernetes +logs package. These two bits have been modified to fit the requirements of libpod. + +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logs + +import ( + "bufio" + "bytes" + "fmt" + "io" + "math" + "os" + "time" + + "github.com/containers/libpod/libpod" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + // timeFormat is the time format used in the log. + timeFormat = time.RFC3339Nano +) + +// LogStreamType is the type of the stream in CRI container log. +type LogStreamType string + +const ( + // Stdout is the stream type for stdout. + Stdout LogStreamType = "stdout" + // Stderr is the stream type for stderr. + Stderr LogStreamType = "stderr" +) + +// LogTag is the tag of a log line in CRI container log. +// Currently defined log tags: +// * First tag: Partial/Full - P/F. +// The field in the container log format can be extended to include multiple +// tags by using a delimiter, but changes should be rare. +type LogTag string + +const ( + // LogTagPartial means the line is part of multiple lines. + LogTagPartial LogTag = "P" + // LogTagFull means the line is a single full line or the end of multiple lines. + LogTagFull LogTag = "F" + // LogTagDelimiter is the delimiter for different log tags. + LogTagDelimiter = ":" +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} + // delimiter is the delimiter for timestamp and stream type in log line. + delimiter = []byte{' '} + // tagDelimiter is the delimiter for log tags. + tagDelimiter = []byte(LogTagDelimiter) +) + +// logMessage is the CRI internal log type. +type logMessage struct { + timestamp time.Time + stream LogStreamType + log []byte +} + +// LogOptions is the options you can use for logs +type LogOptions struct { + Details bool + Follow bool + Since time.Time + Tail uint64 + Timestamps bool + bytes int64 +} + +// reset resets the log to nil. +func (l *logMessage) reset() { + l.timestamp = time.Time{} + l.stream = "" + l.log = nil +} + +// parseCRILog parses logs in CRI log format. CRI Log format example: +// 2016-10-06T00:17:09.669794202Z stdout P log content 1 +// 2016-10-06T00:17:09.669794203Z stderr F log content 2 +func parseCRILog(log []byte, msg *logMessage) error { + var err error + // Parse timestamp + idx := bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("timestamp is not found") + } + msg.timestamp, err = time.Parse(timeFormat, string(log[:idx])) + if err != nil { + return fmt.Errorf("unexpected timestamp format %q: %v", timeFormat, err) + } + + // Parse stream type + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("stream type is not found") + } + msg.stream = LogStreamType(log[:idx]) + if msg.stream != Stdout && msg.stream != Stderr { + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + + // Parse log tag + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("log tag is not found") + } + // Keep this forward compatible. + tags := bytes.Split(log[:idx], tagDelimiter) + partial := (LogTag(tags[0]) == LogTagPartial) + // Trim the tailing new line if this is a partial line. + if partial && len(log) > 0 && log[len(log)-1] == '\n' { + log = log[:len(log)-1] + } + + // Get log content + msg.log = log[idx+1:] + + return nil +} + +// ReadLogs reads in the logs from the logPath +func ReadLogs(logPath string, ctr *libpod.Container, opts *LogOptions) error { + file, err := os.Open(logPath) + if err != nil { + return errors.Wrapf(err, "failed to open log file %q", logPath) + } + defer file.Close() + + msg := &logMessage{} + opts.bytes = -1 + writer := newLogWriter(opts) + reader := bufio.NewReader(file) + + if opts.Follow { + followLog(reader, writer, opts, ctr, msg, logPath) + } else { + dumpLog(reader, writer, opts, msg, logPath) + } + return err +} + +func followLog(reader *bufio.Reader, writer *logWriter, opts *LogOptions, ctr *libpod.Container, msg *logMessage, logPath string) error { + var cacheOutput []string + firstPass := false + if opts.Tail > 0 { + firstPass = true + } + // We need to read the entire file in here until we reach EOF + // and then dump it out in the case that the user also wants + // tail output + for { + line, err := reader.ReadString(eol[0]) + if err == io.EOF && opts.Follow { + if firstPass { + firstPass = false + cacheLen := int64(len(cacheOutput)) + start := int64(0) + if cacheLen > int64(opts.Tail) { + start = cacheLen - int64(opts.Tail) + } + for i := start; i < cacheLen; i++ { + msg.reset() + if err := parseCRILog([]byte(cacheOutput[i]), msg); err != nil { + return errors.Wrapf(err, "error parsing log line") + } + // Write the log line into the stream. + if err := writer.write(msg); err != nil { + if err == errMaximumWrite { + logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes) + return nil + } + logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg) + return err + } + } + continue + } + time.Sleep(1 * time.Second) + // Check if container is still running or paused + state, err := ctr.State() + if err != nil { + return err + } + if state != libpod.ContainerStateRunning && state != libpod.ContainerStatePaused { + break + } + continue + } + // exits + if err != nil { + break + } + if firstPass { + cacheOutput = append(cacheOutput, line) + continue + } + msg.reset() + if err := parseCRILog([]byte(line), msg); err != nil { + return errors.Wrapf(err, "error parsing log line") + } + // Write the log line into the stream. + if err := writer.write(msg); err != nil { + if err == errMaximumWrite { + logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes) + return nil + } + logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg) + return err + } + } + return nil +} + +func dumpLog(reader *bufio.Reader, writer *logWriter, opts *LogOptions, msg *logMessage, logPath string) error { + output := readLog(reader, opts) + for _, line := range output { + msg.reset() + if err := parseCRILog([]byte(line), msg); err != nil { + return errors.Wrapf(err, "error parsing log line") + } + // Write the log line into the stream. + if err := writer.write(msg); err != nil { + if err == errMaximumWrite { + logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes) + return nil + } + logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg) + return err + } + } + + return nil +} + +func readLog(reader *bufio.Reader, opts *LogOptions) []string { + var output []string + for { + line, err := reader.ReadString(eol[0]) + if err != nil { + break + } + output = append(output, line) + } + start := 0 + if opts.Tail > 0 { + if len(output) > int(opts.Tail) { + start = len(output) - int(opts.Tail) + } + } + return output[start:] +} + +// logWriter controls the writing into the stream based on the log options. +type logWriter struct { + stdout io.Writer + stderr io.Writer + opts *LogOptions + remain int64 +} + +// errMaximumWrite is returned when all bytes have been written. +var errMaximumWrite = errors.New("maximum write") + +// errShortWrite is returned when the message is not fully written. +var errShortWrite = errors.New("short write") + +func newLogWriter(opts *LogOptions) *logWriter { + w := &logWriter{ + stdout: os.Stdout, + stderr: os.Stderr, + opts: opts, + remain: math.MaxInt64, // initialize it as infinity + } + if opts.bytes >= 0 { + w.remain = opts.bytes + } + return w +} + +// writeLogs writes logs into stdout, stderr. +func (w *logWriter) write(msg *logMessage) error { + if msg.timestamp.Before(w.opts.Since) { + // Skip the line because it's older than since + return nil + } + line := msg.log + if w.opts.Timestamps { + prefix := append([]byte(msg.timestamp.Format(timeFormat)), delimiter[0]) + line = append(prefix, line...) + } + // If the line is longer than the remaining bytes, cut it. + if int64(len(line)) > w.remain { + line = line[:w.remain] + } + // Get the proper stream to write to. + var stream io.Writer + switch msg.stream { + case Stdout: + stream = w.stdout + case Stderr: + stream = w.stderr + default: + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + n, err := stream.Write(line) + w.remain -= int64(n) + if err != nil { + return err + } + // If the line has not been fully written, return errShortWrite + if n < len(line) { + return errShortWrite + } + // If there are no more bytes left, return errMaximumWrite + if w.remain <= 0 { + return errMaximumWrite + } + return nil +} -- cgit v1.2.3-54-g00ecf