summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go97
1 files changed, 61 insertions, 36 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index cbd671a85..24109264f 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
- "net"
"reflect"
"strings"
"sync"
@@ -39,6 +38,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
+ iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
@@ -48,6 +48,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
+ _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)
const (
@@ -104,6 +105,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
+type defaultConfigSelector struct {
+ sc *ServiceConfig
+}
+
+func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
+ return &iresolver.RPCConfig{
+ Context: rpcInfo.Context,
+ MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
+ }, nil
+}
+
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
@@ -131,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
@@ -191,16 +204,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
- if cc.dopts.copts.Dialer == nil {
- cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
- network, addr := parseDialTarget(addr)
- return (&net.Dialer{}).DialContext(ctx, network, addr)
- }
- if cc.dopts.withProxy {
- cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
- }
- }
-
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
@@ -234,6 +237,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
scSet = true
}
default:
@@ -244,8 +248,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Determine the resolver to use.
- cc.parsedTarget = grpcutil.ParseTarget(cc.target)
- unixScheme := strings.HasPrefix(cc.target, "unix:")
+ cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
@@ -268,8 +271,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
- } else if unixScheme {
+ } else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
cc.authority = "localhost"
+ } else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
+ cc.authority = "localhost" + cc.parsedTarget.Endpoint
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
@@ -282,6 +287,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
}
case <-ctx.Done():
return nil, ctx.Err()
@@ -299,6 +305,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
+ CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
@@ -487,6 +494,8 @@ type ClientConn struct {
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
+ safeConfigSelector iresolver.SafeConfigSelector
+
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
@@ -547,6 +556,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = &sc
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -585,13 +595,13 @@ func init() {
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
- cc.applyServiceConfigAndBalancer(cc.sc, addrs)
+ cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
- cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
+ cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
} else {
- cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
+ cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
}
@@ -628,7 +638,15 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
- cc.applyServiceConfigAndBalancer(sc, s.Addresses)
+ configSelector := iresolver.GetConfigSelector(s)
+ if configSelector != nil {
+ if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
+ channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
+ }
+ } else {
+ configSelector = &defaultConfigSelector{sc}
+ }
+ cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
@@ -638,6 +656,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
+ cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
@@ -872,6 +891,20 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}
+func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
+ if sc == nil {
+ return MethodConfig{}
+ }
+ if m, ok := sc.Methods[method]; ok {
+ return m
+ }
+ i := strings.LastIndex(method, "/")
+ if m, ok := sc.Methods[method[:i+1]]; ok {
+ return m
+ }
+ return sc.Methods[""]
+}
+
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
@@ -884,17 +917,7 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
- if cc.sc == nil {
- return MethodConfig{}
- }
- if m, ok := cc.sc.Methods[method]; ok {
- return m
- }
- i := strings.LastIndex(method, "/")
- if m, ok := cc.sc.Methods[method[:i+1]]; ok {
- return m
- }
- return cc.sc.Methods[""]
+ return getMethodConfig(cc.sc, method)
}
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
@@ -917,12 +940,15 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
-func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
+func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return
}
cc.sc = sc
+ if configSelector != nil {
+ cc.safeConfigSelector.UpdateConfigSelector(configSelector)
+ }
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
@@ -1172,7 +1198,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- newTr.Close()
+ newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return
}
ac.curAddr = addr
@@ -1304,7 +1330,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
- newTr.Close()
+ newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
@@ -1421,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
}
// tearDown starts to tear down the addrConn.
-// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
-// some edge cases (e.g., the caller opens and closes many addrConn's in a
-// tight loop.
-// tearDown doesn't remove ac from ac.cc.conns.
+//
+// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
+// will leak. In most cases, call cc.removeAddrConn() instead.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {