summaryrefslogtreecommitdiff
path: root/vendor/github.com/godbus/dbus/conn.go
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2019-06-25 21:40:38 +0200
committerGitHub <noreply@github.com>2019-06-25 21:40:38 +0200
commit5b7086abda91f4301af3bfb642d416a22349c276 (patch)
treebf139f29b261e55c161394637f1c7073da5103f0 /vendor/github.com/godbus/dbus/conn.go
parenta488e197a6e3947dd420b40ed834b50db9c829c3 (diff)
parent2388222e98462fdbbe44f3e091b2b79d80956a9a (diff)
downloadpodman-5b7086abda91f4301af3bfb642d416a22349c276.tar.gz
podman-5b7086abda91f4301af3bfb642d416a22349c276.tar.bz2
podman-5b7086abda91f4301af3bfb642d416a22349c276.zip
Merge pull request #3418 from vrothberg/go-modules
update dependencies
Diffstat (limited to 'vendor/github.com/godbus/dbus/conn.go')
-rw-r--r--vendor/github.com/godbus/dbus/conn.go616
1 files changed, 390 insertions, 226 deletions
diff --git a/vendor/github.com/godbus/dbus/conn.go b/vendor/github.com/godbus/dbus/conn.go
index 5720e2ebb..b38920baf 100644
--- a/vendor/github.com/godbus/dbus/conn.go
+++ b/vendor/github.com/godbus/dbus/conn.go
@@ -1,6 +1,7 @@
package dbus
import (
+ "context"
"errors"
"io"
"os"
@@ -14,7 +15,6 @@ var (
systemBusLck sync.Mutex
sessionBus *Conn
sessionBusLck sync.Mutex
- sessionEnvLck sync.Mutex
)
// ErrClosed is the error returned by calls on a closed connection.
@@ -35,23 +35,13 @@ type Conn struct {
unixFD bool
uuid string
- names []string
- namesLck sync.RWMutex
-
- serialLck sync.Mutex
- nextSerial uint32
- serialUsed map[uint32]bool
-
- calls map[uint32]*Call
- callsLck sync.RWMutex
-
- handler Handler
-
- out chan *Message
- closed bool
- outLck sync.RWMutex
-
+ handler Handler
signalHandler SignalHandler
+ serialGen SerialGenerator
+
+ names *nameTracker
+ calls *callTracker
+ outHandler *outputHandler
eavesdropped chan<- *Message
eavesdroppedLck sync.Mutex
@@ -87,32 +77,31 @@ func SessionBus() (conn *Conn, err error) {
}
func getSessionBusAddress() (string, error) {
- sessionEnvLck.Lock()
- defer sessionEnvLck.Unlock()
- address := os.Getenv("DBUS_SESSION_BUS_ADDRESS")
- if address != "" && address != "autolaunch:" {
+ if address := os.Getenv("DBUS_SESSION_BUS_ADDRESS"); address != "" && address != "autolaunch:" {
+ return address, nil
+
+ } else if address := tryDiscoverDbusSessionBusAddress(); address != "" {
+ os.Setenv("DBUS_SESSION_BUS_ADDRESS", address)
return address, nil
}
return getSessionBusPlatformAddress()
}
// SessionBusPrivate returns a new private connection to the session bus.
-func SessionBusPrivate() (*Conn, error) {
+func SessionBusPrivate(opts ...ConnOption) (*Conn, error) {
address, err := getSessionBusAddress()
if err != nil {
return nil, err
}
- return Dial(address)
+ return Dial(address, opts...)
}
// SessionBusPrivate returns a new private connection to the session bus.
+//
+// Deprecated: use SessionBusPrivate with options instead.
func SessionBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) {
- address, err := getSessionBusAddress()
- if err != nil {
- return nil, err
- }
- return DialHandler(address, handler, signalHandler)
+ return SessionBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler))
}
// SystemBus returns a shared connection to the system bus, connecting to it if
@@ -145,53 +134,93 @@ func SystemBus() (conn *Conn, err error) {
}
// SystemBusPrivate returns a new private connection to the system bus.
-func SystemBusPrivate() (*Conn, error) {
- return Dial(getSystemBusPlatformAddress())
+func SystemBusPrivate(opts ...ConnOption) (*Conn, error) {
+ return Dial(getSystemBusPlatformAddress(), opts...)
}
// SystemBusPrivateHandler returns a new private connection to the system bus, using the provided handlers.
+//
+// Deprecated: use SystemBusPrivate with options instead.
func SystemBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) {
- return DialHandler(getSystemBusPlatformAddress(), handler, signalHandler)
+ return SystemBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler))
}
// Dial establishes a new private connection to the message bus specified by address.
-func Dial(address string) (*Conn, error) {
+func Dial(address string, opts ...ConnOption) (*Conn, error) {
tr, err := getTransport(address)
if err != nil {
return nil, err
}
- return newConn(tr, NewDefaultHandler(), NewDefaultSignalHandler())
+ return newConn(tr, opts...)
}
// DialHandler establishes a new private connection to the message bus specified by address, using the supplied handlers.
+//
+// Deprecated: use Dial with options instead.
func DialHandler(address string, handler Handler, signalHandler SignalHandler) (*Conn, error) {
- tr, err := getTransport(address)
- if err != nil {
- return nil, err
+ return Dial(address, WithSignalHandler(signalHandler))
+}
+
+// ConnOption is a connection option.
+type ConnOption func(conn *Conn) error
+
+// WithHandler overrides the default handler.
+func WithHandler(handler Handler) ConnOption {
+ return func(conn *Conn) error {
+ conn.handler = handler
+ return nil
+ }
+}
+
+// WithSignalHandler overrides the default signal handler.
+func WithSignalHandler(handler SignalHandler) ConnOption {
+ return func(conn *Conn) error {
+ conn.signalHandler = handler
+ return nil
+ }
+}
+
+// WithSerialGenerator overrides the default signals generator.
+func WithSerialGenerator(gen SerialGenerator) ConnOption {
+ return func(conn *Conn) error {
+ conn.serialGen = gen
+ return nil
}
- return newConn(tr, handler, signalHandler)
}
// NewConn creates a new private *Conn from an already established connection.
-func NewConn(conn io.ReadWriteCloser) (*Conn, error) {
- return NewConnHandler(conn, NewDefaultHandler(), NewDefaultSignalHandler())
+func NewConn(conn io.ReadWriteCloser, opts ...ConnOption) (*Conn, error) {
+ return newConn(genericTransport{conn}, opts...)
}
// NewConnHandler creates a new private *Conn from an already established connection, using the supplied handlers.
+//
+// Deprecated: use NewConn with options instead.
func NewConnHandler(conn io.ReadWriteCloser, handler Handler, signalHandler SignalHandler) (*Conn, error) {
- return newConn(genericTransport{conn}, handler, signalHandler)
+ return NewConn(genericTransport{conn}, WithHandler(handler), WithSignalHandler(signalHandler))
}
// newConn creates a new *Conn from a transport.
-func newConn(tr transport, handler Handler, signalHandler SignalHandler) (*Conn, error) {
+func newConn(tr transport, opts ...ConnOption) (*Conn, error) {
conn := new(Conn)
conn.transport = tr
- conn.calls = make(map[uint32]*Call)
- conn.out = make(chan *Message, 10)
- conn.handler = handler
- conn.signalHandler = signalHandler
- conn.nextSerial = 1
- conn.serialUsed = map[uint32]bool{0: true}
+ for _, opt := range opts {
+ if err := opt(conn); err != nil {
+ return nil, err
+ }
+ }
+ conn.calls = newCallTracker()
+ if conn.handler == nil {
+ conn.handler = NewDefaultHandler()
+ }
+ if conn.signalHandler == nil {
+ conn.signalHandler = NewDefaultSignalHandler()
+ }
+ if conn.serialGen == nil {
+ conn.serialGen = newSerialGenerator()
+ }
+ conn.outHandler = &outputHandler{conn: conn}
+ conn.names = newNameTracker()
conn.busObj = conn.Object("org.freedesktop.DBus", "/org/freedesktop/DBus")
return conn, nil
}
@@ -206,18 +235,7 @@ func (conn *Conn) BusObject() BusObject {
// and the channels passed to Eavesdrop and Signal are closed. This method must
// not be called on shared connections.
func (conn *Conn) Close() error {
- conn.outLck.Lock()
- if conn.closed {
- // inWorker calls Close on read error, the read error may
- // be caused by another caller calling Close to shutdown the
- // dbus connection, a double-close scenario we prevent here.
- conn.outLck.Unlock()
- return nil
- }
- close(conn.out)
- conn.closed = true
- conn.outLck.Unlock()
-
+ conn.outHandler.close()
if term, ok := conn.signalHandler.(Terminator); ok {
term.Terminate()
}
@@ -249,17 +267,9 @@ func (conn *Conn) Eavesdrop(ch chan<- *Message) {
conn.eavesdroppedLck.Unlock()
}
-// getSerial returns an unused serial.
+// GetSerial returns an unused serial.
func (conn *Conn) getSerial() uint32 {
- conn.serialLck.Lock()
- defer conn.serialLck.Unlock()
- n := conn.nextSerial
- for conn.serialUsed[n] {
- n++
- }
- conn.serialUsed[n] = true
- conn.nextSerial = n + 1
- return n
+ return conn.serialGen.GetSerial()
}
// Hello sends the initial org.freedesktop.DBus.Hello call. This method must be
@@ -271,10 +281,7 @@ func (conn *Conn) Hello() error {
if err != nil {
return err
}
- conn.namesLck.Lock()
- conn.names = make([]string, 1)
- conn.names[0] = s
- conn.namesLck.Unlock()
+ conn.names.acquireUniqueConnectionName(s)
return nil
}
@@ -283,109 +290,48 @@ func (conn *Conn) Hello() error {
func (conn *Conn) inWorker() {
for {
msg, err := conn.ReadMessage()
- if err == nil {
- conn.eavesdroppedLck.Lock()
- if conn.eavesdropped != nil {
- select {
- case conn.eavesdropped <- msg:
- default:
- }
- conn.eavesdroppedLck.Unlock()
- continue
- }
- conn.eavesdroppedLck.Unlock()
- dest, _ := msg.Headers[FieldDestination].value.(string)
- found := false
- if dest == "" {
- found = true
- } else {
- conn.namesLck.RLock()
- if len(conn.names) == 0 {
- found = true
- }
- for _, v := range conn.names {
- if dest == v {
- found = true
- break
- }
- }
- conn.namesLck.RUnlock()
- }
- if !found {
- // Eavesdropped a message, but no channel for it is registered.
- // Ignore it.
- continue
- }
- switch msg.Type {
- case TypeMethodReply, TypeError:
- serial := msg.Headers[FieldReplySerial].value.(uint32)
- conn.callsLck.Lock()
- if c, ok := conn.calls[serial]; ok {
- if msg.Type == TypeError {
- name, _ := msg.Headers[FieldErrorName].value.(string)
- c.Err = Error{name, msg.Body}
- } else {
- c.Body = msg.Body
- }
- c.Done <- c
- conn.serialLck.Lock()
- delete(conn.serialUsed, serial)
- conn.serialLck.Unlock()
- delete(conn.calls, serial)
- }
- conn.callsLck.Unlock()
- case TypeSignal:
- iface := msg.Headers[FieldInterface].value.(string)
- member := msg.Headers[FieldMember].value.(string)
- // as per http://dbus.freedesktop.org/doc/dbus-specification.html ,
- // sender is optional for signals.
- sender, _ := msg.Headers[FieldSender].value.(string)
- if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" {
- if member == "NameLost" {
- // If we lost the name on the bus, remove it from our
- // tracking list.
- name, ok := msg.Body[0].(string)
- if !ok {
- panic("Unable to read the lost name")
- }
- conn.namesLck.Lock()
- for i, v := range conn.names {
- if v == name {
- conn.names = append(conn.names[:i],
- conn.names[i+1:]...)
- }
- }
- conn.namesLck.Unlock()
- } else if member == "NameAcquired" {
- // If we acquired the name on the bus, add it to our
- // tracking list.
- name, ok := msg.Body[0].(string)
- if !ok {
- panic("Unable to read the acquired name")
- }
- conn.namesLck.Lock()
- conn.names = append(conn.names, name)
- conn.namesLck.Unlock()
- }
- }
- conn.handleSignal(msg)
- case TypeMethodCall:
- go conn.handleCall(msg)
+ if err != nil {
+ if _, ok := err.(InvalidMessageError); !ok {
+ // Some read error occured (usually EOF); we can't really do
+ // anything but to shut down all stuff and returns errors to all
+ // pending replies.
+ conn.Close()
+ conn.calls.finalizeAllWithError(err)
+ return
}
- } else if _, ok := err.(InvalidMessageError); !ok {
- // Some read error occured (usually EOF); we can't really do
- // anything but to shut down all stuff and returns errors to all
- // pending replies.
- conn.Close()
- conn.callsLck.RLock()
- for _, v := range conn.calls {
- v.Err = err
- v.Done <- v
+ // invalid messages are ignored
+ continue
+ }
+ conn.eavesdroppedLck.Lock()
+ if conn.eavesdropped != nil {
+ select {
+ case conn.eavesdropped <- msg:
+ default:
}
- conn.callsLck.RUnlock()
- return
+ conn.eavesdroppedLck.Unlock()
+ continue
+ }
+ conn.eavesdroppedLck.Unlock()
+ dest, _ := msg.Headers[FieldDestination].value.(string)
+ found := dest == "" ||
+ !conn.names.uniqueNameIsKnown() ||
+ conn.names.isKnownName(dest)
+ if !found {
+ // Eavesdropped a message, but no channel for it is registered.
+ // Ignore it.
+ continue
+ }
+ switch msg.Type {
+ case TypeError:
+ conn.serialGen.RetireSerial(conn.calls.handleDBusError(msg))
+ case TypeMethodReply:
+ conn.serialGen.RetireSerial(conn.calls.handleReply(msg))
+ case TypeSignal:
+ conn.handleSignal(msg)
+ case TypeMethodCall:
+ go conn.handleCall(msg)
}
- // invalid messages are ignored
+
}
}
@@ -395,6 +341,25 @@ func (conn *Conn) handleSignal(msg *Message) {
// as per http://dbus.freedesktop.org/doc/dbus-specification.html ,
// sender is optional for signals.
sender, _ := msg.Headers[FieldSender].value.(string)
+ if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" {
+ if member == "NameLost" {
+ // If we lost the name on the bus, remove it from our
+ // tracking list.
+ name, ok := msg.Body[0].(string)
+ if !ok {
+ panic("Unable to read the lost name")
+ }
+ conn.names.loseName(name)
+ } else if member == "NameAcquired" {
+ // If we acquired the name on the bus, add it to our
+ // tracking list.
+ name, ok := msg.Body[0].(string)
+ if !ok {
+ panic("Unable to read the acquired name")
+ }
+ conn.names.acquireName(name)
+ }
+ }
signal := &Signal{
Sender: sender,
Path: msg.Headers[FieldPath].value.(ObjectPath),
@@ -408,12 +373,7 @@ func (conn *Conn) handleSignal(msg *Message) {
// connection. The slice is always at least one element long, the first element
// being the unique name of the connection.
func (conn *Conn) Names() []string {
- conn.namesLck.RLock()
- // copy the slice so it can't be modified
- s := make([]string, len(conn.names))
- copy(s, conn.names)
- conn.namesLck.RUnlock()
- return s
+ return conn.names.listKnownNames()
}
// Object returns the object identified by the given destination name and path.
@@ -423,24 +383,17 @@ func (conn *Conn) Object(dest string, path ObjectPath) BusObject {
// outWorker runs in an own goroutine, encoding and sending messages that are
// sent to conn.out.
-func (conn *Conn) outWorker() {
- for msg := range conn.out {
- err := conn.SendMessage(msg)
- conn.callsLck.RLock()
- if err != nil {
- if c := conn.calls[msg.serial]; c != nil {
- c.Err = err
- c.Done <- c
- }
- conn.serialLck.Lock()
- delete(conn.serialUsed, msg.serial)
- conn.serialLck.Unlock()
- } else if msg.Type != TypeMethodCall {
- conn.serialLck.Lock()
- delete(conn.serialUsed, msg.serial)
- conn.serialLck.Unlock()
- }
- conn.callsLck.RUnlock()
+func (conn *Conn) sendMessage(msg *Message) {
+ conn.sendMessageAndIfClosed(msg, func() {})
+}
+
+func (conn *Conn) sendMessageAndIfClosed(msg *Message, ifClosed func()) {
+ err := conn.outHandler.sendAndIfClosed(msg, ifClosed)
+ conn.calls.handleSendError(msg, err)
+ if err != nil {
+ conn.serialGen.RetireSerial(msg.serial)
+ } else if msg.Type != TypeMethodCall {
+ conn.serialGen.RetireSerial(msg.serial)
}
}
@@ -451,8 +404,21 @@ func (conn *Conn) outWorker() {
// once the call is complete. Otherwise, ch is ignored and a Call structure is
// returned of which only the Err member is valid.
func (conn *Conn) Send(msg *Message, ch chan *Call) *Call {
- var call *Call
+ return conn.send(context.Background(), msg, ch)
+}
+// SendWithContext acts like Send but takes a context
+func (conn *Conn) SendWithContext(ctx context.Context, msg *Message, ch chan *Call) *Call {
+ return conn.send(ctx, msg, ch)
+}
+
+func (conn *Conn) send(ctx context.Context, msg *Message, ch chan *Call) *Call {
+ if ctx == nil {
+ panic("nil context")
+ }
+
+ var call *Call
+ ctx, canceler := context.WithCancel(ctx)
msg.serial = conn.getSerial()
if msg.Type == TypeMethodCall && msg.Flags&FlagNoReplyExpected == 0 {
if ch == nil {
@@ -468,26 +434,23 @@ func (conn *Conn) Send(msg *Message, ch chan *Call) *Call {
call.Method = iface + "." + member
call.Args = msg.Body
call.Done = ch
- conn.callsLck.Lock()
- conn.calls[msg.serial] = call
- conn.callsLck.Unlock()
- conn.outLck.RLock()
- if conn.closed {
- call.Err = ErrClosed
- call.Done <- call
- } else {
- conn.out <- msg
- }
- conn.outLck.RUnlock()
+ call.ctx = ctx
+ call.ctxCanceler = canceler
+ conn.calls.track(msg.serial, call)
+ go func() {
+ <-ctx.Done()
+ conn.calls.handleSendError(msg, ctx.Err())
+ }()
+ conn.sendMessageAndIfClosed(msg, func() {
+ conn.calls.handleSendError(msg, ErrClosed)
+ canceler()
+ })
} else {
- conn.outLck.RLock()
- if conn.closed {
+ canceler()
+ call = &Call{Err: nil}
+ conn.sendMessageAndIfClosed(msg, func() {
call = &Call{Err: ErrClosed}
- } else {
- conn.out <- msg
- call = &Call{Err: nil}
- }
- conn.outLck.RUnlock()
+ })
}
return call
}
@@ -520,11 +483,7 @@ func (conn *Conn) sendError(err error, dest string, serial uint32) {
if len(e.Body) > 0 {
msg.Headers[FieldSignature] = MakeVariant(SignatureOf(e.Body...))
}
- conn.outLck.RLock()
- if !conn.closed {
- conn.out <- msg
- }
- conn.outLck.RUnlock()
+ conn.sendMessage(msg)
}
// sendReply creates a method reply message corresponding to the parameters and
@@ -542,11 +501,7 @@ func (conn *Conn) sendReply(dest string, serial uint32, values ...interface{}) {
if len(values) > 0 {
msg.Headers[FieldSignature] = MakeVariant(SignatureOf(values...))
}
- conn.outLck.RLock()
- if !conn.closed {
- conn.out <- msg
- }
- conn.outLck.RUnlock()
+ conn.sendMessage(msg)
}
func (conn *Conn) defaultSignalAction(fn func(h *defaultSignalHandler, ch chan<- *Signal), ch chan<- *Signal) {
@@ -681,3 +636,212 @@ func getKey(s, key string) string {
}
return ""
}
+
+type outputHandler struct {
+ conn *Conn
+ sendLck sync.Mutex
+ closed struct {
+ isClosed bool
+ lck sync.RWMutex
+ }
+}
+
+func (h *outputHandler) sendAndIfClosed(msg *Message, ifClosed func()) error {
+ h.closed.lck.RLock()
+ defer h.closed.lck.RUnlock()
+ if h.closed.isClosed {
+ ifClosed()
+ return nil
+ }
+ h.sendLck.Lock()
+ defer h.sendLck.Unlock()
+ return h.conn.SendMessage(msg)
+}
+
+func (h *outputHandler) close() {
+ h.closed.lck.Lock()
+ defer h.closed.lck.Unlock()
+ h.closed.isClosed = true
+}
+
+type serialGenerator struct {
+ lck sync.Mutex
+ nextSerial uint32
+ serialUsed map[uint32]bool
+}
+
+func newSerialGenerator() *serialGenerator {
+ return &serialGenerator{
+ serialUsed: map[uint32]bool{0: true},
+ nextSerial: 1,
+ }
+}
+
+func (gen *serialGenerator) GetSerial() uint32 {
+ gen.lck.Lock()
+ defer gen.lck.Unlock()
+ n := gen.nextSerial
+ for gen.serialUsed[n] {
+ n++
+ }
+ gen.serialUsed[n] = true
+ gen.nextSerial = n + 1
+ return n
+}
+
+func (gen *serialGenerator) RetireSerial(serial uint32) {
+ gen.lck.Lock()
+ defer gen.lck.Unlock()
+ delete(gen.serialUsed, serial)
+}
+
+type nameTracker struct {
+ lck sync.RWMutex
+ unique string
+ names map[string]struct{}
+}
+
+func newNameTracker() *nameTracker {
+ return &nameTracker{names: map[string]struct{}{}}
+}
+func (tracker *nameTracker) acquireUniqueConnectionName(name string) {
+ tracker.lck.Lock()
+ defer tracker.lck.Unlock()
+ tracker.unique = name
+}
+func (tracker *nameTracker) acquireName(name string) {
+ tracker.lck.Lock()
+ defer tracker.lck.Unlock()
+ tracker.names[name] = struct{}{}
+}
+func (tracker *nameTracker) loseName(name string) {
+ tracker.lck.Lock()
+ defer tracker.lck.Unlock()
+ delete(tracker.names, name)
+}
+
+func (tracker *nameTracker) uniqueNameIsKnown() bool {
+ tracker.lck.RLock()
+ defer tracker.lck.RUnlock()
+ return tracker.unique != ""
+}
+func (tracker *nameTracker) isKnownName(name string) bool {
+ tracker.lck.RLock()
+ defer tracker.lck.RUnlock()
+ _, ok := tracker.names[name]
+ return ok || name == tracker.unique
+}
+func (tracker *nameTracker) listKnownNames() []string {
+ tracker.lck.RLock()
+ defer tracker.lck.RUnlock()
+ out := make([]string, 0, len(tracker.names)+1)
+ out = append(out, tracker.unique)
+ for k := range tracker.names {
+ out = append(out, k)
+ }
+ return out
+}
+
+type callTracker struct {
+ calls map[uint32]*Call
+ lck sync.RWMutex
+}
+
+func newCallTracker() *callTracker {
+ return &callTracker{calls: map[uint32]*Call{}}
+}
+
+func (tracker *callTracker) track(sn uint32, call *Call) {
+ tracker.lck.Lock()
+ tracker.calls[sn] = call
+ tracker.lck.Unlock()
+}
+
+func (tracker *callTracker) handleReply(msg *Message) uint32 {
+ serial := msg.Headers[FieldReplySerial].value.(uint32)
+ tracker.lck.RLock()
+ _, ok := tracker.calls[serial]
+ tracker.lck.RUnlock()
+ if ok {
+ tracker.finalizeWithBody(serial, msg.Body)
+ }
+ return serial
+}
+
+func (tracker *callTracker) handleDBusError(msg *Message) uint32 {
+ serial := msg.Headers[FieldReplySerial].value.(uint32)
+ tracker.lck.RLock()
+ _, ok := tracker.calls[serial]
+ tracker.lck.RUnlock()
+ if ok {
+ name, _ := msg.Headers[FieldErrorName].value.(string)
+ tracker.finalizeWithError(serial, Error{name, msg.Body})
+ }
+ return serial
+}
+
+func (tracker *callTracker) handleSendError(msg *Message, err error) {
+ if err == nil {
+ return
+ }
+ tracker.lck.RLock()
+ _, ok := tracker.calls[msg.serial]
+ tracker.lck.RUnlock()
+ if ok {
+ tracker.finalizeWithError(msg.serial, err)
+ }
+}
+
+// finalize was the only func that did not strobe Done
+func (tracker *callTracker) finalize(sn uint32) {
+ tracker.lck.Lock()
+ defer tracker.lck.Unlock()
+ c, ok := tracker.calls[sn]
+ if ok {
+ delete(tracker.calls, sn)
+ c.ContextCancel()
+ }
+ return
+}
+
+func (tracker *callTracker) finalizeWithBody(sn uint32, body []interface{}) {
+ tracker.lck.Lock()
+ c, ok := tracker.calls[sn]
+ if ok {
+ delete(tracker.calls, sn)
+ }
+ tracker.lck.Unlock()
+ if ok {
+ c.Body = body
+ c.done()
+ }
+ return
+}
+
+func (tracker *callTracker) finalizeWithError(sn uint32, err error) {
+ tracker.lck.Lock()
+ c, ok := tracker.calls[sn]
+ if ok {
+ delete(tracker.calls, sn)
+ }
+ tracker.lck.Unlock()
+ if ok {
+ c.Err = err
+ c.done()
+ }
+ return
+}
+
+func (tracker *callTracker) finalizeAllWithError(err error) {
+ tracker.lck.Lock()
+ closedCalls := make([]*Call, 0, len(tracker.calls))
+ for sn := range tracker.calls {
+ closedCalls = append(closedCalls, tracker.calls[sn])
+ }
+ tracker.calls = map[uint32]*Call{}
+ tracker.lck.Unlock()
+ for _, call := range closedCalls {
+ call.Err = err
+ call.done()
+ }
+}