diff options
-rw-r--r-- | vendor.conf | 2 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/NOTICE | 5 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/README.md | 21 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/activation/files.go | 19 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/activation/listeners.go | 51 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/activation/packetconns.go | 5 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/dbus/dbus.go | 31 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/dbus/methods.go | 43 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/dbus/set.go | 2 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/dbus/subscription.go | 157 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/journal/journal.go | 7 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/sdjournal/journal.go | 134 | ||||
-rw-r--r-- | vendor/github.com/coreos/go-systemd/sdjournal/read.go | 124 |
13 files changed, 464 insertions, 137 deletions
diff --git a/vendor.conf b/vendor.conf index 5c41d6908..65bfcd77a 100644 --- a/vendor.conf +++ b/vendor.conf @@ -21,7 +21,7 @@ github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 github.com/containers/storage v1.12.7 github.com/containers/psgo v1.3.0 -github.com/coreos/go-systemd v14 +github.com/coreos/go-systemd v17 github.com/coreos/pkg v4 github.com/cri-o/ocicni 0c180f981b27ef6036fa5be29bcb4dd666e406eb github.com/cyphar/filepath-securejoin v0.2.1 diff --git a/vendor/github.com/coreos/go-systemd/NOTICE b/vendor/github.com/coreos/go-systemd/NOTICE new file mode 100644 index 000000000..23a0ada2f --- /dev/null +++ b/vendor/github.com/coreos/go-systemd/NOTICE @@ -0,0 +1,5 @@ +CoreOS Project +Copyright 2018 CoreOS, Inc + +This product includes software developed at CoreOS, Inc. +(http://www.coreos.com/). diff --git a/vendor/github.com/coreos/go-systemd/README.md b/vendor/github.com/coreos/go-systemd/README.md index cb87a1124..cad04a803 100644 --- a/vendor/github.com/coreos/go-systemd/README.md +++ b/vendor/github.com/coreos/go-systemd/README.md @@ -6,9 +6,11 @@ Go bindings to systemd. The project has several packages: - `activation` - for writing and using socket activation from Go +- `daemon` - for notifying systemd of service status changes - `dbus` - for starting/stopping/inspecting running services and units - `journal` - for writing to systemd's logging service, journald - `sdjournal` - for reading from journald by wrapping its C API +- `login1` - for integration with the systemd logind API - `machine1` - for registering machines/containers with systemd - `unit` - for (de)serialization and comparison of unit files @@ -18,10 +20,9 @@ An example HTTP server using socket activation can be quickly set up by followin https://github.com/coreos/go-systemd/tree/master/examples/activation/httpserver -## Journal +## systemd Service Notification -Using the pure-Go `journal` package you can submit journal entries directly to systemd's journal, taking advantage of features like indexed key/value pairs for each log entry. -The `sdjournal` package provides read access to the journal by wrapping around journald's native C API; consequently it requires cgo and the journal headers to be available. +The `daemon` package is an implementation of the [sd_notify protocol](https://www.freedesktop.org/software/systemd/man/sd_notify.html#Description). It can be used to inform systemd of service start-up completion, watchdog events, and other status changes. ## D-Bus @@ -45,6 +46,20 @@ Create `/etc/dbus-1/system-local.conf` that looks like this: </busconfig> ``` +## Journal + +### Writing to the Journal + +Using the pure-Go `journal` package you can submit journal entries directly to systemd's journal, taking advantage of features like indexed key/value pairs for each log entry. + +### Reading from the Journal + +The `sdjournal` package provides read access to the journal by wrapping around journald's native C API; consequently it requires cgo and the journal headers to be available. + +## logind + +The `login1` package provides functions to integrate with the [systemd logind API](http://www.freedesktop.org/wiki/Software/systemd/logind/). + ## machined The `machine1` package allows interaction with the [systemd machined D-Bus API](http://www.freedesktop.org/wiki/Software/systemd/machined/). diff --git a/vendor/github.com/coreos/go-systemd/activation/files.go b/vendor/github.com/coreos/go-systemd/activation/files.go index c8e85fcd5..29dd18def 100644 --- a/vendor/github.com/coreos/go-systemd/activation/files.go +++ b/vendor/github.com/coreos/go-systemd/activation/files.go @@ -18,18 +18,26 @@ package activation import ( "os" "strconv" + "strings" "syscall" ) -// based on: https://gist.github.com/alberts/4640792 const ( + // listenFdsStart corresponds to `SD_LISTEN_FDS_START`. listenFdsStart = 3 ) +// Files returns a slice containing a `os.File` object for each +// file descriptor passed to this process via systemd fd-passing protocol. +// +// The order of the file descriptors is preserved in the returned slice. +// `unsetEnv` is typically set to `true` in order to avoid clashes in +// fd usage and to avoid leaking environment flags to child processes. func Files(unsetEnv bool) []*os.File { if unsetEnv { defer os.Unsetenv("LISTEN_PID") defer os.Unsetenv("LISTEN_FDS") + defer os.Unsetenv("LISTEN_FDNAMES") } pid, err := strconv.Atoi(os.Getenv("LISTEN_PID")) @@ -42,10 +50,17 @@ func Files(unsetEnv bool) []*os.File { return nil } + names := strings.Split(os.Getenv("LISTEN_FDNAMES"), ":") + files := make([]*os.File, 0, nfds) for fd := listenFdsStart; fd < listenFdsStart+nfds; fd++ { syscall.CloseOnExec(fd) - files = append(files, os.NewFile(uintptr(fd), "LISTEN_FD_"+strconv.Itoa(fd))) + name := "LISTEN_FD_" + strconv.Itoa(fd) + offset := fd - listenFdsStart + if offset < len(names) && len(names[offset]) > 0 { + name = names[offset] + } + files = append(files, os.NewFile(uintptr(fd), name)) } return files diff --git a/vendor/github.com/coreos/go-systemd/activation/listeners.go b/vendor/github.com/coreos/go-systemd/activation/listeners.go index fd5dfc709..bb5cc2311 100644 --- a/vendor/github.com/coreos/go-systemd/activation/listeners.go +++ b/vendor/github.com/coreos/go-systemd/activation/listeners.go @@ -25,13 +25,33 @@ import ( // The order of the file descriptors is preserved in the returned slice. // Nil values are used to fill any gaps. For example if systemd were to return file descriptors // corresponding with "udp, tcp, tcp", then the slice would contain {nil, net.Listener, net.Listener} -func Listeners(unsetEnv bool) ([]net.Listener, error) { - files := Files(unsetEnv) +func Listeners() ([]net.Listener, error) { + files := Files(true) listeners := make([]net.Listener, len(files)) for i, f := range files { if pc, err := net.FileListener(f); err == nil { listeners[i] = pc + f.Close() + } + } + return listeners, nil +} + +// ListenersWithNames maps a listener name to a set of net.Listener instances. +func ListenersWithNames() (map[string][]net.Listener, error) { + files := Files(true) + listeners := map[string][]net.Listener{} + + for _, f := range files { + if pc, err := net.FileListener(f); err == nil { + current, ok := listeners[f.Name()] + if !ok { + listeners[f.Name()] = []net.Listener{pc} + } else { + listeners[f.Name()] = append(current, pc) + } + f.Close() } } return listeners, nil @@ -40,8 +60,8 @@ func Listeners(unsetEnv bool) ([]net.Listener, error) { // TLSListeners returns a slice containing a net.listener for each matching TCP socket type // passed to this process. // It uses default Listeners func and forces TCP sockets handlers to use TLS based on tlsConfig. -func TLSListeners(unsetEnv bool, tlsConfig *tls.Config) ([]net.Listener, error) { - listeners, err := Listeners(unsetEnv) +func TLSListeners(tlsConfig *tls.Config) ([]net.Listener, error) { + listeners, err := Listeners() if listeners == nil || err != nil { return nil, err @@ -58,3 +78,26 @@ func TLSListeners(unsetEnv bool, tlsConfig *tls.Config) ([]net.Listener, error) return listeners, err } + +// TLSListenersWithNames maps a listener name to a net.Listener with +// the associated TLS configuration. +func TLSListenersWithNames(tlsConfig *tls.Config) (map[string][]net.Listener, error) { + listeners, err := ListenersWithNames() + + if listeners == nil || err != nil { + return nil, err + } + + if tlsConfig != nil && err == nil { + for _, ll := range listeners { + // Activate TLS only for TCP sockets + for i, l := range ll { + if l.Addr().Network() == "tcp" { + ll[i] = tls.NewListener(l, tlsConfig) + } + } + } + } + + return listeners, err +} diff --git a/vendor/github.com/coreos/go-systemd/activation/packetconns.go b/vendor/github.com/coreos/go-systemd/activation/packetconns.go index 48b2ca029..a97206785 100644 --- a/vendor/github.com/coreos/go-systemd/activation/packetconns.go +++ b/vendor/github.com/coreos/go-systemd/activation/packetconns.go @@ -24,13 +24,14 @@ import ( // The order of the file descriptors is preserved in the returned slice. // Nil values are used to fill any gaps. For example if systemd were to return file descriptors // corresponding with "udp, tcp, udp", then the slice would contain {net.PacketConn, nil, net.PacketConn} -func PacketConns(unsetEnv bool) ([]net.PacketConn, error) { - files := Files(unsetEnv) +func PacketConns() ([]net.PacketConn, error) { + files := Files(true) conns := make([]net.PacketConn, len(files)) for i, f := range files { if pc, err := net.FilePacketConn(f); err == nil { conns[i] = pc + f.Close() } } return conns, nil diff --git a/vendor/github.com/coreos/go-systemd/dbus/dbus.go b/vendor/github.com/coreos/go-systemd/dbus/dbus.go index c1694fb52..1d54810af 100644 --- a/vendor/github.com/coreos/go-systemd/dbus/dbus.go +++ b/vendor/github.com/coreos/go-systemd/dbus/dbus.go @@ -16,6 +16,7 @@ package dbus import ( + "encoding/hex" "fmt" "os" "strconv" @@ -60,6 +61,27 @@ func PathBusEscape(path string) string { return string(n) } +// pathBusUnescape is the inverse of PathBusEscape. +func pathBusUnescape(path string) string { + if path == "_" { + return "" + } + n := []byte{} + for i := 0; i < len(path); i++ { + c := path[i] + if c == '_' && i+2 < len(path) { + res, err := hex.DecodeString(path[i+1 : i+3]) + if err == nil { + n = append(n, res...) + } + i += 2 + } else { + n = append(n, c) + } + } + return string(n) +} + // Conn is a connection to systemd's dbus endpoint. type Conn struct { // sysconn/sysobj are only used to call dbus methods @@ -74,13 +96,18 @@ type Conn struct { jobs map[dbus.ObjectPath]chan<- string sync.Mutex } - subscriber struct { + subStateSubscriber struct { updateCh chan<- *SubStateUpdate errCh chan<- error sync.Mutex ignore map[dbus.ObjectPath]int64 cleanIgnore int64 } + propertiesSubscriber struct { + updateCh chan<- *PropertiesUpdate + errCh chan<- error + sync.Mutex + } } // New establishes a connection to any available bus and authenticates. @@ -152,7 +179,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) { sigobj: systemdObject(sigconn), } - c.subscriber.ignore = make(map[dbus.ObjectPath]int64) + c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64) c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string) // Setup the listeners on jobs so that we can get completions diff --git a/vendor/github.com/coreos/go-systemd/dbus/methods.go b/vendor/github.com/coreos/go-systemd/dbus/methods.go index ab17f7cc7..0b4207229 100644 --- a/vendor/github.com/coreos/go-systemd/dbus/methods.go +++ b/vendor/github.com/coreos/go-systemd/dbus/methods.go @@ -1,4 +1,4 @@ -// Copyright 2015 CoreOS, Inc. +// Copyright 2015, 2018 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package dbus import ( "errors" + "fmt" "path" "strconv" @@ -148,14 +149,27 @@ func (c *Conn) ResetFailedUnit(name string) error { return c.sysobj.Call("org.freedesktop.systemd1.Manager.ResetFailedUnit", 0, name).Store() } -// getProperties takes the unit name and returns all of its dbus object properties, for the given dbus interface -func (c *Conn) getProperties(unit string, dbusInterface string) (map[string]interface{}, error) { +// SystemState returns the systemd state. Equivalent to `systemctl is-system-running`. +func (c *Conn) SystemState() (*Property, error) { + var err error + var prop dbus.Variant + + obj := c.sysconn.Object("org.freedesktop.systemd1", "/org/freedesktop/systemd1") + err = obj.Call("org.freedesktop.DBus.Properties.Get", 0, "org.freedesktop.systemd1.Manager", "SystemState").Store(&prop) + if err != nil { + return nil, err + } + + return &Property{Name: "SystemState", Value: prop}, nil +} + +// getProperties takes the unit path and returns all of its dbus object properties, for the given dbus interface +func (c *Conn) getProperties(path dbus.ObjectPath, dbusInterface string) (map[string]interface{}, error) { var err error var props map[string]dbus.Variant - path := unitPath(unit) if !path.IsValid() { - return nil, errors.New("invalid unit name: " + unit) + return nil, fmt.Errorf("invalid unit name: %v", path) } obj := c.sysconn.Object("org.freedesktop.systemd1", path) @@ -172,9 +186,15 @@ func (c *Conn) getProperties(unit string, dbusInterface string) (map[string]inte return out, nil } -// GetUnitProperties takes the unit name and returns all of its dbus object properties. +// GetUnitProperties takes the (unescaped) unit name and returns all of its dbus object properties. func (c *Conn) GetUnitProperties(unit string) (map[string]interface{}, error) { - return c.getProperties(unit, "org.freedesktop.systemd1.Unit") + path := unitPath(unit) + return c.getProperties(path, "org.freedesktop.systemd1.Unit") +} + +// GetUnitProperties takes the (escaped) unit path and returns all of its dbus object properties. +func (c *Conn) GetUnitPathProperties(path dbus.ObjectPath) (map[string]interface{}, error) { + return c.getProperties(path, "org.freedesktop.systemd1.Unit") } func (c *Conn) getProperty(unit string, dbusInterface string, propertyName string) (*Property, error) { @@ -208,7 +228,8 @@ func (c *Conn) GetServiceProperty(service string, propertyName string) (*Propert // Valid values for unitType: Service, Socket, Target, Device, Mount, Automount, Snapshot, Timer, Swap, Path, Slice, Scope // return "dbus.Error: Unknown interface" if the unitType is not the correct type of the unit func (c *Conn) GetUnitTypeProperties(unit string, unitType string) (map[string]interface{}, error) { - return c.getProperties(unit, "org.freedesktop.systemd1."+unitType) + path := unitPath(unit) + return c.getProperties(path, "org.freedesktop.systemd1."+unitType) } // SetUnitProperties() may be used to modify certain unit properties at runtime. @@ -292,6 +313,7 @@ func (c *Conn) ListUnitsByPatterns(states []string, patterns []string) ([]UnitSt // names and returns an UnitStatus array. Comparing to ListUnitsByPatterns // method, this method returns statuses even for inactive or non-existing // units. Input array should contain exact unit names, but not patterns. +// Note: Requires systemd v230 or higher func (c *Conn) ListUnitsByNames(units []string) ([]UnitStatus, error) { return c.listUnitsInternal(c.sysobj.Call("org.freedesktop.systemd1.Manager.ListUnitsByNames", 0, units).Store) } @@ -563,3 +585,8 @@ func (c *Conn) Reload() error { func unitPath(name string) dbus.ObjectPath { return dbus.ObjectPath("/org/freedesktop/systemd1/unit/" + PathBusEscape(name)) } + +// unitName returns the unescaped base element of the supplied escaped path +func unitName(dpath dbus.ObjectPath) string { + return pathBusUnescape(path.Base(string(dpath))) +} diff --git a/vendor/github.com/coreos/go-systemd/dbus/set.go b/vendor/github.com/coreos/go-systemd/dbus/set.go index f92e6fbed..17c5d4856 100644 --- a/vendor/github.com/coreos/go-systemd/dbus/set.go +++ b/vendor/github.com/coreos/go-systemd/dbus/set.go @@ -36,7 +36,7 @@ func (s *set) Length() int { } func (s *set) Values() (values []string) { - for val, _ := range s.data { + for val := range s.data { values = append(values, val) } return diff --git a/vendor/github.com/coreos/go-systemd/dbus/subscription.go b/vendor/github.com/coreos/go-systemd/dbus/subscription.go index 996451445..70e63a6f1 100644 --- a/vendor/github.com/coreos/go-systemd/dbus/subscription.go +++ b/vendor/github.com/coreos/go-systemd/dbus/subscription.go @@ -16,6 +16,7 @@ package dbus import ( "errors" + "log" "time" "github.com/godbus/dbus" @@ -36,22 +37,12 @@ func (c *Conn) Subscribe() error { c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged'") - err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() - if err != nil { - return err - } - - return nil + return c.sigobj.Call("org.freedesktop.systemd1.Manager.Subscribe", 0).Store() } // Unsubscribe this connection from systemd dbus events. func (c *Conn) Unsubscribe() error { - err := c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() - if err != nil { - return err - } - - return nil + return c.sigobj.Call("org.freedesktop.systemd1.Manager.Unsubscribe", 0).Store() } func (c *Conn) dispatch() { @@ -70,7 +61,8 @@ func (c *Conn) dispatch() { c.jobComplete(signal) } - if c.subscriber.updateCh == nil { + if c.subStateSubscriber.updateCh == nil && + c.propertiesSubscriber.updateCh == nil { continue } @@ -84,6 +76,12 @@ func (c *Conn) dispatch() { case "org.freedesktop.DBus.Properties.PropertiesChanged": if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" { unitPath = signal.Path + + if len(signal.Body) >= 2 { + if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok { + c.sendPropertiesUpdate(unitPath, changed) + } + } } } @@ -169,42 +167,80 @@ type SubStateUpdate struct { // is full, it attempts to write an error to errCh; if errCh is full, the error // passes silently. func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) { - c.subscriber.Lock() - defer c.subscriber.Unlock() - c.subscriber.updateCh = updateCh - c.subscriber.errCh = errCh + if c == nil { + msg := "nil receiver" + select { + case errCh <- errors.New(msg): + default: + log.Printf("full error channel while reporting: %s\n", msg) + } + return + } + + c.subStateSubscriber.Lock() + defer c.subStateSubscriber.Unlock() + c.subStateSubscriber.updateCh = updateCh + c.subStateSubscriber.errCh = errCh } -func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) { - c.subscriber.Lock() - defer c.subscriber.Unlock() +func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) { + c.subStateSubscriber.Lock() + defer c.subStateSubscriber.Unlock() + + if c.subStateSubscriber.updateCh == nil { + return + } - if c.shouldIgnore(path) { + isIgnored := c.shouldIgnore(unitPath) + defer c.cleanIgnore() + if isIgnored { return } - info, err := c.GetUnitProperties(string(path)) + info, err := c.GetUnitPathProperties(unitPath) if err != nil { select { - case c.subscriber.errCh <- err: + case c.subStateSubscriber.errCh <- err: default: + log.Printf("full error channel while reporting: %s\n", err) } + return } + defer c.updateIgnore(unitPath, info) - name := info["Id"].(string) - substate := info["SubState"].(string) + name, ok := info["Id"].(string) + if !ok { + msg := "failed to cast info.Id" + select { + case c.subStateSubscriber.errCh <- errors.New(msg): + default: + log.Printf("full error channel while reporting: %s\n", err) + } + return + } + substate, ok := info["SubState"].(string) + if !ok { + msg := "failed to cast info.SubState" + select { + case c.subStateSubscriber.errCh <- errors.New(msg): + default: + log.Printf("full error channel while reporting: %s\n", msg) + } + return + } update := &SubStateUpdate{name, substate} select { - case c.subscriber.updateCh <- update: + case c.subStateSubscriber.updateCh <- update: default: + msg := "update channel is full" select { - case c.subscriber.errCh <- errors.New("update channel full!"): + case c.subStateSubscriber.errCh <- errors.New(msg): default: + log.Printf("full error channel while reporting: %s\n", msg) } + return } - - c.updateIgnore(path, info) } // The ignore functions work around a wart in the systemd dbus interface. @@ -222,29 +258,76 @@ func (c *Conn) sendSubStateUpdate(path dbus.ObjectPath) { // the properties). func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool { - t, ok := c.subscriber.ignore[path] + t, ok := c.subStateSubscriber.ignore[path] return ok && t >= time.Now().UnixNano() } func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) { - c.cleanIgnore() + loadState, ok := info["LoadState"].(string) + if !ok { + return + } // unit is unloaded - it will trigger bad systemd dbus behavior - if info["LoadState"].(string) == "not-found" { - c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval + if loadState == "not-found" { + c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval } } // without this, ignore would grow unboundedly over time func (c *Conn) cleanIgnore() { now := time.Now().UnixNano() - if c.subscriber.cleanIgnore < now { - c.subscriber.cleanIgnore = now + cleanIgnoreInterval + if c.subStateSubscriber.cleanIgnore < now { + c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval - for p, t := range c.subscriber.ignore { + for p, t := range c.subStateSubscriber.ignore { if t < now { - delete(c.subscriber.ignore, p) + delete(c.subStateSubscriber.ignore, p) } } } } + +// PropertiesUpdate holds a map of a unit's changed properties +type PropertiesUpdate struct { + UnitName string + Changed map[string]dbus.Variant +} + +// SetPropertiesSubscriber writes to updateCh when any unit's properties +// change. Every property change reported by systemd will be sent; that is, no +// transitions will be "missed" (as they might be with SetSubStateSubscriber). +// However, state changes will only be written to the channel with non-blocking +// writes. If updateCh is full, it attempts to write an error to errCh; if +// errCh is full, the error passes silently. +func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) { + c.propertiesSubscriber.Lock() + defer c.propertiesSubscriber.Unlock() + c.propertiesSubscriber.updateCh = updateCh + c.propertiesSubscriber.errCh = errCh +} + +// we don't need to worry about shouldIgnore() here because +// sendPropertiesUpdate doesn't call GetProperties() +func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) { + c.propertiesSubscriber.Lock() + defer c.propertiesSubscriber.Unlock() + + if c.propertiesSubscriber.updateCh == nil { + return + } + + update := &PropertiesUpdate{unitName(unitPath), changedProps} + + select { + case c.propertiesSubscriber.updateCh <- update: + default: + msg := "update channel is full" + select { + case c.propertiesSubscriber.errCh <- errors.New(msg): + default: + log.Printf("full error channel while reporting: %s\n", msg) + } + return + } +} diff --git a/vendor/github.com/coreos/go-systemd/journal/journal.go b/vendor/github.com/coreos/go-systemd/journal/journal.go index 7f434990d..ef85a3ba2 100644 --- a/vendor/github.com/coreos/go-systemd/journal/journal.go +++ b/vendor/github.com/coreos/go-systemd/journal/journal.go @@ -103,7 +103,10 @@ func Send(message string, priority Priority, vars map[string]string) error { if !ok { return journalError("can't send file through non-Unix connection") } - unixConn.WriteMsgUnix([]byte{}, rights, nil) + _, _, err = unixConn.WriteMsgUnix([]byte{}, rights, nil) + if err != nil { + return journalError(err.Error()) + } } else if err != nil { return journalError(err.Error()) } @@ -165,7 +168,7 @@ func tempFd() (*os.File, error) { if err != nil { return nil, err } - syscall.Unlink(file.Name()) + err = syscall.Unlink(file.Name()) if err != nil { return nil, err } diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/journal.go b/vendor/github.com/coreos/go-systemd/sdjournal/journal.go index b00d606c1..9f3d92342 100644 --- a/vendor/github.com/coreos/go-systemd/sdjournal/journal.go +++ b/vendor/github.com/coreos/go-systemd/sdjournal/journal.go @@ -47,6 +47,15 @@ package sdjournal // return sd_journal_open_directory(ret, path, flags); // } // +// int +// my_sd_journal_open_files(void *f, sd_journal **ret, const char **paths, int flags) +// { +// int (*sd_journal_open_files)(sd_journal **, const char **, int); +// +// sd_journal_open_files = f; +// return sd_journal_open_files(ret, paths, flags); +// } +// // void // my_sd_journal_close(void *f, sd_journal *j) // { @@ -282,9 +291,19 @@ package sdjournal // sd_journal_restart_unique(j); // } // +// int +// my_sd_journal_get_catalog(void *f, sd_journal *j, char **ret) +// { +// int(*sd_journal_get_catalog)(sd_journal *, char **); +// +// sd_journal_get_catalog = f; +// return sd_journal_get_catalog(j, ret); +// } +// import "C" import ( "bytes" + "errors" "fmt" "strings" "sync" @@ -352,6 +371,12 @@ const ( IndefiniteWait time.Duration = 1<<63 - 1 ) +var ( + // ErrNoTestCursor gets returned when using TestCursor function and cursor + // parameter is not the same as the current cursor position. + ErrNoTestCursor = errors.New("Cursor parameter is not the same as current position") +) + // Journal is a Go wrapper of an sd_journal structure. type Journal struct { cjournal *C.sd_journal @@ -396,8 +421,7 @@ func NewJournal() (j *Journal, err error) { } // NewJournalFromDir returns a new Journal instance pointing to a journal residing -// in a given directory. The supplied path may be relative or absolute; if -// relative, it will be converted to an absolute path before being opened. +// in a given directory. func NewJournalFromDir(path string) (j *Journal, err error) { j = &Journal{} @@ -417,6 +441,32 @@ func NewJournalFromDir(path string) (j *Journal, err error) { return j, nil } +// NewJournalFromFiles returns a new Journal instance pointing to a journals residing +// in a given files. +func NewJournalFromFiles(paths ...string) (j *Journal, err error) { + j = &Journal{} + + sd_journal_open_files, err := getFunction("sd_journal_open_files") + if err != nil { + return nil, err + } + + // by making the slice 1 elem too long, we guarantee it'll be null-terminated + cPaths := make([]*C.char, len(paths)+1) + for idx, path := range paths { + p := C.CString(path) + cPaths[idx] = p + defer C.free(unsafe.Pointer(p)) + } + + r := C.my_sd_journal_open_files(sd_journal_open_files, &j.cjournal, &cPaths[0], 0) + if r < 0 { + return nil, fmt.Errorf("failed to open journals in paths %q: %d", paths, syscall.Errno(-r)) + } + + return j, nil +} + // Close closes a journal opened with NewJournal. func (j *Journal) Close() error { sd_journal_close, err := getFunction("sd_journal_close") @@ -598,7 +648,8 @@ func (j *Journal) getData(field string) (unsafe.Pointer, C.int, error) { } // GetData gets the data object associated with a specific field from the -// current journal entry. +// the journal entry referenced by the last completed Next/Previous function +// call. To call GetData, you must have first called one of these functions. func (j *Journal) GetData(field string) (string, error) { d, l, err := j.getData(field) if err != nil { @@ -609,7 +660,9 @@ func (j *Journal) GetData(field string) (string, error) { } // GetDataValue gets the data object associated with a specific field from the -// current journal entry, returning only the value of the object. +// journal entry referenced by the last completed Next/Previous function call, +// returning only the value of the object. To call GetDataValue, you must first +// have called one of the Next/Previous functions. func (j *Journal) GetDataValue(field string) (string, error) { val, err := j.GetData(field) if err != nil { @@ -620,7 +673,8 @@ func (j *Journal) GetDataValue(field string) (string, error) { } // GetDataBytes gets the data object associated with a specific field from the -// current journal entry. +// journal entry referenced by the last completed Next/Previous function call. +// To call GetDataBytes, you must first have called one of these functions. func (j *Journal) GetDataBytes(field string) ([]byte, error) { d, l, err := j.getData(field) if err != nil { @@ -631,7 +685,9 @@ func (j *Journal) GetDataBytes(field string) ([]byte, error) { } // GetDataValueBytes gets the data object associated with a specific field from the -// current journal entry, returning only the value of the object. +// journal entry referenced by the last completed Next/Previous function call, +// returning only the value of the object. To call GetDataValueBytes, you must first +// have called one of the Next/Previous functions. func (j *Journal) GetDataValueBytes(field string) ([]byte, error) { val, err := j.GetDataBytes(field) if err != nil { @@ -641,9 +697,10 @@ func (j *Journal) GetDataValueBytes(field string) ([]byte, error) { return bytes.SplitN(val, []byte("="), 2)[1], nil } -// GetEntry returns a full representation of a journal entry with -// all key-value pairs of data as well as address fields (cursor, realtime -// timestamp and monotonic timestamp) +// GetEntry returns a full representation of the journal entry referenced by the +// last completed Next/Previous function call, with all key-value pairs of data +// as well as address fields (cursor, realtime timestamp and monotonic timestamp). +// To call GetEntry, you must first have called one of the Next/Previous functions. func (j *Journal) GetEntry() (*JournalEntry, error) { sd_journal_get_realtime_usec, err := getFunction("sd_journal_get_realtime_usec") if err != nil { @@ -731,7 +788,7 @@ func (j *Journal) GetEntry() (*JournalEntry, error) { return entry, nil } -// SetDataThresold sets the data field size threshold for data returned by +// SetDataThreshold sets the data field size threshold for data returned by // GetData. To retrieve the complete data fields this threshold should be // turned off by setting it to 0, so that the library always returns the // complete data objects. @@ -752,8 +809,10 @@ func (j *Journal) SetDataThreshold(threshold uint64) error { return nil } -// GetRealtimeUsec gets the realtime (wallclock) timestamp of the current -// journal entry. +// GetRealtimeUsec gets the realtime (wallclock) timestamp of the journal +// entry referenced by the last completed Next/Previous function call. To +// call GetRealtimeUsec, you must first have called one of the Next/Previous +// functions. func (j *Journal) GetRealtimeUsec() (uint64, error) { var usec C.uint64_t @@ -773,7 +832,10 @@ func (j *Journal) GetRealtimeUsec() (uint64, error) { return uint64(usec), nil } -// GetMonotonicUsec gets the monotonic timestamp of the current journal entry. +// GetMonotonicUsec gets the monotonic timestamp of the journal entry +// referenced by the last completed Next/Previous function call. To call +// GetMonotonicUsec, you must first have called one of the Next/Previous +// functions. func (j *Journal) GetMonotonicUsec() (uint64, error) { var usec C.uint64_t var boot_id C.sd_id128_t @@ -794,7 +856,9 @@ func (j *Journal) GetMonotonicUsec() (uint64, error) { return uint64(usec), nil } -// GetCursor gets the cursor of the current journal entry. +// GetCursor gets the cursor of the last journal entry reeferenced by the +// last completed Next/Previous function call. To call GetCursor, you must +// first have called one of the Next/Previous functions. func (j *Journal) GetCursor() (string, error) { sd_journal_get_cursor, err := getFunction("sd_journal_get_cursor") if err != nil { @@ -836,13 +900,16 @@ func (j *Journal) TestCursor(cursor string) error { if r < 0 { return fmt.Errorf("failed to test to cursor %q: %d", cursor, syscall.Errno(-r)) + } else if r == 0 { + return ErrNoTestCursor } return nil } // SeekHead seeks to the beginning of the journal, i.e. the oldest available -// entry. +// entry. This call must be followed by a call to Next before any call to +// Get* will return data about the first element. func (j *Journal) SeekHead() error { sd_journal_seek_head, err := getFunction("sd_journal_seek_head") if err != nil { @@ -861,7 +928,8 @@ func (j *Journal) SeekHead() error { } // SeekTail may be used to seek to the end of the journal, i.e. the most recent -// available entry. +// available entry. This call must be followed by a call to Next before any +// call to Get* will return data about the last element. func (j *Journal) SeekTail() error { sd_journal_seek_tail, err := getFunction("sd_journal_seek_tail") if err != nil { @@ -880,7 +948,8 @@ func (j *Journal) SeekTail() error { } // SeekRealtimeUsec seeks to the entry with the specified realtime (wallclock) -// timestamp, i.e. CLOCK_REALTIME. +// timestamp, i.e. CLOCK_REALTIME. This call must be followed by a call to +// Next/Previous before any call to Get* will return data about the sought entry. func (j *Journal) SeekRealtimeUsec(usec uint64) error { sd_journal_seek_realtime_usec, err := getFunction("sd_journal_seek_realtime_usec") if err != nil { @@ -898,7 +967,9 @@ func (j *Journal) SeekRealtimeUsec(usec uint64) error { return nil } -// SeekCursor seeks to a concrete journal cursor. +// SeekCursor seeks to a concrete journal cursor. This call must be +// followed by a call to Next/Previous before any call to Get* will return +// data about the sought entry. func (j *Journal) SeekCursor(cursor string) error { sd_journal_seek_cursor, err := getFunction("sd_journal_seek_cursor") if err != nil { @@ -937,7 +1008,7 @@ func (j *Journal) Wait(timeout time.Duration) int { // equivalent hex value. to = 0xffffffffffffffff } else { - to = uint64(time.Now().Add(timeout).Unix() / 1000) + to = uint64(timeout / time.Microsecond) } j.mu.Lock() r := C.my_sd_journal_wait(sd_journal_wait, j.cjournal, C.uint64_t(to)) @@ -1022,3 +1093,28 @@ func (j *Journal) GetUniqueValues(field string) ([]string, error) { return result, nil } + +// GetCatalog retrieves a message catalog entry for the journal entry referenced +// by the last completed Next/Previous function call. To call GetCatalog, you +// must first have called one of these functions. +func (j *Journal) GetCatalog() (string, error) { + sd_journal_get_catalog, err := getFunction("sd_journal_get_catalog") + if err != nil { + return "", err + } + + var c *C.char + + j.mu.Lock() + r := C.my_sd_journal_get_catalog(sd_journal_get_catalog, j.cjournal, &c) + j.mu.Unlock() + defer C.free(unsafe.Pointer(c)) + + if r < 0 { + return "", fmt.Errorf("failed to retrieve catalog entry for current journal entry: %d", syscall.Errno(-r)) + } + + catalog := C.GoString(c) + + return catalog, nil +} diff --git a/vendor/github.com/coreos/go-systemd/sdjournal/read.go b/vendor/github.com/coreos/go-systemd/sdjournal/read.go index b581f03b4..51a060fb5 100644 --- a/vendor/github.com/coreos/go-systemd/sdjournal/read.go +++ b/vendor/github.com/coreos/go-systemd/sdjournal/read.go @@ -21,10 +21,13 @@ import ( "io" "log" "strings" + "sync" "time" ) var ( + // ErrExpired gets returned when the Follow function runs into the + // specified timeout. ErrExpired = errors.New("Timeout expired") ) @@ -44,6 +47,11 @@ type JournalReaderConfig struct { // If not empty, the journal instance will point to a journal residing // in this directory. The supplied path may be relative or absolute. Path string + + // If not nil, Formatter will be used to translate the resulting entries + // into strings. If not set, the default format (timestamp and message field) + // will be used. If Formatter returns an error, Read will stop and return the error. + Formatter func(entry *JournalEntry) (string, error) } // JournalReader is an io.ReadCloser which provides a simple interface for iterating through the @@ -51,12 +59,20 @@ type JournalReaderConfig struct { type JournalReader struct { journal *Journal msgReader *strings.Reader + formatter func(entry *JournalEntry) (string, error) } // NewJournalReader creates a new JournalReader with configuration options that are similar to the // systemd journalctl tool's iteration and filtering features. func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { - r := &JournalReader{} + // use simpleMessageFormatter as default formatter. + if config.Formatter == nil { + config.Formatter = simpleMessageFormatter + } + + r := &JournalReader{ + formatter: config.Formatter, + } // Open the journal var err error @@ -71,7 +87,9 @@ func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { // Add any supplied matches for _, m := range config.Matches { - r.journal.AddMatch(m.String()) + if err = r.journal.AddMatch(m.String()); err != nil { + return nil, err + } } // Set the start position based on options @@ -118,14 +136,10 @@ func NewJournalReader(config JournalReaderConfig) (*JournalReader, error) { // don't fit in the read buffer. Callers should keep calling until 0 and/or an // error is returned. func (r *JournalReader) Read(b []byte) (int, error) { - var err error - if r.msgReader == nil { - var c uint64 - // Advance the journal cursor. It has to be called at least one time // before reading - c, err = r.journal.Next() + c, err := r.journal.Next() // An unexpected error if err != nil { @@ -137,10 +151,13 @@ func (r *JournalReader) Read(b []byte) (int, error) { return 0, io.EOF } - // Build a message - var msg string - msg, err = r.buildMessage() + entry, err := r.journal.GetEntry() + if err != nil { + return 0, err + } + // Build a message + msg, err := r.formatter(entry) if err != nil { return 0, err } @@ -148,8 +165,7 @@ func (r *JournalReader) Read(b []byte) (int, error) { } // Copy and return the message - var sz int - sz, err = r.msgReader.Read(b) + sz, err := r.msgReader.Read(b) if err == io.EOF { // The current entry has been fully read. Don't propagate this // EOF, so the next entry can be read at the next Read() @@ -180,80 +196,76 @@ func (r *JournalReader) Rewind() error { // Follow synchronously follows the JournalReader, writing each new journal entry to writer. The // follow will continue until a single time.Time is received on the until channel. -func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) (err error) { +func (r *JournalReader) Follow(until <-chan time.Time, writer io.Writer) error { // Process journal entries and events. Entries are flushed until the tail or // timeout is reached, and then we wait for new events or the timeout. var msg = make([]byte, 64*1<<(10)) + var waitCh = make(chan int, 1) + var waitGroup sync.WaitGroup + defer waitGroup.Wait() + process: for { c, err := r.Read(msg) if err != nil && err != io.EOF { - break process + return err } select { case <-until: return ErrExpired default: - if c > 0 { - if _, err = writer.Write(msg[:c]); err != nil { - break process - } - continue process + } + if c > 0 { + if _, err = writer.Write(msg[:c]); err != nil { + return err } + continue process } // We're at the tail, so wait for new events or time out. // Holds journal events to process. Tightly bounded for now unless there's a // reason to unblock the journal watch routine more quickly. - events := make(chan int, 1) - pollDone := make(chan bool, 1) - go func() { - for { - select { - case <-pollDone: - return + for { + waitGroup.Add(1) + go func() { + status := r.journal.Wait(100 * time.Millisecond) + waitCh <- status + waitGroup.Done() + }() + + select { + case <-until: + return ErrExpired + case e := <-waitCh: + switch e { + case SD_JOURNAL_NOP: + // the journal did not change since the last invocation + case SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: + continue process default: - events <- r.journal.Wait(time.Duration(1) * time.Second) - } - } - }() + if e < 0 { + return fmt.Errorf("received error event: %d", e) + } - select { - case <-until: - pollDone <- true - return ErrExpired - case e := <-events: - pollDone <- true - switch e { - case SD_JOURNAL_NOP, SD_JOURNAL_APPEND, SD_JOURNAL_INVALIDATE: - // TODO: need to account for any of these? - default: - log.Printf("Received unknown event: %d\n", e) + log.Printf("received unknown event: %d\n", e) + } } - continue process } } - - return } -// buildMessage returns a string representing the current journal entry in a simple format which +// simpleMessageFormatter is the default formatter. +// It returns a string representing the current journal entry in a simple format which // includes the entry timestamp and MESSAGE field. -func (r *JournalReader) buildMessage() (string, error) { - var msg string - var usec uint64 - var err error - - if msg, err = r.journal.GetData("MESSAGE"); err != nil { - return "", err - } - - if usec, err = r.journal.GetRealtimeUsec(); err != nil { - return "", err +func simpleMessageFormatter(entry *JournalEntry) (string, error) { + msg, ok := entry.Fields["MESSAGE"] + if !ok { + return "", fmt.Errorf("no MESSAGE field present in journal entry") } + usec := entry.RealtimeTimestamp timestamp := time.Unix(0, int64(usec)*int64(time.Microsecond)) return fmt.Sprintf("%s %s\n", timestamp, msg), nil |