aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2021-10-22 14:18:45 +0000
committerGitHub <noreply@github.com>2021-10-22 14:18:45 +0000
commit5dd211f91b4dfe736056551b2804edb6f978a659 (patch)
treeb8e2512dfed285281200887799ab796d1d6eee12 /vendor/google.golang.org/grpc/balancer_conn_wrappers.go
parent833d92d7092c86cd8c31a72423634fb8b8cfad9a (diff)
parent087f8fc73bec664a30dcf0757cd3cb44ea150582 (diff)
downloadpodman-5dd211f91b4dfe736056551b2804edb6f978a659.tar.gz
podman-5dd211f91b4dfe736056551b2804edb6f978a659.tar.bz2
podman-5dd211f91b4dfe736056551b2804edb6f978a659.zip
Merge pull request #11991 from rhatdan/size
Allow API to specify size and inode quota
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_conn_wrappers.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go63
1 files changed, 38 insertions, 25 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 4cc7f9159..dd8397963 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -43,7 +43,8 @@ type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
- scBuffer *buffer.Unbounded
+ updateCh *buffer.Unbounded
+ closed *grpcsync.Event
done *grpcsync.Event
mu sync.Mutex
@@ -53,7 +54,8 @@ type ccBalancerWrapper struct {
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
- scBuffer: buffer.NewUnbounded(),
+ updateCh: buffer.NewUnbounded(),
+ closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
@@ -67,35 +69,53 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
- case t := <-ccb.scBuffer.Get():
- ccb.scBuffer.Load()
- if ccb.done.HasFired() {
+ case t := <-ccb.updateCh.Get():
+ ccb.updateCh.Load()
+ if ccb.closed.HasFired() {
break
}
- ccb.balancerMu.Lock()
- su := t.(*scStateUpdate)
- ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
- ccb.balancerMu.Unlock()
- case <-ccb.done.Done():
+ switch u := t.(type) {
+ case *scStateUpdate:
+ ccb.balancerMu.Lock()
+ ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
+ ccb.balancerMu.Unlock()
+ case *acBalancerWrapper:
+ ccb.mu.Lock()
+ if ccb.subConns != nil {
+ delete(ccb.subConns, u)
+ ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
+ }
+ ccb.mu.Unlock()
+ default:
+ logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
+ }
+ case <-ccb.closed.Done():
}
- if ccb.done.HasFired() {
+ if ccb.closed.HasFired() {
+ ccb.balancerMu.Lock()
ccb.balancer.Close()
+ ccb.balancerMu.Unlock()
ccb.mu.Lock()
scs := ccb.subConns
ccb.subConns = nil
ccb.mu.Unlock()
+ ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
+ ccb.done.Fire()
+ // Fire done before removing the addr conns. We can safely unblock
+ // ccb.close and allow the removeAddrConns to happen
+ // asynchronously.
for acbw := range scs {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
- ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
return
}
}
}
func (ccb *ccBalancerWrapper) close() {
- ccb.done.Fire()
+ ccb.closed.Fire()
+ <-ccb.done.Done()
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
@@ -109,7 +129,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
if sc == nil {
return
}
- ccb.scBuffer.Put(&scStateUpdate{
+ ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
@@ -150,17 +170,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- acbw, ok := sc.(*acBalancerWrapper)
- if !ok {
- return
- }
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- if ccb.subConns == nil {
- return
- }
- delete(ccb.subConns, acbw)
- ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+ // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
+ // during switchBalancer() if the old balancer calls RemoveSubConn() in its
+ // Close().
+ ccb.updateCh.Put(sc)
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {