aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/balancer/base/balancer.go4
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go4
-rw-r--r--vendor/google.golang.org/grpc/dialoptions.go13
-rw-r--r--vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/binarylog.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/xds.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go70
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go22
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go45
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go28
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go5
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go6
-rw-r--r--vendor/google.golang.org/grpc/regenerate.sh7
-rw-r--r--vendor/google.golang.org/grpc/resolver/map.go55
-rw-r--r--vendor/google.golang.org/grpc/server.go54
-rw-r--r--vendor/google.golang.org/grpc/stream.go64
-rw-r--r--vendor/google.golang.org/grpc/version.go2
17 files changed, 260 insertions, 127 deletions
diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go
index a67074a3a..e8dfc828a 100644
--- a/vendor/google.golang.org/grpc/balancer/base/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go
@@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
+ state: connectivity.Connecting,
}
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
@@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
+
+ b.regeneratePicker()
+ b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil
}
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index de6d41c23..0d21f2210 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -146,6 +146,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ for _, opt := range extraDialOptions {
+ opt.apply(&cc.dopts)
+ }
+
for _, opt := range opts {
opt.apply(&cc.dopts)
}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index f2f605a17..75d01ba77 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -35,6 +35,15 @@ import (
"google.golang.org/grpc/stats"
)
+func init() {
+ internal.AddExtraDialOptions = func(opt ...DialOption) {
+ extraDialOptions = append(extraDialOptions, opt...)
+ }
+ internal.ClearExtraDialOptions = func() {
+ extraDialOptions = nil
+ }
+}
+
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
@@ -70,6 +79,8 @@ type DialOption interface {
apply(*dialOptions)
}
+var extraDialOptions []DialOption
+
// EmptyDialOption does not alter the dial configuration. It can be embedded in
// another structure to build custom dial options.
//
@@ -380,7 +391,7 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
// all the RPCs and underlying network connections in this ClientConn.
func WithStatsHandler(h stats.Handler) DialOption {
return newFuncDialOption(func(o *dialOptions) {
- o.copts.StatsHandler = h
+ o.copts.StatsHandlers = append(o.copts.StatsHandlers, h)
})
}
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
index 7ba8f4d18..08666f62a 100644
--- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
+++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
@@ -193,6 +193,8 @@ func (gsb *Balancer) ExitIdle() {
ei.ExitIdle()
return
}
+ gsb.mu.Lock()
+ defer gsb.mu.Unlock()
for sc := range balToUpdate.subconns {
sc.Connect()
}
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index 0a25ce43f..e3dfe204f 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -42,14 +42,14 @@ var binLogger Logger
var grpclogLogger = grpclog.Component("binarylog")
-// SetLogger sets the binarg logger.
+// SetLogger sets the binary logger.
//
// Only call this at init time.
func SetLogger(l Logger) {
binLogger = l
}
-// GetLogger gets the binarg logger.
+// GetLogger gets the binary logger.
//
// Only call this at init time.
func GetLogger() Logger {
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
index 7d996e51b..55aaeea8b 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
@@ -77,7 +77,7 @@ var (
// environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to
// "true".
- XDSAggregateAndDNS = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
+ XDSAggregateAndDNS = !strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "false")
// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
// which can be disabled by setting the environment variable
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 6d355b0b0..83018be7c 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -63,6 +63,76 @@ var (
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
+ // AddExtraServerOptions adds an array of ServerOption that will be
+ // effective globally for newly created servers. The priority will be: 1.
+ // user-provided; 2. this method; 3. default values.
+ AddExtraServerOptions interface{} // func(opt ...ServerOption)
+ // ClearExtraServerOptions clears the array of extra ServerOption. This
+ // method is useful in testing and benchmarking.
+ ClearExtraServerOptions func()
+ // AddExtraDialOptions adds an array of DialOption that will be effective
+ // globally for newly created client channels. The priority will be: 1.
+ // user-provided; 2. this method; 3. default values.
+ AddExtraDialOptions interface{} // func(opt ...DialOption)
+ // ClearExtraDialOptions clears the array of extra DialOption. This
+ // method is useful in testing and benchmarking.
+ ClearExtraDialOptions func()
+
+ // NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
+ // the provided xds bootstrap config instead of the global configuration from
+ // the supported environment variables. The resolver.Builder is meant to be
+ // used in conjunction with the grpc.WithResolvers DialOption.
+ //
+ // Testing Only
+ //
+ // This function should ONLY be used for testing and may not work with some
+ // other features, including the CSDS service.
+ NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error)
+
+ // RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
+ // Specifier Plugin for testing purposes, regardless of the XDSRLS environment
+ // variable.
+ //
+ // TODO: Remove this function once the RLS env var is removed.
+ RegisterRLSClusterSpecifierPluginForTesting func()
+
+ // UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster
+ // Specifier Plugin for testing purposes. This is needed because there is no way
+ // to unregister the RLS Cluster Specifier Plugin after registering it solely
+ // for testing purposes using RegisterRLSClusterSpecifierPluginForTesting().
+ //
+ // TODO: Remove this function once the RLS env var is removed.
+ UnregisterRLSClusterSpecifierPluginForTesting func()
+
+ // RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing
+ // purposes, regardless of the RBAC environment variable.
+ //
+ // TODO: Remove this function once the RBAC env var is removed.
+ RegisterRBACHTTPFilterForTesting func()
+
+ // UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for
+ // testing purposes. This is needed because there is no way to unregister the
+ // HTTP Filter after registering it solely for testing purposes using
+ // RegisterRBACHTTPFilterForTesting().
+ //
+ // TODO: Remove this function once the RBAC env var is removed.
+ UnregisterRBACHTTPFilterForTesting func()
+
+ // RegisterOutlierDetectionBalancerForTesting registers the Outlier
+ // Detection Balancer for testing purposes, regardless of the Outlier
+ // Detection environment variable.
+ //
+ // TODO: Remove this function once the Outlier Detection env var is removed.
+ RegisterOutlierDetectionBalancerForTesting func()
+
+ // UnregisterOutlierDetectionBalancerForTesting unregisters the Outlier
+ // Detection Balancer for testing purposes. This is needed because there is
+ // no way to unregister the Outlier Detection Balancer after registering it
+ // solely for testing purposes using
+ // RegisterOutlierDetectionBalancerForTesting().
+ //
+ // TODO: Remove this function once the Outlier Detection env var is removed.
+ UnregisterOutlierDetectionBalancerForTesting func()
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 1c3459c2b..090120925 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -49,7 +49,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
+func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
@@ -138,7 +138,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
- stats stats.Handler
+ stats []stats.Handler
}
func (ht *serverHandlerTransport) Close() {
@@ -228,10 +228,10 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
})
if err == nil { // transport has not been closed
- if ht.stats != nil {
- // Note: The trailer fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
- ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+ // Note: The trailer fields are compressed with hpack after this call returns.
+ // No WireLength field is set here.
+ for _, sh := range ht.stats {
+ sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@@ -314,10 +314,10 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
if err == nil {
- if ht.stats != nil {
+ for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
- ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
+ sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
@@ -369,14 +369,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
- if ht.stats != nil {
- s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ for _, sh := range ht.stats {
+ s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
- ht.stats.HandleRPC(s.ctx, inHeader)
+ sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 24ca59084..be371c6e0 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -90,7 +90,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool
- statsHandler stats.Handler
+ statsHandlers []stats.Handler
initialWindowSize int32
@@ -311,7 +311,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
- statsHandler: opts.StatsHandler,
+ statsHandlers: opts.StatsHandlers,
initialWindowSize: initialWindowSize,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
@@ -341,15 +341,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
- if t.statsHandler != nil {
- t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
+ for _, sh := range t.statsHandlers {
+ t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
- t.statsHandler.HandleConn(t.ctx, connBegin)
+ sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
@@ -773,24 +773,27 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
- if t.statsHandler != nil {
+ if len(t.statsHandlers) != 0 {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
- // Note: The header fields are compressed with hpack after this call returns.
- // No WireLength field is set here.
- outHeader := &stats.OutHeader{
- Client: true,
- FullMethod: callHdr.Method,
- RemoteAddr: t.remoteAddr,
- LocalAddr: t.localAddr,
- Compression: callHdr.SendCompress,
- Header: header,
+ for _, sh := range t.statsHandlers {
+ // Note: The header fields are compressed with hpack after this call returns.
+ // No WireLength field is set here.
+ // Note: Creating a new stats object to prevent pollution.
+ outHeader := &stats.OutHeader{
+ Client: true,
+ FullMethod: callHdr.Method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: callHdr.SendCompress,
+ Header: header,
+ }
+ sh.HandleRPC(s.ctx, outHeader)
}
- t.statsHandler.HandleRPC(s.ctx, outHeader)
}
return s, nil
}
@@ -916,11 +919,11 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
- if t.statsHandler != nil {
+ for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
Client: true,
}
- t.statsHandler.HandleConn(t.ctx, connEnd)
+ sh.HandleConn(t.ctx, connEnd)
}
}
@@ -1432,7 +1435,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
close(s.headerChan)
}
- if t.statsHandler != nil {
+ for _, sh := range t.statsHandlers {
if isHeader {
inHeader := &stats.InHeader{
Client: true,
@@ -1440,14 +1443,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
- t.statsHandler.HandleRPC(s.ctx, inHeader)
+ sh.HandleRPC(s.ctx, inHeader)
} else {
inTrailer := &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
- t.statsHandler.HandleRPC(s.ctx, inTrailer)
+ sh.HandleRPC(s.ctx, inTrailer)
}
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 45d7bd145..2b0fde334 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -82,7 +82,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
- stats stats.Handler
+ stats []stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
@@ -257,7 +257,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
- stats: config.StatsHandler,
+ stats: config.StatsHandlers,
kp: kp,
idle: time.Now(),
kep: kep,
@@ -272,13 +272,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
- if t.stats != nil {
- t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ for _, sh := range t.stats {
+ t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
- t.stats.HandleConn(t.ctx, connBegin)
+ sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
@@ -570,8 +570,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.adjustWindow(s, uint32(n))
}
s.ctx = traceCtx(s.ctx, s.method)
- if t.stats != nil {
- s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ for _, sh := range t.stats {
+ s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
@@ -580,7 +580,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
}
- t.stats.HandleRPC(s.ctx, inHeader)
+ sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
@@ -996,14 +996,14 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
- if t.stats != nil {
+ for _, sh := range t.stats {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
- t.stats.HandleRPC(s.Context(), outHeader)
+ sh.HandleRPC(s.Context(), outHeader)
}
return nil
}
@@ -1064,10 +1064,10 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
- if t.stats != nil {
+ for _, sh := range t.stats {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
- t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
+ sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@@ -1222,9 +1222,9 @@ func (t *http2Server) Close() {
for _, s := range streams {
s.cancel()
}
- if t.stats != nil {
+ for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
- t.stats.HandleConn(t.ctx, connEnd)
+ sh.HandleConn(t.ctx, connEnd)
}
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index d8247bcdf..b77513068 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -322,8 +322,6 @@ type bufWriter struct {
batchSize int
conn net.Conn
err error
-
- onFlush func()
}
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
@@ -360,9 +358,6 @@ func (w *bufWriter) Flush() error {
if w.offset == 0 {
return nil
}
- if w.onFlush != nil {
- w.onFlush()
- }
_, w.err = w.conn.Write(w.buf[:w.offset])
w.offset = 0
return w.err
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index a9ce717f1..6c3ba8515 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -523,7 +523,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
+ StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
@@ -553,8 +553,8 @@ type ConnectOptions struct {
CredsBundle credentials.Bundle
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams keepalive.ClientParameters
- // StatsHandler stores the handler for stats.
- StatsHandler stats.Handler
+ // StatsHandlers stores the handler for stats.
+ StatsHandlers []stats.Handler
// InitialWindowSize sets the initial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the initial window size for a connection.
diff --git a/vendor/google.golang.org/grpc/regenerate.sh b/vendor/google.golang.org/grpc/regenerate.sh
index 978b89f37..99db79faf 100644
--- a/vendor/google.golang.org/grpc/regenerate.sh
+++ b/vendor/google.golang.org/grpc/regenerate.sh
@@ -68,7 +68,6 @@ SOURCES=(
${WORKDIR}/grpc-proto/grpc/gcp/transport_security_common.proto
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls.proto
${WORKDIR}/grpc-proto/grpc/lookup/v1/rls_config.proto
- ${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto
${WORKDIR}/grpc-proto/grpc/testing/*.proto
${WORKDIR}/grpc-proto/grpc/core/*.proto
)
@@ -80,8 +79,7 @@ SOURCES=(
# Note that the protos listed here are all for testing purposes. All protos to
# be used externally should have a go_package option (and they don't need to be
# listed here).
-OPTS=Mgrpc/service_config/service_config.proto=/internal/proto/grpc_service_config,\
-Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\
+OPTS=Mgrpc/core/stats.proto=google.golang.org/grpc/interop/grpc_testing/core,\
Mgrpc/testing/benchmark_service.proto=google.golang.org/grpc/interop/grpc_testing,\
Mgrpc/testing/stats.proto=google.golang.org/grpc/interop/grpc_testing,\
Mgrpc/testing/report_qps_scenario_service.proto=google.golang.org/grpc/interop/grpc_testing,\
@@ -121,9 +119,6 @@ mv ${WORKDIR}/out/google.golang.org/grpc/lookup/grpc_lookup_v1/* ${WORKDIR}/out/
# see grpc_testing_not_regenerate/README.md for details.
rm ${WORKDIR}/out/google.golang.org/grpc/reflection/grpc_testing_not_regenerate/*.pb.go
-# grpc/service_config/service_config.proto does not have a go_package option.
-mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_service_config
-
# grpc/testing does not have a go_package option.
mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/
mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/
diff --git a/vendor/google.golang.org/grpc/resolver/map.go b/vendor/google.golang.org/grpc/resolver/map.go
index e87ecd0ee..efcb7f3ef 100644
--- a/vendor/google.golang.org/grpc/resolver/map.go
+++ b/vendor/google.golang.org/grpc/resolver/map.go
@@ -28,25 +28,40 @@ type addressMapEntry struct {
// Multiple accesses may not be performed concurrently. Must be created via
// NewAddressMap; do not construct directly.
type AddressMap struct {
- m map[string]addressMapEntryList
+ // The underlying map is keyed by an Address with fields that we don't care
+ // about being set to their zero values. The only fields that we care about
+ // are `Addr`, `ServerName` and `Attributes`. Since we need to be able to
+ // distinguish between addresses with same `Addr` and `ServerName`, but
+ // different `Attributes`, we cannot store the `Attributes` in the map key.
+ //
+ // The comparison operation for structs work as follows:
+ // Struct values are comparable if all their fields are comparable. Two
+ // struct values are equal if their corresponding non-blank fields are equal.
+ //
+ // The value type of the map contains a slice of addresses which match the key
+ // in their `Addr` and `ServerName` fields and contain the corresponding value
+ // associated with them.
+ m map[Address]addressMapEntryList
+}
+
+func toMapKey(addr *Address) Address {
+ return Address{Addr: addr.Addr, ServerName: addr.ServerName}
}
type addressMapEntryList []*addressMapEntry
// NewAddressMap creates a new AddressMap.
func NewAddressMap() *AddressMap {
- return &AddressMap{m: make(map[string]addressMapEntryList)}
+ return &AddressMap{m: make(map[Address]addressMapEntryList)}
}
// find returns the index of addr in the addressMapEntry slice, or -1 if not
// present.
func (l addressMapEntryList) find(addr Address) int {
- if len(l) == 0 {
- return -1
- }
for i, entry := range l {
- if entry.addr.ServerName == addr.ServerName &&
- entry.addr.Attributes.Equal(addr.Attributes) {
+ // Attributes are the only thing to match on here, since `Addr` and
+ // `ServerName` are already equal.
+ if entry.addr.Attributes.Equal(addr.Attributes) {
return i
}
}
@@ -55,7 +70,8 @@ func (l addressMapEntryList) find(addr Address) int {
// Get returns the value for the address in the map, if present.
func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {
- entryList := a.m[addr.Addr]
+ addrKey := toMapKey(&addr)
+ entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 {
return entryList[entry].value, true
}
@@ -64,17 +80,19 @@ func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {
// Set updates or adds the value to the address in the map.
func (a *AddressMap) Set(addr Address, value interface{}) {
- entryList := a.m[addr.Addr]
+ addrKey := toMapKey(&addr)
+ entryList := a.m[addrKey]
if entry := entryList.find(addr); entry != -1 {
- a.m[addr.Addr][entry].value = value
+ entryList[entry].value = value
return
}
- a.m[addr.Addr] = append(a.m[addr.Addr], &addressMapEntry{addr: addr, value: value})
+ a.m[addrKey] = append(entryList, &addressMapEntry{addr: addr, value: value})
}
// Delete removes addr from the map.
func (a *AddressMap) Delete(addr Address) {
- entryList := a.m[addr.Addr]
+ addrKey := toMapKey(&addr)
+ entryList := a.m[addrKey]
entry := entryList.find(addr)
if entry == -1 {
return
@@ -85,7 +103,7 @@ func (a *AddressMap) Delete(addr Address) {
copy(entryList[entry:], entryList[entry+1:])
entryList = entryList[:len(entryList)-1]
}
- a.m[addr.Addr] = entryList
+ a.m[addrKey] = entryList
}
// Len returns the number of entries in the map.
@@ -107,3 +125,14 @@ func (a *AddressMap) Keys() []Address {
}
return ret
}
+
+// Values returns a slice of all current map values.
+func (a *AddressMap) Values() []interface{} {
+ ret := make([]interface{}, 0, a.Len())
+ for _, entryList := range a.m {
+ for _, entry := range entryList {
+ ret = append(ret, entry.value)
+ }
+ }
+ return ret
+}
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 65de84b30..b54f5bb57 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -73,6 +73,12 @@ func init() {
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
+ internal.AddExtraServerOptions = func(opt ...ServerOption) {
+ extraServerOptions = opt
+ }
+ internal.ClearExtraServerOptions = func() {
+ extraServerOptions = nil
+ }
}
var statusOK = status.New(codes.OK, "")
@@ -150,7 +156,7 @@ type serverOptions struct {
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
inTapHandle tap.ServerInHandle
- statsHandler stats.Handler
+ statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
@@ -174,6 +180,7 @@ var defaultServerOptions = serverOptions{
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
+var extraServerOptions []ServerOption
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
@@ -435,7 +442,7 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
// StatsHandler returns a ServerOption that sets the stats handler for the server.
func StatsHandler(h stats.Handler) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
- o.statsHandler = h
+ o.statsHandlers = append(o.statsHandlers, h)
})
}
@@ -560,6 +567,9 @@ func (s *Server) stopServerWorkers() {
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
+ for _, o := range extraServerOptions {
+ o.apply(&opts)
+ }
for _, o := range opt {
o.apply(&opts)
}
@@ -867,7 +877,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
+ StatsHandlers: s.opts.statsHandlers,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
@@ -963,7 +973,7 @@ var _ http.Handler = (*Server)(nil)
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
+ st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -1076,8 +1086,10 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
}
err = t.Write(stream, hdr, payload, opts)
- if err == nil && s.opts.statsHandler != nil {
- s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+ if err == nil {
+ for _, sh := range s.opts.statsHandlers {
+ sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+ }
}
return err
}
@@ -1124,13 +1136,13 @@ func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerIn
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
- sh := s.opts.statsHandler
- if sh != nil || trInfo != nil || channelz.IsOn() {
+ shs := s.opts.statsHandlers
+ if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() {
s.incrCallsStarted()
}
var statsBegin *stats.Begin
- if sh != nil {
+ for _, sh := range shs {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
@@ -1161,7 +1173,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
trInfo.tr.Finish()
}
- if sh != nil {
+ for _, sh := range shs {
end := &stats.End{
BeginTime: statsBegin.BeginTime,
EndTime: time.Now(),
@@ -1243,7 +1255,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
var payInfo *payloadInfo
- if sh != nil || binlog != nil {
+ if len(shs) != 0 || binlog != nil {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
@@ -1260,7 +1272,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
- if sh != nil {
+ for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
@@ -1418,16 +1430,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if channelz.IsOn() {
s.incrCallsStarted()
}
- sh := s.opts.statsHandler
+ shs := s.opts.statsHandlers
var statsBegin *stats.Begin
- if sh != nil {
+ if len(shs) != 0 {
beginTime := time.Now()
statsBegin = &stats.Begin{
BeginTime: beginTime,
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
- sh.HandleRPC(stream.Context(), statsBegin)
+ for _, sh := range shs {
+ sh.HandleRPC(stream.Context(), statsBegin)
+ }
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ss := &serverStream{
@@ -1439,10 +1453,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
- statsHandler: sh,
+ statsHandler: shs,
}
- if sh != nil || trInfo != nil || channelz.IsOn() {
+ if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
// See comment in processUnaryRPC on defers.
defer func() {
if trInfo != nil {
@@ -1456,7 +1470,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.mu.Unlock()
}
- if sh != nil {
+ if len(shs) != 0 {
end := &stats.End{
BeginTime: statsBegin.BeginTime,
EndTime: time.Now(),
@@ -1464,7 +1478,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
- sh.HandleRPC(stream.Context(), end)
+ for _, sh := range shs {
+ sh.HandleRPC(stream.Context(), end)
+ }
}
if channelz.IsOn() {
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 236fc17ec..6d82e0d7c 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -374,9 +374,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
- sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time
- if sh != nil {
+ shs := cs.cc.dopts.copts.StatsHandlers
+ for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
@@ -414,12 +414,12 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
}
return &csAttempt{
- ctx: ctx,
- beginTime: beginTime,
- cs: cs,
- dc: cs.cc.dopts.dc,
- statsHandler: sh,
- trInfo: trInfo,
+ ctx: ctx,
+ beginTime: beginTime,
+ cs: cs,
+ dc: cs.cc.dopts.dc,
+ statsHandlers: shs,
+ trInfo: trInfo,
}, nil
}
@@ -536,8 +536,8 @@ type csAttempt struct {
// and cleared when the finish method is called.
trInfo *traceInfo
- statsHandler stats.Handler
- beginTime time.Time
+ statsHandlers []stats.Handler
+ beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
@@ -960,8 +960,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
}
return io.EOF
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -971,7 +971,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
cs := a.cs
- if a.statsHandler != nil && payInfo == nil {
+ if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
}
@@ -1008,8 +1008,8 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
}
a.mu.Unlock()
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@@ -1068,7 +1068,7 @@ func (a *csAttempt) finish(err error) {
ServerLoad: balancerload.Parse(tr),
})
}
- if a.statsHandler != nil {
+ for _, sh := range a.statsHandlers {
end := &stats.End{
Client: true,
BeginTime: a.beginTime,
@@ -1076,7 +1076,7 @@ func (a *csAttempt) finish(err error) {
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.ctx, end)
+ sh.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
@@ -1445,7 +1445,7 @@ type serverStream struct {
maxSendMessageSize int
trInfo *traceInfo
- statsHandler stats.Handler
+ statsHandler []stats.Handler
binlog binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
@@ -1555,8 +1555,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
Message: data,
})
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ }
}
return nil
}
@@ -1590,7 +1592,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var payInfo *payloadInfo
- if ss.statsHandler != nil || ss.binlog != nil {
+ if len(ss.statsHandler) != 0 || ss.binlog != nil {
payInfo = &payloadInfo{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
@@ -1605,15 +1607,17 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: m,
- // TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
- })
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: m,
+ // TODO truncate large payload.
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength + headerLen,
+ Length: len(payInfo.uncompressedBytes),
+ })
+ }
}
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientMessage{
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 5bc03f9b3..0eb2998cb 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.47.0"
+const Version = "1.48.0"