summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go97
1 files changed, 36 insertions, 61 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 24109264f..cbd671a85 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math"
+ "net"
"reflect"
"strings"
"sync"
@@ -38,7 +39,6 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
- iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
@@ -48,7 +48,6 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
- _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)
const (
@@ -105,17 +104,6 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
-type defaultConfigSelector struct {
- sc *ServiceConfig
-}
-
-func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
- return &iresolver.RPCConfig{
- Context: rpcInfo.Context,
- MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
- }, nil
-}
-
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
@@ -143,7 +131,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
@@ -204,6 +191,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
+ if cc.dopts.copts.Dialer == nil {
+ cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
+ network, addr := parseDialTarget(addr)
+ return (&net.Dialer{}).DialContext(ctx, network, addr)
+ }
+ if cc.dopts.withProxy {
+ cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
+ }
+ }
+
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
@@ -237,7 +234,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
scSet = true
}
default:
@@ -248,7 +244,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
// Determine the resolver to use.
- cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
+ cc.parsedTarget = grpcutil.ParseTarget(cc.target)
+ unixScheme := strings.HasPrefix(cc.target, "unix:")
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
@@ -271,10 +268,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
- } else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
+ } else if unixScheme {
cc.authority = "localhost"
- } else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
- cc.authority = "localhost" + cc.parsedTarget.Endpoint
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
@@ -287,7 +282,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
}
case <-ctx.Done():
return nil, ctx.Err()
@@ -305,7 +299,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
- CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
@@ -494,8 +487,6 @@ type ClientConn struct {
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
- safeConfigSelector iresolver.SafeConfigSelector
-
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
@@ -556,7 +547,6 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revisit this decision in the future.
cc.sc = &sc
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -595,13 +585,13 @@ func init() {
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
if cc.sc != nil {
- cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
+ cc.applyServiceConfigAndBalancer(cc.sc, addrs)
return
}
if cc.dopts.defaultServiceConfig != nil {
- cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
+ cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
} else {
- cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
+ cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
}
}
@@ -638,15 +628,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
- configSelector := iresolver.GetConfigSelector(s)
- if configSelector != nil {
- if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
- channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
- }
- } else {
- configSelector = &defaultConfigSelector{sc}
- }
- cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
+ cc.applyServiceConfigAndBalancer(sc, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
@@ -656,7 +638,6 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
}
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
@@ -891,20 +872,6 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return curAddrFound
}
-func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
- if sc == nil {
- return MethodConfig{}
- }
- if m, ok := sc.Methods[method]; ok {
- return m
- }
- i := strings.LastIndex(method, "/")
- if m, ok := sc.Methods[method[:i+1]]; ok {
- return m
- }
- return sc.Methods[""]
-}
-
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
@@ -917,7 +884,17 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
- return getMethodConfig(cc.sc, method)
+ if cc.sc == nil {
+ return MethodConfig{}
+ }
+ if m, ok := cc.sc.Methods[method]; ok {
+ return m
+ }
+ i := strings.LastIndex(method, "/")
+ if m, ok := cc.sc.Methods[method[:i+1]]; ok {
+ return m
+ }
+ return cc.sc.Methods[""]
}
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
@@ -940,15 +917,12 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st
return t, done, nil
}
-func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
+func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
if sc == nil {
// should never reach here.
return
}
cc.sc = sc
- if configSelector != nil {
- cc.safeConfigSelector.UpdateConfigSelector(configSelector)
- }
if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
@@ -1198,7 +1172,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
- newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
+ newTr.Close()
return
}
ac.curAddr = addr
@@ -1330,7 +1304,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select {
case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time.
- newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
+ newTr.Close()
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
@@ -1447,9 +1421,10 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
}
// tearDown starts to tear down the addrConn.
-//
-// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
-// will leak. In most cases, call cc.removeAddrConn() instead.
+// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
+// some edge cases (e.g., the caller opens and closes many addrConn's in a
+// tight loop.
+// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {