summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go81
1 files changed, 16 insertions, 65 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 1f3e70d2c..5fd856a38 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,8 +36,6 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
- iresolver "google.golang.org/grpc/internal/resolver"
- "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -52,20 +50,14 @@ import (
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
-// StreamDesc represents a streaming RPC service's method specification. Used
-// on the server when registering services and on the client when initiating
-// new streams.
+// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
- // StreamName and Handler are only used when registering handlers on a
- // server.
- StreamName string // the name of the method excluding the service
- Handler StreamHandler // the handler called for the method
-
- // ServerStreams and ClientStreams are used for registering handlers on a
- // server as well as defining RPC behavior when passed to NewClientStream
- // and ClientConn.NewStream. At least one must be true.
- ServerStreams bool // indicates the server can perform streaming sends
- ClientStreams bool // indicates the client can perform streaming sends
+ StreamName string
+ Handler StreamHandler
+
+ // At least one of these is true.
+ ServerStreams bool
+ ClientStreams bool
}
// Stream defines the common interface a client or server stream has to satisfy.
@@ -172,48 +164,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
+ c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}
-
- var mc serviceconfig.MethodConfig
- var onCommit func()
- var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
- return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
- }
-
- rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
- rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
- if err != nil {
- return nil, toRPCErr(err)
- }
-
- if rpcConfig != nil {
- if rpcConfig.Context != nil {
- ctx = rpcConfig.Context
- }
- mc = rpcConfig.MethodConfig
- onCommit = rpcConfig.OnCommitted
- if rpcConfig.Interceptor != nil {
- rpcInfo.Context = nil
- ns := newStream
- newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
- cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
- if err != nil {
- return nil, toRPCErr(err)
- }
- return cs, nil
- }
- }
- }
-
- return newStream(ctx, func() {})
-}
-
-func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
- c := defaultCallInfo()
+ mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
@@ -250,7 +207,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
- DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -316,7 +272,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
- onCommit: onCommit,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
@@ -477,8 +432,7 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
- committed bool // active attempt committed for retry?
- onCommit func()
+ committed bool // active attempt committed for retry?
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
}
@@ -507,9 +461,6 @@ type csAttempt struct {
}
func (cs *clientStream) commitAttemptLocked() {
- if !cs.committed && cs.onCommit != nil {
- cs.onCommit()
- }
cs.committed = true
cs.buffer = nil
}
@@ -588,8 +539,8 @@ func (cs *clientStream) shouldRetry(err error) error {
code = status.Convert(err).Code()
}
- rp := cs.methodConfig.RetryPolicy
- if rp == nil || !rp.RetryableStatusCodes[code] {
+ rp := cs.methodConfig.retryPolicy
+ if rp == nil || !rp.retryableStatusCodes[code] {
return err
}
@@ -598,7 +549,7 @@ func (cs *clientStream) shouldRetry(err error) error {
if cs.retryThrottler.throttle() {
return err
}
- if cs.numRetries+1 >= rp.MaxAttempts {
+ if cs.numRetries+1 >= rp.maxAttempts {
return err
}
@@ -607,9 +558,9 @@ func (cs *clientStream) shouldRetry(err error) error {
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
- fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
- cur := float64(rp.InitialBackoff) * fact
- if max := float64(rp.MaxBackoff); cur > max {
+ fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.initialBackoff) * fact
+ if max := float64(rp.maxBackoff); cur > max {
cur = max
}
dur = time.Duration(grpcrand.Int63n(int64(cur)))