diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 81 |
1 files changed, 16 insertions, 65 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 1f3e70d2c..5fd856a38 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -36,8 +36,6 @@ import ( "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpcutil" - iresolver "google.golang.org/grpc/internal/resolver" - "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -52,20 +50,14 @@ import ( // of the RPC. type StreamHandler func(srv interface{}, stream ServerStream) error -// StreamDesc represents a streaming RPC service's method specification. Used -// on the server when registering services and on the client when initiating -// new streams. +// StreamDesc represents a streaming RPC service's method specification. type StreamDesc struct { - // StreamName and Handler are only used when registering handlers on a - // server. - StreamName string // the name of the method excluding the service - Handler StreamHandler // the handler called for the method - - // ServerStreams and ClientStreams are used for registering handlers on a - // server as well as defining RPC behavior when passed to NewClientStream - // and ClientConn.NewStream. At least one must be true. - ServerStreams bool // indicates the server can perform streaming sends - ClientStreams bool // indicates the client can perform streaming sends + StreamName string + Handler StreamHandler + + // At least one of these is true. + ServerStreams bool + ClientStreams bool } // Stream defines the common interface a client or server stream has to satisfy. @@ -172,48 +164,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } }() } + c := defaultCallInfo() // Provide an opportunity for the first RPC to see the first service config // provided by the resolver. if err := cc.waitForResolvedAddrs(ctx); err != nil { return nil, err } - - var mc serviceconfig.MethodConfig - var onCommit func() - var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { - return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...) - } - - rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method} - rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo) - if err != nil { - return nil, toRPCErr(err) - } - - if rpcConfig != nil { - if rpcConfig.Context != nil { - ctx = rpcConfig.Context - } - mc = rpcConfig.MethodConfig - onCommit = rpcConfig.OnCommitted - if rpcConfig.Interceptor != nil { - rpcInfo.Context = nil - ns := newStream - newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { - cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns) - if err != nil { - return nil, toRPCErr(err) - } - return cs, nil - } - } - } - - return newStream(ctx, func() {}) -} - -func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) { - c := defaultCallInfo() + mc := cc.GetMethodConfig(method) if mc.WaitForReady != nil { c.failFast = !*mc.WaitForReady } @@ -250,7 +207,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client Host: cc.authority, Method: method, ContentSubtype: c.contentSubtype, - DoneFunc: doneFunc, } // Set our outgoing compression according to the UseCompressor CallOption, if @@ -316,7 +272,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client cancel: cancel, beginTime: beginTime, firstAttempt: true, - onCommit: onCommit, } if !cc.dopts.disableRetry { cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) @@ -477,8 +432,7 @@ type clientStream struct { // place where we need to check if the attempt is nil. attempt *csAttempt // TODO(hedging): hedging will have multiple attempts simultaneously. - committed bool // active attempt committed for retry? - onCommit func() + committed bool // active attempt committed for retry? buffer []func(a *csAttempt) error // operations to replay on retry bufferSize int // current size of buffer } @@ -507,9 +461,6 @@ type csAttempt struct { } func (cs *clientStream) commitAttemptLocked() { - if !cs.committed && cs.onCommit != nil { - cs.onCommit() - } cs.committed = true cs.buffer = nil } @@ -588,8 +539,8 @@ func (cs *clientStream) shouldRetry(err error) error { code = status.Convert(err).Code() } - rp := cs.methodConfig.RetryPolicy - if rp == nil || !rp.RetryableStatusCodes[code] { + rp := cs.methodConfig.retryPolicy + if rp == nil || !rp.retryableStatusCodes[code] { return err } @@ -598,7 +549,7 @@ func (cs *clientStream) shouldRetry(err error) error { if cs.retryThrottler.throttle() { return err } - if cs.numRetries+1 >= rp.MaxAttempts { + if cs.numRetries+1 >= rp.maxAttempts { return err } @@ -607,9 +558,9 @@ func (cs *clientStream) shouldRetry(err error) error { dur = time.Millisecond * time.Duration(pushback) cs.numRetriesSincePushback = 0 } else { - fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback)) - cur := float64(rp.InitialBackoff) * fact - if max := float64(rp.MaxBackoff); cur > max { + fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) + cur := float64(rp.initialBackoff) * fact + if max := float64(rp.maxBackoff); cur > max { cur = max } dur = time.Duration(grpcrand.Int63n(int64(cur))) |