summaryrefslogtreecommitdiff
path: root/vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go')
-rw-r--r--vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go272
1 files changed, 272 insertions, 0 deletions
diff --git a/vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go b/vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go
new file mode 100644
index 000000000..51a060fb5
--- /dev/null
+++ b/vendor/github.com/coreos/go-systemd/v22/sdjournal/read.go
@@ -0,0 +1,272 @@
+// Copyright 2015 RedHat, Inc.
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdjournal
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ // ErrExpired gets returned when the Follow function runs into the
+ // specified timeout.
+ ErrExpired = errors.New("Timeout expired")
+)
+
+// JournalReaderConfig represents options to drive the behavior of a JournalReader.
+type JournalReaderConfig struct {
+ // The Since, NumFromTail and Cursor options are mutually exclusive and
+ // determine where the reading begins within the journal. The order in which
+ // options are written is exactly the order of precedence.
+ Since time.Duration // start relative to a Duration from now
+ NumFromTail uint64 // start relative to the tail
+ Cursor string // start relative to the cursor
+
+ // Show only journal entries whose fields match the supplied values. If
+ // the array is empty, entries will not be filtered.
+ Matches []Match
+
+ // If not empty, the journal instance will point to a journal residing
+ // in this directory. The supplied path may be relative or absolute.
+ Path string
+
+ // If not nil, Formatter will be used to translate the resulting entries
+ // into strings. If not set, the default format (timestamp and message field)
+ // will be used. If Formatter returns an error, Read will stop and return the error.
+ Formatter func(entry *JournalEntry) (string, error)
+}
+
+// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
+// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
+type JournalReader struct {
+ journal *Journal
+ msgReader *strings.Reader
+ formatter func(entry *JournalEntry) (string, error)
+}
+
+// NewJournalReader creates a new JournalReader with configuration options that are similar to the
+// systemd journalctl tool's iteration and filtering features.
+func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
+ // use simpleMessageFormatter as default formatter.
+ if config.Formatter == nil {
+ config.Formatter = simpleMessageFormatter
+ }
+
+ r := &JournalReader{
+ formatter: config.Formatter,
+ }
+
+ // Open the journal
+ var err error
+ if config.Path != "" {
+ r.journal, err = NewJournalFromDir(config.Path)
+ } else {
+ r.journal, err = NewJournal()
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ // Add any supplied matches
+ for _, m := range config.Matches {
+ if err = r.journal.AddMatch(m.String()); err != nil {
+ return nil, err
+ }
+ }
+
+ // Set the start position based on options
+ if config.Since != 0 {
+ // Start based on a relative time
+ start := time.Now().Add(config.Since)
+ if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
+ return nil, err
+ }
+ } else if config.NumFromTail != 0 {
+ // Start based on a number of lines before the tail
+ if err := r.journal.SeekTail(); err != nil {
+ return nil, err
+ }
+
+ // Move the read pointer into position near the tail. Go one further than
+ // the option so that the initial cursor advancement positions us at the
+ // correct starting point.
+ skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
+ if err != nil {
+ return nil, err
+ }
+ // If we skipped fewer lines than expected, we have reached journal start.
+ // Thus, we seek to head so that next invocation can read the first line.
+ if skip != config.NumFromTail+1 {
+ if err := r.journal.SeekHead(); err != nil {
+ return nil, err
+ }
+ }
+ } else if config.Cursor != "" {
+ // Start based on a custom cursor
+ if err := r.journal.SeekCursor(config.Cursor); err != nil {
+ return nil, err
+ }
+ }
+
+ return r, nil
+}
+
+// Read reads entries from the journal. Read follows the Reader interface so
+// it must be able to read a specific amount of bytes. Journald on the other
+// hand only allows us to read full entries of arbitrary size (without byte
+// granularity). JournalReader is therefore internally buffering entries that
+// don't fit in the read buffer. Callers should keep calling until 0 and/or an
+// error is returned.
+func (r *JournalReader) Read(b []byte) (int, error) {
+ if r.msgReader == nil {
+ // Advance the journal cursor. It has to be called at least one time
+ // before reading
+ c, err := r.journal.Next()
+
+ // An unexpected error
+ if err != nil {
+ return 0, err
+ }
+
+ // EOF detection
+ if c == 0 {
+ return 0, io.EOF
+ }
+
+ entry, err := r.journal.GetEntry()
+ if err != nil {
+ return 0, err
+ }
+
+ // Build a message
+ msg, err := r.formatter(entry)
+ if err != nil {
+ return 0, err
+ }
+ r.msgReader = strings.NewReader(msg)
+ }
+
+ // Copy and return the message
+ sz, err := r.msgReader.Read(b)
+ if err == io.EOF {
+ // The current entry has been fully read. Don't propagate this
+ // EOF, so the next entry can be read at the next Read()
+ // iteration.
+ r.msgReader = nil
+ return sz, nil
+ }
+ if err != nil {
+ return sz, err
+ }
+ if r.msgReader.Len() == 0 {
+ r.msgReader = nil
+ }
+
+ return sz, nil
+}
+
+// Close closes the JournalReader's handle to the journal.
+func (r *JournalReader) Close() error {
+ return r.journal.Close()
+}
+
+// Rewind attempts to rewind the JournalReader to the first entry.
+func (r *JournalReader) Rewind() error {
+ r.msgReader = nil
+ return r.journal.SeekHead()
+}
+
+// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
+// follow will continue until a single time.Time is received on the until channel.
+func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error {
+
+ // Process journal entries and events. Entries are flushed until the tail or
+ // timeout is reached, and then we wait for new events or the timeout.
+ var msg = make([]byte, 64*1<<(10))
+ var waitCh = make(chan int, 1)
+ var waitGroup sync.WaitGroup
+ defer waitGroup.Wait()
+
+process:
+ for {
+ c, err := r.Read(msg)
+ if err != nil && err != io.EOF {
+ return err
+ }
+
+ select {
+ case <-until:
+ return ErrExpired
+ default:
+ }
+ if c > 0 {
+ if _, err = writer.Write(msg[:c]); err != nil {
+ return err
+ }
+ continue process
+ }
+
+ // We're at the tail, so wait for new events or time out.
+ // Holds journal events to process. Tightly bounded for now unless there's a
+ // reason to unblock the journal watch routine more quickly.
+ for {
+ waitGroup.Add(1)
+ go func() {
+ status := r.journal.Wait(100 * time.Millisecond)
+ waitCh <- status
+ waitGroup.Done()
+ }()
+
+ select {
+ case <-until:
+ return ErrExpired
+ case e := <-waitCh:
+ switch e {
+ case SD_JOURNAL_NOP:
+ // the journal did not change since the last invocation
+ case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
+ continue process
+ default:
+ if e < 0 {
+ return fmt.Errorf("received error event: %d", e)
+ }
+
+ log.Printf("received unknown event: %d\n", e)
+ }
+ }
+ }
+ }
+}
+
+// simpleMessageFormatter is the default formatter.
+// It returns a string representing the current journal entry in a simple format which
+// includes the entry timestamp and MESSAGE field.
+func simpleMessageFormatter(entry *JournalEntry) (string, error) {
+ msg, ok := entry.Fields["MESSAGE"]
+ if !ok {
+ return "", fmt.Errorf("no MESSAGE field present in journal entry")
+ }
+
+ usec := entry.RealtimeTimestamp
+ timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
+
+ return fmt.Sprintf("%s %s\n", timestamp, msg), nil
+}