diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2019-06-25 21:40:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-25 21:40:38 +0200 |
commit | 5b7086abda91f4301af3bfb642d416a22349c276 (patch) | |
tree | bf139f29b261e55c161394637f1c7073da5103f0 /vendor/github.com/godbus/dbus/conn.go | |
parent | a488e197a6e3947dd420b40ed834b50db9c829c3 (diff) | |
parent | 2388222e98462fdbbe44f3e091b2b79d80956a9a (diff) | |
download | podman-5b7086abda91f4301af3bfb642d416a22349c276.tar.gz podman-5b7086abda91f4301af3bfb642d416a22349c276.tar.bz2 podman-5b7086abda91f4301af3bfb642d416a22349c276.zip |
Merge pull request #3418 from vrothberg/go-modules
update dependencies
Diffstat (limited to 'vendor/github.com/godbus/dbus/conn.go')
-rw-r--r-- | vendor/github.com/godbus/dbus/conn.go | 616 |
1 files changed, 390 insertions, 226 deletions
diff --git a/vendor/github.com/godbus/dbus/conn.go b/vendor/github.com/godbus/dbus/conn.go index 5720e2ebb..b38920baf 100644 --- a/vendor/github.com/godbus/dbus/conn.go +++ b/vendor/github.com/godbus/dbus/conn.go @@ -1,6 +1,7 @@ package dbus import ( + "context" "errors" "io" "os" @@ -14,7 +15,6 @@ var ( systemBusLck sync.Mutex sessionBus *Conn sessionBusLck sync.Mutex - sessionEnvLck sync.Mutex ) // ErrClosed is the error returned by calls on a closed connection. @@ -35,23 +35,13 @@ type Conn struct { unixFD bool uuid string - names []string - namesLck sync.RWMutex - - serialLck sync.Mutex - nextSerial uint32 - serialUsed map[uint32]bool - - calls map[uint32]*Call - callsLck sync.RWMutex - - handler Handler - - out chan *Message - closed bool - outLck sync.RWMutex - + handler Handler signalHandler SignalHandler + serialGen SerialGenerator + + names *nameTracker + calls *callTracker + outHandler *outputHandler eavesdropped chan<- *Message eavesdroppedLck sync.Mutex @@ -87,32 +77,31 @@ func SessionBus() (conn *Conn, err error) { } func getSessionBusAddress() (string, error) { - sessionEnvLck.Lock() - defer sessionEnvLck.Unlock() - address := os.Getenv("DBUS_SESSION_BUS_ADDRESS") - if address != "" && address != "autolaunch:" { + if address := os.Getenv("DBUS_SESSION_BUS_ADDRESS"); address != "" && address != "autolaunch:" { + return address, nil + + } else if address := tryDiscoverDbusSessionBusAddress(); address != "" { + os.Setenv("DBUS_SESSION_BUS_ADDRESS", address) return address, nil } return getSessionBusPlatformAddress() } // SessionBusPrivate returns a new private connection to the session bus. -func SessionBusPrivate() (*Conn, error) { +func SessionBusPrivate(opts ...ConnOption) (*Conn, error) { address, err := getSessionBusAddress() if err != nil { return nil, err } - return Dial(address) + return Dial(address, opts...) } // SessionBusPrivate returns a new private connection to the session bus. +// +// Deprecated: use SessionBusPrivate with options instead. func SessionBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) { - address, err := getSessionBusAddress() - if err != nil { - return nil, err - } - return DialHandler(address, handler, signalHandler) + return SessionBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler)) } // SystemBus returns a shared connection to the system bus, connecting to it if @@ -145,53 +134,93 @@ func SystemBus() (conn *Conn, err error) { } // SystemBusPrivate returns a new private connection to the system bus. -func SystemBusPrivate() (*Conn, error) { - return Dial(getSystemBusPlatformAddress()) +func SystemBusPrivate(opts ...ConnOption) (*Conn, error) { + return Dial(getSystemBusPlatformAddress(), opts...) } // SystemBusPrivateHandler returns a new private connection to the system bus, using the provided handlers. +// +// Deprecated: use SystemBusPrivate with options instead. func SystemBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) { - return DialHandler(getSystemBusPlatformAddress(), handler, signalHandler) + return SystemBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler)) } // Dial establishes a new private connection to the message bus specified by address. -func Dial(address string) (*Conn, error) { +func Dial(address string, opts ...ConnOption) (*Conn, error) { tr, err := getTransport(address) if err != nil { return nil, err } - return newConn(tr, NewDefaultHandler(), NewDefaultSignalHandler()) + return newConn(tr, opts...) } // DialHandler establishes a new private connection to the message bus specified by address, using the supplied handlers. +// +// Deprecated: use Dial with options instead. func DialHandler(address string, handler Handler, signalHandler SignalHandler) (*Conn, error) { - tr, err := getTransport(address) - if err != nil { - return nil, err + return Dial(address, WithSignalHandler(signalHandler)) +} + +// ConnOption is a connection option. +type ConnOption func(conn *Conn) error + +// WithHandler overrides the default handler. +func WithHandler(handler Handler) ConnOption { + return func(conn *Conn) error { + conn.handler = handler + return nil + } +} + +// WithSignalHandler overrides the default signal handler. +func WithSignalHandler(handler SignalHandler) ConnOption { + return func(conn *Conn) error { + conn.signalHandler = handler + return nil + } +} + +// WithSerialGenerator overrides the default signals generator. +func WithSerialGenerator(gen SerialGenerator) ConnOption { + return func(conn *Conn) error { + conn.serialGen = gen + return nil } - return newConn(tr, handler, signalHandler) } // NewConn creates a new private *Conn from an already established connection. -func NewConn(conn io.ReadWriteCloser) (*Conn, error) { - return NewConnHandler(conn, NewDefaultHandler(), NewDefaultSignalHandler()) +func NewConn(conn io.ReadWriteCloser, opts ...ConnOption) (*Conn, error) { + return newConn(genericTransport{conn}, opts...) } // NewConnHandler creates a new private *Conn from an already established connection, using the supplied handlers. +// +// Deprecated: use NewConn with options instead. func NewConnHandler(conn io.ReadWriteCloser, handler Handler, signalHandler SignalHandler) (*Conn, error) { - return newConn(genericTransport{conn}, handler, signalHandler) + return NewConn(genericTransport{conn}, WithHandler(handler), WithSignalHandler(signalHandler)) } // newConn creates a new *Conn from a transport. -func newConn(tr transport, handler Handler, signalHandler SignalHandler) (*Conn, error) { +func newConn(tr transport, opts ...ConnOption) (*Conn, error) { conn := new(Conn) conn.transport = tr - conn.calls = make(map[uint32]*Call) - conn.out = make(chan *Message, 10) - conn.handler = handler - conn.signalHandler = signalHandler - conn.nextSerial = 1 - conn.serialUsed = map[uint32]bool{0: true} + for _, opt := range opts { + if err := opt(conn); err != nil { + return nil, err + } + } + conn.calls = newCallTracker() + if conn.handler == nil { + conn.handler = NewDefaultHandler() + } + if conn.signalHandler == nil { + conn.signalHandler = NewDefaultSignalHandler() + } + if conn.serialGen == nil { + conn.serialGen = newSerialGenerator() + } + conn.outHandler = &outputHandler{conn: conn} + conn.names = newNameTracker() conn.busObj = conn.Object("org.freedesktop.DBus", "/org/freedesktop/DBus") return conn, nil } @@ -206,18 +235,7 @@ func (conn *Conn) BusObject() BusObject { // and the channels passed to Eavesdrop and Signal are closed. This method must // not be called on shared connections. func (conn *Conn) Close() error { - conn.outLck.Lock() - if conn.closed { - // inWorker calls Close on read error, the read error may - // be caused by another caller calling Close to shutdown the - // dbus connection, a double-close scenario we prevent here. - conn.outLck.Unlock() - return nil - } - close(conn.out) - conn.closed = true - conn.outLck.Unlock() - + conn.outHandler.close() if term, ok := conn.signalHandler.(Terminator); ok { term.Terminate() } @@ -249,17 +267,9 @@ func (conn *Conn) Eavesdrop(ch chan<- *Message) { conn.eavesdroppedLck.Unlock() } -// getSerial returns an unused serial. +// GetSerial returns an unused serial. func (conn *Conn) getSerial() uint32 { - conn.serialLck.Lock() - defer conn.serialLck.Unlock() - n := conn.nextSerial - for conn.serialUsed[n] { - n++ - } - conn.serialUsed[n] = true - conn.nextSerial = n + 1 - return n + return conn.serialGen.GetSerial() } // Hello sends the initial org.freedesktop.DBus.Hello call. This method must be @@ -271,10 +281,7 @@ func (conn *Conn) Hello() error { if err != nil { return err } - conn.namesLck.Lock() - conn.names = make([]string, 1) - conn.names[0] = s - conn.namesLck.Unlock() + conn.names.acquireUniqueConnectionName(s) return nil } @@ -283,109 +290,48 @@ func (conn *Conn) Hello() error { func (conn *Conn) inWorker() { for { msg, err := conn.ReadMessage() - if err == nil { - conn.eavesdroppedLck.Lock() - if conn.eavesdropped != nil { - select { - case conn.eavesdropped <- msg: - default: - } - conn.eavesdroppedLck.Unlock() - continue - } - conn.eavesdroppedLck.Unlock() - dest, _ := msg.Headers[FieldDestination].value.(string) - found := false - if dest == "" { - found = true - } else { - conn.namesLck.RLock() - if len(conn.names) == 0 { - found = true - } - for _, v := range conn.names { - if dest == v { - found = true - break - } - } - conn.namesLck.RUnlock() - } - if !found { - // Eavesdropped a message, but no channel for it is registered. - // Ignore it. - continue - } - switch msg.Type { - case TypeMethodReply, TypeError: - serial := msg.Headers[FieldReplySerial].value.(uint32) - conn.callsLck.Lock() - if c, ok := conn.calls[serial]; ok { - if msg.Type == TypeError { - name, _ := msg.Headers[FieldErrorName].value.(string) - c.Err = Error{name, msg.Body} - } else { - c.Body = msg.Body - } - c.Done <- c - conn.serialLck.Lock() - delete(conn.serialUsed, serial) - conn.serialLck.Unlock() - delete(conn.calls, serial) - } - conn.callsLck.Unlock() - case TypeSignal: - iface := msg.Headers[FieldInterface].value.(string) - member := msg.Headers[FieldMember].value.(string) - // as per http://dbus.freedesktop.org/doc/dbus-specification.html , - // sender is optional for signals. - sender, _ := msg.Headers[FieldSender].value.(string) - if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" { - if member == "NameLost" { - // If we lost the name on the bus, remove it from our - // tracking list. - name, ok := msg.Body[0].(string) - if !ok { - panic("Unable to read the lost name") - } - conn.namesLck.Lock() - for i, v := range conn.names { - if v == name { - conn.names = append(conn.names[:i], - conn.names[i+1:]...) - } - } - conn.namesLck.Unlock() - } else if member == "NameAcquired" { - // If we acquired the name on the bus, add it to our - // tracking list. - name, ok := msg.Body[0].(string) - if !ok { - panic("Unable to read the acquired name") - } - conn.namesLck.Lock() - conn.names = append(conn.names, name) - conn.namesLck.Unlock() - } - } - conn.handleSignal(msg) - case TypeMethodCall: - go conn.handleCall(msg) + if err != nil { + if _, ok := err.(InvalidMessageError); !ok { + // Some read error occured (usually EOF); we can't really do + // anything but to shut down all stuff and returns errors to all + // pending replies. + conn.Close() + conn.calls.finalizeAllWithError(err) + return } - } else if _, ok := err.(InvalidMessageError); !ok { - // Some read error occured (usually EOF); we can't really do - // anything but to shut down all stuff and returns errors to all - // pending replies. - conn.Close() - conn.callsLck.RLock() - for _, v := range conn.calls { - v.Err = err - v.Done <- v + // invalid messages are ignored + continue + } + conn.eavesdroppedLck.Lock() + if conn.eavesdropped != nil { + select { + case conn.eavesdropped <- msg: + default: } - conn.callsLck.RUnlock() - return + conn.eavesdroppedLck.Unlock() + continue + } + conn.eavesdroppedLck.Unlock() + dest, _ := msg.Headers[FieldDestination].value.(string) + found := dest == "" || + !conn.names.uniqueNameIsKnown() || + conn.names.isKnownName(dest) + if !found { + // Eavesdropped a message, but no channel for it is registered. + // Ignore it. + continue + } + switch msg.Type { + case TypeError: + conn.serialGen.RetireSerial(conn.calls.handleDBusError(msg)) + case TypeMethodReply: + conn.serialGen.RetireSerial(conn.calls.handleReply(msg)) + case TypeSignal: + conn.handleSignal(msg) + case TypeMethodCall: + go conn.handleCall(msg) } - // invalid messages are ignored + } } @@ -395,6 +341,25 @@ func (conn *Conn) handleSignal(msg *Message) { // as per http://dbus.freedesktop.org/doc/dbus-specification.html , // sender is optional for signals. sender, _ := msg.Headers[FieldSender].value.(string) + if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" { + if member == "NameLost" { + // If we lost the name on the bus, remove it from our + // tracking list. + name, ok := msg.Body[0].(string) + if !ok { + panic("Unable to read the lost name") + } + conn.names.loseName(name) + } else if member == "NameAcquired" { + // If we acquired the name on the bus, add it to our + // tracking list. + name, ok := msg.Body[0].(string) + if !ok { + panic("Unable to read the acquired name") + } + conn.names.acquireName(name) + } + } signal := &Signal{ Sender: sender, Path: msg.Headers[FieldPath].value.(ObjectPath), @@ -408,12 +373,7 @@ func (conn *Conn) handleSignal(msg *Message) { // connection. The slice is always at least one element long, the first element // being the unique name of the connection. func (conn *Conn) Names() []string { - conn.namesLck.RLock() - // copy the slice so it can't be modified - s := make([]string, len(conn.names)) - copy(s, conn.names) - conn.namesLck.RUnlock() - return s + return conn.names.listKnownNames() } // Object returns the object identified by the given destination name and path. @@ -423,24 +383,17 @@ func (conn *Conn) Object(dest string, path ObjectPath) BusObject { // outWorker runs in an own goroutine, encoding and sending messages that are // sent to conn.out. -func (conn *Conn) outWorker() { - for msg := range conn.out { - err := conn.SendMessage(msg) - conn.callsLck.RLock() - if err != nil { - if c := conn.calls[msg.serial]; c != nil { - c.Err = err - c.Done <- c - } - conn.serialLck.Lock() - delete(conn.serialUsed, msg.serial) - conn.serialLck.Unlock() - } else if msg.Type != TypeMethodCall { - conn.serialLck.Lock() - delete(conn.serialUsed, msg.serial) - conn.serialLck.Unlock() - } - conn.callsLck.RUnlock() +func (conn *Conn) sendMessage(msg *Message) { + conn.sendMessageAndIfClosed(msg, func() {}) +} + +func (conn *Conn) sendMessageAndIfClosed(msg *Message, ifClosed func()) { + err := conn.outHandler.sendAndIfClosed(msg, ifClosed) + conn.calls.handleSendError(msg, err) + if err != nil { + conn.serialGen.RetireSerial(msg.serial) + } else if msg.Type != TypeMethodCall { + conn.serialGen.RetireSerial(msg.serial) } } @@ -451,8 +404,21 @@ func (conn *Conn) outWorker() { // once the call is complete. Otherwise, ch is ignored and a Call structure is // returned of which only the Err member is valid. func (conn *Conn) Send(msg *Message, ch chan *Call) *Call { - var call *Call + return conn.send(context.Background(), msg, ch) +} +// SendWithContext acts like Send but takes a context +func (conn *Conn) SendWithContext(ctx context.Context, msg *Message, ch chan *Call) *Call { + return conn.send(ctx, msg, ch) +} + +func (conn *Conn) send(ctx context.Context, msg *Message, ch chan *Call) *Call { + if ctx == nil { + panic("nil context") + } + + var call *Call + ctx, canceler := context.WithCancel(ctx) msg.serial = conn.getSerial() if msg.Type == TypeMethodCall && msg.Flags&FlagNoReplyExpected == 0 { if ch == nil { @@ -468,26 +434,23 @@ func (conn *Conn) Send(msg *Message, ch chan *Call) *Call { call.Method = iface + "." + member call.Args = msg.Body call.Done = ch - conn.callsLck.Lock() - conn.calls[msg.serial] = call - conn.callsLck.Unlock() - conn.outLck.RLock() - if conn.closed { - call.Err = ErrClosed - call.Done <- call - } else { - conn.out <- msg - } - conn.outLck.RUnlock() + call.ctx = ctx + call.ctxCanceler = canceler + conn.calls.track(msg.serial, call) + go func() { + <-ctx.Done() + conn.calls.handleSendError(msg, ctx.Err()) + }() + conn.sendMessageAndIfClosed(msg, func() { + conn.calls.handleSendError(msg, ErrClosed) + canceler() + }) } else { - conn.outLck.RLock() - if conn.closed { + canceler() + call = &Call{Err: nil} + conn.sendMessageAndIfClosed(msg, func() { call = &Call{Err: ErrClosed} - } else { - conn.out <- msg - call = &Call{Err: nil} - } - conn.outLck.RUnlock() + }) } return call } @@ -520,11 +483,7 @@ func (conn *Conn) sendError(err error, dest string, serial uint32) { if len(e.Body) > 0 { msg.Headers[FieldSignature] = MakeVariant(SignatureOf(e.Body...)) } - conn.outLck.RLock() - if !conn.closed { - conn.out <- msg - } - conn.outLck.RUnlock() + conn.sendMessage(msg) } // sendReply creates a method reply message corresponding to the parameters and @@ -542,11 +501,7 @@ func (conn *Conn) sendReply(dest string, serial uint32, values ...interface{}) { if len(values) > 0 { msg.Headers[FieldSignature] = MakeVariant(SignatureOf(values...)) } - conn.outLck.RLock() - if !conn.closed { - conn.out <- msg - } - conn.outLck.RUnlock() + conn.sendMessage(msg) } func (conn *Conn) defaultSignalAction(fn func(h *defaultSignalHandler, ch chan<- *Signal), ch chan<- *Signal) { @@ -681,3 +636,212 @@ func getKey(s, key string) string { } return "" } + +type outputHandler struct { + conn *Conn + sendLck sync.Mutex + closed struct { + isClosed bool + lck sync.RWMutex + } +} + +func (h *outputHandler) sendAndIfClosed(msg *Message, ifClosed func()) error { + h.closed.lck.RLock() + defer h.closed.lck.RUnlock() + if h.closed.isClosed { + ifClosed() + return nil + } + h.sendLck.Lock() + defer h.sendLck.Unlock() + return h.conn.SendMessage(msg) +} + +func (h *outputHandler) close() { + h.closed.lck.Lock() + defer h.closed.lck.Unlock() + h.closed.isClosed = true +} + +type serialGenerator struct { + lck sync.Mutex + nextSerial uint32 + serialUsed map[uint32]bool +} + +func newSerialGenerator() *serialGenerator { + return &serialGenerator{ + serialUsed: map[uint32]bool{0: true}, + nextSerial: 1, + } +} + +func (gen *serialGenerator) GetSerial() uint32 { + gen.lck.Lock() + defer gen.lck.Unlock() + n := gen.nextSerial + for gen.serialUsed[n] { + n++ + } + gen.serialUsed[n] = true + gen.nextSerial = n + 1 + return n +} + +func (gen *serialGenerator) RetireSerial(serial uint32) { + gen.lck.Lock() + defer gen.lck.Unlock() + delete(gen.serialUsed, serial) +} + +type nameTracker struct { + lck sync.RWMutex + unique string + names map[string]struct{} +} + +func newNameTracker() *nameTracker { + return &nameTracker{names: map[string]struct{}{}} +} +func (tracker *nameTracker) acquireUniqueConnectionName(name string) { + tracker.lck.Lock() + defer tracker.lck.Unlock() + tracker.unique = name +} +func (tracker *nameTracker) acquireName(name string) { + tracker.lck.Lock() + defer tracker.lck.Unlock() + tracker.names[name] = struct{}{} +} +func (tracker *nameTracker) loseName(name string) { + tracker.lck.Lock() + defer tracker.lck.Unlock() + delete(tracker.names, name) +} + +func (tracker *nameTracker) uniqueNameIsKnown() bool { + tracker.lck.RLock() + defer tracker.lck.RUnlock() + return tracker.unique != "" +} +func (tracker *nameTracker) isKnownName(name string) bool { + tracker.lck.RLock() + defer tracker.lck.RUnlock() + _, ok := tracker.names[name] + return ok || name == tracker.unique +} +func (tracker *nameTracker) listKnownNames() []string { + tracker.lck.RLock() + defer tracker.lck.RUnlock() + out := make([]string, 0, len(tracker.names)+1) + out = append(out, tracker.unique) + for k := range tracker.names { + out = append(out, k) + } + return out +} + +type callTracker struct { + calls map[uint32]*Call + lck sync.RWMutex +} + +func newCallTracker() *callTracker { + return &callTracker{calls: map[uint32]*Call{}} +} + +func (tracker *callTracker) track(sn uint32, call *Call) { + tracker.lck.Lock() + tracker.calls[sn] = call + tracker.lck.Unlock() +} + +func (tracker *callTracker) handleReply(msg *Message) uint32 { + serial := msg.Headers[FieldReplySerial].value.(uint32) + tracker.lck.RLock() + _, ok := tracker.calls[serial] + tracker.lck.RUnlock() + if ok { + tracker.finalizeWithBody(serial, msg.Body) + } + return serial +} + +func (tracker *callTracker) handleDBusError(msg *Message) uint32 { + serial := msg.Headers[FieldReplySerial].value.(uint32) + tracker.lck.RLock() + _, ok := tracker.calls[serial] + tracker.lck.RUnlock() + if ok { + name, _ := msg.Headers[FieldErrorName].value.(string) + tracker.finalizeWithError(serial, Error{name, msg.Body}) + } + return serial +} + +func (tracker *callTracker) handleSendError(msg *Message, err error) { + if err == nil { + return + } + tracker.lck.RLock() + _, ok := tracker.calls[msg.serial] + tracker.lck.RUnlock() + if ok { + tracker.finalizeWithError(msg.serial, err) + } +} + +// finalize was the only func that did not strobe Done +func (tracker *callTracker) finalize(sn uint32) { + tracker.lck.Lock() + defer tracker.lck.Unlock() + c, ok := tracker.calls[sn] + if ok { + delete(tracker.calls, sn) + c.ContextCancel() + } + return +} + +func (tracker *callTracker) finalizeWithBody(sn uint32, body []interface{}) { + tracker.lck.Lock() + c, ok := tracker.calls[sn] + if ok { + delete(tracker.calls, sn) + } + tracker.lck.Unlock() + if ok { + c.Body = body + c.done() + } + return +} + +func (tracker *callTracker) finalizeWithError(sn uint32, err error) { + tracker.lck.Lock() + c, ok := tracker.calls[sn] + if ok { + delete(tracker.calls, sn) + } + tracker.lck.Unlock() + if ok { + c.Err = err + c.done() + } + return +} + +func (tracker *callTracker) finalizeAllWithError(err error) { + tracker.lck.Lock() + closedCalls := make([]*Call, 0, len(tracker.calls)) + for sn := range tracker.calls { + closedCalls = append(closedCalls, tracker.calls[sn]) + } + tracker.calls = map[uint32]*Call{} + tracker.lck.Unlock() + for _, call := range closedCalls { + call.Err = err + call.done() + } +} |