summaryrefslogtreecommitdiff
path: root/vendor/github.com/coreos/go-systemd/sdjournal/read.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/coreos/go-systemd/sdjournal/read.go')
-rw-r--r--vendor/github.com/coreos/go-systemd/sdjournal/read.go260
1 files changed, 260 insertions, 0 deletions
diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/read.go b/vendor/github.com/coreos/go-systemd/sdjournal/read.go
new file mode 100644
index 000000000..b581f03b4
--- /dev/null
+++ b/vendor/github.com/coreos/go-systemd/sdjournal/read.go
@@ -0,0 +1,260 @@
+// Copyright 2015 RedHat, Inc.
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdjournal
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "strings"
+ "time"
+)
+
+var (
+ ErrExpired = errors.New("Timeout expired")
+)
+
+// JournalReaderConfig represents options to drive the behavior of a JournalReader.
+type JournalReaderConfig struct {
+ // The Since, NumFromTail and Cursor options are mutually exclusive and
+ // determine where the reading begins within the journal. The order in which
+ // options are written is exactly the order of precedence.
+ Since time.Duration // start relative to a Duration from now
+ NumFromTail uint64 // start relative to the tail
+ Cursor string // start relative to the cursor
+
+ // Show only journal entries whose fields match the supplied values. If
+ // the array is empty, entries will not be filtered.
+ Matches []Match
+
+ // If not empty, the journal instance will point to a journal residing
+ // in this directory. The supplied path may be relative or absolute.
+ Path string
+}
+
+// JournalReader is an io.ReadCloser which provides a simple interface for iterating through the
+// systemd journal. A JournalReader is not safe for concurrent use by multiple goroutines.
+type JournalReader struct {
+ journal *Journal
+ msgReader *strings.Reader
+}
+
+// NewJournalReader creates a new JournalReader with configuration options that are similar to the
+// systemd journalctl tool's iteration and filtering features.
+func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) {
+ r := &JournalReader{}
+
+ // Open the journal
+ var err error
+ if config.Path != "" {
+ r.journal, err = NewJournalFromDir(config.Path)
+ } else {
+ r.journal, err = NewJournal()
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ // Add any supplied matches
+ for _, m := range config.Matches {
+ r.journal.AddMatch(m.String())
+ }
+
+ // Set the start position based on options
+ if config.Since != 0 {
+ // Start based on a relative time
+ start := time.Now().Add(config.Since)
+ if err := r.journal.SeekRealtimeUsec(uint64(start.UnixNano() / 1000)); err != nil {
+ return nil, err
+ }
+ } else if config.NumFromTail != 0 {
+ // Start based on a number of lines before the tail
+ if err := r.journal.SeekTail(); err != nil {
+ return nil, err
+ }
+
+ // Move the read pointer into position near the tail. Go one further than
+ // the option so that the initial cursor advancement positions us at the
+ // correct starting point.
+ skip, err := r.journal.PreviousSkip(config.NumFromTail + 1)
+ if err != nil {
+ return nil, err
+ }
+ // If we skipped fewer lines than expected, we have reached journal start.
+ // Thus, we seek to head so that next invocation can read the first line.
+ if skip != config.NumFromTail+1 {
+ if err := r.journal.SeekHead(); err != nil {
+ return nil, err
+ }
+ }
+ } else if config.Cursor != "" {
+ // Start based on a custom cursor
+ if err := r.journal.SeekCursor(config.Cursor); err != nil {
+ return nil, err
+ }
+ }
+
+ return r, nil
+}
+
+// Read reads entries from the journal. Read follows the Reader interface so
+// it must be able to read a specific amount of bytes. Journald on the other
+// hand only allows us to read full entries of arbitrary size (without byte
+// granularity). JournalReader is therefore internally buffering entries that
+// don't fit in the read buffer. Callers should keep calling until 0 and/or an
+// error is returned.
+func (r *JournalReader) Read(b []byte) (int, error) {
+ var err error
+
+ if r.msgReader == nil {
+ var c uint64
+
+ // Advance the journal cursor. It has to be called at least one time
+ // before reading
+ c, err = r.journal.Next()
+
+ // An unexpected error
+ if err != nil {
+ return 0, err
+ }
+
+ // EOF detection
+ if c == 0 {
+ return 0, io.EOF
+ }
+
+ // Build a message
+ var msg string
+ msg, err = r.buildMessage()
+
+ if err != nil {
+ return 0, err
+ }
+ r.msgReader = strings.NewReader(msg)
+ }
+
+ // Copy and return the message
+ var sz int
+ sz, err = r.msgReader.Read(b)
+ if err == io.EOF {
+ // The current entry has been fully read. Don't propagate this
+ // EOF, so the next entry can be read at the next Read()
+ // iteration.
+ r.msgReader = nil
+ return sz, nil
+ }
+ if err != nil {
+ return sz, err
+ }
+ if r.msgReader.Len() == 0 {
+ r.msgReader = nil
+ }
+
+ return sz, nil
+}
+
+// Close closes the JournalReader's handle to the journal.
+func (r *JournalReader) Close() error {
+ return r.journal.Close()
+}
+
+// Rewind attempts to rewind the JournalReader to the first entry.
+func (r *JournalReader) Rewind() error {
+ r.msgReader = nil
+ return r.journal.SeekHead()
+}
+
+// Follow synchronously follows the JournalReader, writing each new journal entry to writer. The
+// follow will continue until a single time.Time is received on the until channel.
+func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) {
+
+ // Process journal entries and events. Entries are flushed until the tail or
+ // timeout is reached, and then we wait for new events or the timeout.
+ var msg = make([]byte, 64*1<<(10))
+process:
+ for {
+ c, err := r.Read(msg)
+ if err != nil && err != io.EOF {
+ break process
+ }
+
+ select {
+ case <-until:
+ return ErrExpired
+ default:
+ if c > 0 {
+ if _, err = writer.Write(msg[:c]); err != nil {
+ break process
+ }
+ continue process
+ }
+ }
+
+ // We're at the tail, so wait for new events or time out.
+ // Holds journal events to process. Tightly bounded for now unless there's a
+ // reason to unblock the journal watch routine more quickly.
+ events := make(chan int, 1)
+ pollDone := make(chan bool, 1)
+ go func() {
+ for {
+ select {
+ case <-pollDone:
+ return
+ default:
+ events <- r.journal.Wait(time.Duration(1) * time.Second)
+ }
+ }
+ }()
+
+ select {
+ case <-until:
+ pollDone <- true
+ return ErrExpired
+ case e := <-events:
+ pollDone <- true
+ switch e {
+ case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE:
+ // TODO: need to account for any of these?
+ default:
+ log.Printf("Received unknown event: %d\n", e)
+ }
+ continue process
+ }
+ }
+
+ return
+}
+
+// buildMessage returns a string representing the current journal entry in a simple format which
+// includes the entry timestamp and MESSAGE field.
+func (r *JournalReader) buildMessage() (string, error) {
+ var msg string
+ var usec uint64
+ var err error
+
+ if msg, err = r.journal.GetData("MESSAGE"); err != nil {
+ return "", err
+ }
+
+ if usec, err = r.journal.GetRealtimeUsec(); err != nil {
+ return "", err
+ }
+
+ timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond))
+
+ return fmt.Sprintf("%s %s\n", timestamp, msg), nil
+}