aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorumohnani8 <umohnani@redhat.com>2018-08-30 10:10:05 -0400
committerAtomic Bot <atomic-devel@projectatomic.io>2018-09-05 13:48:20 +0000
commit7ffb8a79007414e510ac14ac0d682e1bbfe30c7c (patch)
tree8c031232a361c5d3ed662bc6bc9082f23cb2911a
parent4ddcbd7941e8cb32cbdcac2f1aa3a939e82be764 (diff)
downloadpodman-7ffb8a79007414e510ac14ac0d682e1bbfe30c7c.tar.gz
podman-7ffb8a79007414e510ac14ac0d682e1bbfe30c7c.tar.bz2
podman-7ffb8a79007414e510ac14ac0d682e1bbfe30c7c.zip
Add CRI logs parsing to podman logs
Podman logs was not parsing CRI logs well, especially the F and P logs. Now using the same parsing code as in kube here. Signed-off-by: umohnani8 <umohnani@redhat.com> Closes: #1403 Approved by: rhatdan
-rw-r--r--cmd/podman/logs.go161
-rw-r--r--pkg/logs/logs.go344
2 files changed, 352 insertions, 153 deletions
diff --git a/cmd/podman/logs.go b/cmd/podman/logs.go
index 555c65fa5..34d062c56 100644
--- a/cmd/podman/logs.go
+++ b/cmd/podman/logs.go
@@ -1,28 +1,17 @@
package main
import (
- "fmt"
- "io"
"os"
- "strings"
"time"
- "bufio"
"github.com/containers/libpod/cmd/podman/libpodruntime"
"github.com/containers/libpod/libpod"
+ "github.com/containers/libpod/pkg/logs"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)
-type logOptions struct {
- details bool
- follow bool
- sinceTime time.Time
- tail uint64
- showTimestamps bool
-}
-
var (
logsFlags = []cli.Flag{
cli.BoolFlag{
@@ -90,12 +79,12 @@ func logsCmd(c *cli.Context) error {
sinceTime = since
}
- opts := logOptions{
- details: c.Bool("details"),
- follow: c.Bool("follow"),
- sinceTime: sinceTime,
- tail: c.Uint64("tail"),
- showTimestamps: c.Bool("timestamps"),
+ opts := &logs.LogOptions{
+ Details: c.Bool("details"),
+ Follow: c.Bool("follow"),
+ Since: sinceTime,
+ Tail: c.Uint64("tail"),
+ Timestamps: c.Bool("timestamps"),
}
if c.Bool("latest") {
@@ -123,141 +112,7 @@ func logsCmd(c *cli.Context) error {
return nil
}
}
-
- file, err := os.Open(logPath)
- if err != nil {
- return errors.Wrapf(err, "unable to read container log file")
- }
- defer file.Close()
- reader := bufio.NewReader(file)
- if opts.follow {
- followLog(reader, opts, ctr)
- } else {
- dumpLog(reader, opts)
- }
- return err
-}
-
-func followLog(reader *bufio.Reader, opts logOptions, ctr *libpod.Container) error {
- var cacheOutput []string
- firstPass := false
- if opts.tail > 0 {
- firstPass = true
- }
- // We need to read the entire file in here until we reach EOF
- // and then dump it out in the case that the user also wants
- // tail output
- for {
- line, err := reader.ReadString('\n')
- if err == io.EOF && opts.follow {
- if firstPass {
- firstPass = false
- cacheLen := int64(len(cacheOutput))
- start := int64(0)
- if cacheLen > int64(opts.tail) {
- start = cacheLen - int64(opts.tail)
- }
- for i := start; i < cacheLen; i++ {
- printLine(cacheOutput[i], opts)
- }
- continue
- }
- time.Sleep(1 * time.Second)
- // Check if container is still running or paused
- state, err := ctr.State()
- if err != nil {
- return err
- }
- if state != libpod.ContainerStateRunning && state != libpod.ContainerStatePaused {
- break
- }
- continue
- }
- // exits
- if err != nil {
- break
- }
- if firstPass {
- cacheOutput = append(cacheOutput, line)
- continue
- }
- printLine(line, opts)
- }
- return nil
-}
-
-func dumpLog(reader *bufio.Reader, opts logOptions) error {
- output := readLog(reader, opts)
- for _, line := range output {
- printLine(line, opts)
- }
-
- return nil
-}
-
-func readLog(reader *bufio.Reader, opts logOptions) []string {
- var output []string
- for {
- line, err := reader.ReadString('\n')
- if err != nil {
- break
- }
- output = append(output, line)
- }
- start := 0
- if opts.tail > 0 {
- if len(output) > int(opts.tail) {
- start = len(output) - int(opts.tail)
- }
- }
- return output[start:]
-}
-
-func printLine(line string, opts logOptions) {
- start := 3
- fields := strings.Fields(line)
- if opts.showTimestamps || !isStringTimestamp(fields[0]) {
- start = 0
- }
- if opts.sinceTime.IsZero() || logSinceTime(opts.sinceTime, fields[0]) {
- output := strings.Join(fields[start:], " ")
- fmt.Printf("%s\n", output)
- }
-}
-
-func isStringTimestamp(t string) bool {
- _, err := time.Parse("2006-01-02T15:04:05.999999999-07:00", t)
- if err != nil {
- return false
- }
- return true
-}
-
-// returns true if the time stamps of the logs are equal to or after the
-// timestamp comparing to
-func logSinceTime(sinceTime time.Time, logStr string) bool {
- timestamp := strings.Split(logStr, " ")[0]
- logTime, err := time.Parse("2006-01-02T15:04:05.999999999-07:00", timestamp)
- if err != nil {
- return false
- }
- return logTime.After(sinceTime) || logTime.Equal(sinceTime)
-}
-
-// secondSpaceIndex returns the index of the second space in a string
-// In a line of the logs, the first two tokens are a timestamp and stdout/stderr,
-// followed by the message itself. This allows us to get the index of the message
-// and avoid sending the other information back to the caller of GetLogs()
-func secondSpaceIndex(line string) int {
- index := strings.Index(line, " ")
- if index == -1 {
- return 0
- }
- index = strings.Index(line[index:], " ")
- if index == -1 {
- return 0
- }
- return index
+ return logs.ReadLogs(logPath, ctr, opts)
}
// parseInputTime takes the users input and to determine if it is valid and
diff --git a/pkg/logs/logs.go b/pkg/logs/logs.go
new file mode 100644
index 000000000..b104c592b
--- /dev/null
+++ b/pkg/logs/logs.go
@@ -0,0 +1,344 @@
+/*
+This package picks up CRI parsing and writer for the logs from the kubernetes
+logs package. These two bits have been modified to fit the requirements of libpod.
+
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package logs
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "time"
+
+ "github.com/containers/libpod/libpod"
+ "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+)
+
+const (
+ // timeFormat is the time format used in the log.
+ timeFormat = time.RFC3339Nano
+)
+
+// LogStreamType is the type of the stream in CRI container log.
+type LogStreamType string
+
+const (
+ // Stdout is the stream type for stdout.
+ Stdout LogStreamType = "stdout"
+ // Stderr is the stream type for stderr.
+ Stderr LogStreamType = "stderr"
+)
+
+// LogTag is the tag of a log line in CRI container log.
+// Currently defined log tags:
+// * First tag: Partial/Full - P/F.
+// The field in the container log format can be extended to include multiple
+// tags by using a delimiter, but changes should be rare.
+type LogTag string
+
+const (
+ // LogTagPartial means the line is part of multiple lines.
+ LogTagPartial LogTag = "P"
+ // LogTagFull means the line is a single full line or the end of multiple lines.
+ LogTagFull LogTag = "F"
+ // LogTagDelimiter is the delimiter for different log tags.
+ LogTagDelimiter = ":"
+)
+
+var (
+ // eol is the end-of-line sign in the log.
+ eol = []byte{'\n'}
+ // delimiter is the delimiter for timestamp and stream type in log line.
+ delimiter = []byte{' '}
+ // tagDelimiter is the delimiter for log tags.
+ tagDelimiter = []byte(LogTagDelimiter)
+)
+
+// logMessage is the CRI internal log type.
+type logMessage struct {
+ timestamp time.Time
+ stream LogStreamType
+ log []byte
+}
+
+// LogOptions is the options you can use for logs
+type LogOptions struct {
+ Details bool
+ Follow bool
+ Since time.Time
+ Tail uint64
+ Timestamps bool
+ bytes int64
+}
+
+// reset resets the log to nil.
+func (l *logMessage) reset() {
+ l.timestamp = time.Time{}
+ l.stream = ""
+ l.log = nil
+}
+
+// parseCRILog parses logs in CRI log format. CRI Log format example:
+// 2016-10-06T00:17:09.669794202Z stdout P log content 1
+// 2016-10-06T00:17:09.669794203Z stderr F log content 2
+func parseCRILog(log []byte, msg *logMessage) error {
+ var err error
+ // Parse timestamp
+ idx := bytes.Index(log, delimiter)
+ if idx < 0 {
+ return fmt.Errorf("timestamp is not found")
+ }
+ msg.timestamp, err = time.Parse(timeFormat, string(log[:idx]))
+ if err != nil {
+ return fmt.Errorf("unexpected timestamp format %q: %v", timeFormat, err)
+ }
+
+ // Parse stream type
+ log = log[idx+1:]
+ idx = bytes.Index(log, delimiter)
+ if idx < 0 {
+ return fmt.Errorf("stream type is not found")
+ }
+ msg.stream = LogStreamType(log[:idx])
+ if msg.stream != Stdout && msg.stream != Stderr {
+ return fmt.Errorf("unexpected stream type %q", msg.stream)
+ }
+
+ // Parse log tag
+ log = log[idx+1:]
+ idx = bytes.Index(log, delimiter)
+ if idx < 0 {
+ return fmt.Errorf("log tag is not found")
+ }
+ // Keep this forward compatible.
+ tags := bytes.Split(log[:idx], tagDelimiter)
+ partial := (LogTag(tags[0]) == LogTagPartial)
+ // Trim the tailing new line if this is a partial line.
+ if partial && len(log) > 0 && log[len(log)-1] == '\n' {
+ log = log[:len(log)-1]
+ }
+
+ // Get log content
+ msg.log = log[idx+1:]
+
+ return nil
+}
+
+// ReadLogs reads in the logs from the logPath
+func ReadLogs(logPath string, ctr *libpod.Container, opts *LogOptions) error {
+ file, err := os.Open(logPath)
+ if err != nil {
+ return errors.Wrapf(err, "failed to open log file %q", logPath)
+ }
+ defer file.Close()
+
+ msg := &logMessage{}
+ opts.bytes = -1
+ writer := newLogWriter(opts)
+ reader := bufio.NewReader(file)
+
+ if opts.Follow {
+ followLog(reader, writer, opts, ctr, msg, logPath)
+ } else {
+ dumpLog(reader, writer, opts, msg, logPath)
+ }
+ return err
+}
+
+func followLog(reader *bufio.Reader, writer *logWriter, opts *LogOptions, ctr *libpod.Container, msg *logMessage, logPath string) error {
+ var cacheOutput []string
+ firstPass := false
+ if opts.Tail > 0 {
+ firstPass = true
+ }
+ // We need to read the entire file in here until we reach EOF
+ // and then dump it out in the case that the user also wants
+ // tail output
+ for {
+ line, err := reader.ReadString(eol[0])
+ if err == io.EOF && opts.Follow {
+ if firstPass {
+ firstPass = false
+ cacheLen := int64(len(cacheOutput))
+ start := int64(0)
+ if cacheLen > int64(opts.Tail) {
+ start = cacheLen - int64(opts.Tail)
+ }
+ for i := start; i < cacheLen; i++ {
+ msg.reset()
+ if err := parseCRILog([]byte(cacheOutput[i]), msg); err != nil {
+ return errors.Wrapf(err, "error parsing log line")
+ }
+ // Write the log line into the stream.
+ if err := writer.write(msg); err != nil {
+ if err == errMaximumWrite {
+ logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes)
+ return nil
+ }
+ logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg)
+ return err
+ }
+ }
+ continue
+ }
+ time.Sleep(1 * time.Second)
+ // Check if container is still running or paused
+ state, err := ctr.State()
+ if err != nil {
+ return err
+ }
+ if state != libpod.ContainerStateRunning && state != libpod.ContainerStatePaused {
+ break
+ }
+ continue
+ }
+ // exits
+ if err != nil {
+ break
+ }
+ if firstPass {
+ cacheOutput = append(cacheOutput, line)
+ continue
+ }
+ msg.reset()
+ if err := parseCRILog([]byte(line), msg); err != nil {
+ return errors.Wrapf(err, "error parsing log line")
+ }
+ // Write the log line into the stream.
+ if err := writer.write(msg); err != nil {
+ if err == errMaximumWrite {
+ logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes)
+ return nil
+ }
+ logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg)
+ return err
+ }
+ }
+ return nil
+}
+
+func dumpLog(reader *bufio.Reader, writer *logWriter, opts *LogOptions, msg *logMessage, logPath string) error {
+ output := readLog(reader, opts)
+ for _, line := range output {
+ msg.reset()
+ if err := parseCRILog([]byte(line), msg); err != nil {
+ return errors.Wrapf(err, "error parsing log line")
+ }
+ // Write the log line into the stream.
+ if err := writer.write(msg); err != nil {
+ if err == errMaximumWrite {
+ logrus.Infof("Finish parsing log file %q, hit bytes limit %d(bytes)", logPath, opts.bytes)
+ return nil
+ }
+ logrus.Errorf("Failed with err %v when writing log for log file %q: %+v", err, logPath, msg)
+ return err
+ }
+ }
+
+ return nil
+}
+
+func readLog(reader *bufio.Reader, opts *LogOptions) []string {
+ var output []string
+ for {
+ line, err := reader.ReadString(eol[0])
+ if err != nil {
+ break
+ }
+ output = append(output, line)
+ }
+ start := 0
+ if opts.Tail > 0 {
+ if len(output) > int(opts.Tail) {
+ start = len(output) - int(opts.Tail)
+ }
+ }
+ return output[start:]
+}
+
+// logWriter controls the writing into the stream based on the log options.
+type logWriter struct {
+ stdout io.Writer
+ stderr io.Writer
+ opts *LogOptions
+ remain int64
+}
+
+// errMaximumWrite is returned when all bytes have been written.
+var errMaximumWrite = errors.New("maximum write")
+
+// errShortWrite is returned when the message is not fully written.
+var errShortWrite = errors.New("short write")
+
+func newLogWriter(opts *LogOptions) *logWriter {
+ w := &logWriter{
+ stdout: os.Stdout,
+ stderr: os.Stderr,
+ opts: opts,
+ remain: math.MaxInt64, // initialize it as infinity
+ }
+ if opts.bytes >= 0 {
+ w.remain = opts.bytes
+ }
+ return w
+}
+
+// writeLogs writes logs into stdout, stderr.
+func (w *logWriter) write(msg *logMessage) error {
+ if msg.timestamp.Before(w.opts.Since) {
+ // Skip the line because it's older than since
+ return nil
+ }
+ line := msg.log
+ if w.opts.Timestamps {
+ prefix := append([]byte(msg.timestamp.Format(timeFormat)), delimiter[0])
+ line = append(prefix, line...)
+ }
+ // If the line is longer than the remaining bytes, cut it.
+ if int64(len(line)) > w.remain {
+ line = line[:w.remain]
+ }
+ // Get the proper stream to write to.
+ var stream io.Writer
+ switch msg.stream {
+ case Stdout:
+ stream = w.stdout
+ case Stderr:
+ stream = w.stderr
+ default:
+ return fmt.Errorf("unexpected stream type %q", msg.stream)
+ }
+ n, err := stream.Write(line)
+ w.remain -= int64(n)
+ if err != nil {
+ return err
+ }
+ // If the line has not been fully written, return errShortWrite
+ if n < len(line) {
+ return errShortWrite
+ }
+ // If there are no more bytes left, return errMaximumWrite
+ if w.remain <= 0 {
+ return errMaximumWrite
+ }
+ return nil
+}