diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 97 |
1 files changed, 36 insertions, 61 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 24109264f..cbd671a85 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "math" + "net" "reflect" "strings" "sync" @@ -38,7 +39,6 @@ import ( "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" - iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" @@ -48,7 +48,6 @@ import ( _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. - _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. ) const ( @@ -105,17 +104,6 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) } -type defaultConfigSelector struct { - sc *ServiceConfig -} - -func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) { - return &iresolver.RPCConfig{ - Context: rpcInfo.Context, - MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method), - }, nil -} - // DialContext creates a client connection to the given target. By default, it's // a non-blocking dial (the function won't wait for connections to be // established, and connecting happens in the background). To make it a blocking @@ -143,7 +131,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * firstResolveEvent: grpcsync.NewEvent(), } cc.retryThrottler.Store((*retryThrottler)(nil)) - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) cc.ctx, cc.cancel = context.WithCancel(context.Background()) for _, opt := range opts { @@ -204,6 +191,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.mkp = cc.dopts.copts.KeepaliveParams + if cc.dopts.copts.Dialer == nil { + cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { + network, addr := parseDialTarget(addr) + return (&net.Dialer{}).DialContext(ctx, network, addr) + } + if cc.dopts.withProxy { + cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer) + } + } + if cc.dopts.copts.UserAgent != "" { cc.dopts.copts.UserAgent += " " + grpcUA } else { @@ -237,7 +234,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) scSet = true } default: @@ -248,7 +244,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } // Determine the resolver to use. - cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil) + cc.parsedTarget = grpcutil.ParseTarget(cc.target) + unixScheme := strings.HasPrefix(cc.target, "unix:") channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme) resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) if resolverBuilder == nil { @@ -271,10 +268,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.authority = creds.Info().ServerName } else if cc.dopts.insecure && cc.dopts.authority != "" { cc.authority = cc.dopts.authority - } else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") { + } else if unixScheme { cc.authority = "localhost" - } else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") { - cc.authority = "localhost" + cc.parsedTarget.Endpoint } else { // Use endpoint from "scheme://authority/endpoint" as the default // authority for ClientConn. @@ -287,7 +282,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) } case <-ctx.Done(): return nil, ctx.Err() @@ -305,7 +299,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * DialCreds: credsClone, CredsBundle: cc.dopts.copts.CredsBundle, Dialer: cc.dopts.copts.Dialer, - CustomUserAgent: cc.dopts.copts.UserAgent, ChannelzParentID: cc.channelzID, Target: cc.parsedTarget, } @@ -494,8 +487,6 @@ type ClientConn struct { balancerBuildOpts balancer.BuildOptions blockingpicker *pickerWrapper - safeConfigSelector iresolver.SafeConfigSelector - mu sync.RWMutex resolverWrapper *ccResolverWrapper sc *ServiceConfig @@ -556,7 +547,6 @@ func (cc *ClientConn) scWatcher() { // TODO: load balance policy runtime change is ignored. // We may revisit this decision in the future. cc.sc = &sc - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) cc.mu.Unlock() case <-cc.ctx.Done(): return @@ -595,13 +585,13 @@ func init() { func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { if cc.sc != nil { - cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs) + cc.applyServiceConfigAndBalancer(cc.sc, addrs) return } if cc.dopts.defaultServiceConfig != nil { - cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs) + cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs) } else { - cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs) + cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) } } @@ -638,15 +628,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { // default, per the error handling design? } else { if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { - configSelector := iresolver.GetConfigSelector(s) - if configSelector != nil { - if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { - channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") - } - } else { - configSelector = &defaultConfigSelector{sc} - } - cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) + cc.applyServiceConfigAndBalancer(sc, s.Addresses) } else { ret = balancer.ErrBadResolverState if cc.balancerWrapper == nil { @@ -656,7 +638,6 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { } else { err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) } - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc}) cc.blockingpicker.updatePicker(base.NewErrPicker(err)) cc.csMgr.updateState(connectivity.TransientFailure) cc.mu.Unlock() @@ -891,20 +872,6 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { return curAddrFound } -func getMethodConfig(sc *ServiceConfig, method string) MethodConfig { - if sc == nil { - return MethodConfig{} - } - if m, ok := sc.Methods[method]; ok { - return m - } - i := strings.LastIndex(method, "/") - if m, ok := sc.Methods[method[:i+1]]; ok { - return m - } - return sc.Methods[""] -} - // GetMethodConfig gets the method config of the input method. // If there's an exact match for input method (i.e. /service/method), we return // the corresponding MethodConfig. @@ -917,7 +884,17 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { // TODO: Avoid the locking here. cc.mu.RLock() defer cc.mu.RUnlock() - return getMethodConfig(cc.sc, method) + if cc.sc == nil { + return MethodConfig{} + } + if m, ok := cc.sc.Methods[method]; ok { + return m + } + i := strings.LastIndex(method, "/") + if m, ok := cc.sc.Methods[method[:i+1]]; ok { + return m + } + return cc.sc.Methods[""] } func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { @@ -940,15 +917,12 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st return t, done, nil } -func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { +func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) { if sc == nil { // should never reach here. return } cc.sc = sc - if configSelector != nil { - cc.safeConfigSelector.UpdateConfigSelector(configSelector) - } if cc.sc.retryThrottling != nil { newThrottler := &retryThrottler{ @@ -1198,7 +1172,7 @@ func (ac *addrConn) resetTransport() { ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() - newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN")) + newTr.Close() return } ac.curAddr = addr @@ -1330,7 +1304,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne select { case <-time.After(time.Until(connectDeadline)): // We didn't get the preface in time. - newTr.Close(fmt.Errorf("failed to receive server preface within timeout")) + newTr.Close() channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) return nil, nil, errors.New("timed out waiting for server handshake") case <-prefaceReceived: @@ -1447,9 +1421,10 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { } // tearDown starts to tear down the addrConn. -// -// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct -// will leak. In most cases, call cc.removeAddrConn() instead. +// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in +// some edge cases (e.g., the caller opens and closes many addrConn's in a +// tight loop. +// tearDown doesn't remove ac from ac.cc.conns. func (ac *addrConn) tearDown(err error) { ac.mu.Lock() if ac.state == connectivity.Shutdown { |