aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go81
1 files changed, 65 insertions, 16 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 5fd856a38..1f3e70d2c 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,6 +36,8 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
+ iresolver "google.golang.org/grpc/internal/resolver"
+ "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -50,14 +52,20 @@ import (
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
-// StreamDesc represents a streaming RPC service's method specification.
+// StreamDesc represents a streaming RPC service's method specification. Used
+// on the server when registering services and on the client when initiating
+// new streams.
type StreamDesc struct {
- StreamName string
- Handler StreamHandler
-
- // At least one of these is true.
- ServerStreams bool
- ClientStreams bool
+ // StreamName and Handler are only used when registering handlers on a
+ // server.
+ StreamName string // the name of the method excluding the service
+ Handler StreamHandler // the handler called for the method
+
+ // ServerStreams and ClientStreams are used for registering handlers on a
+ // server as well as defining RPC behavior when passed to NewClientStream
+ // and ClientConn.NewStream. At least one must be true.
+ ServerStreams bool // indicates the server can perform streaming sends
+ ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
@@ -164,13 +172,48 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
- c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil {
return nil, err
}
- mc := cc.GetMethodConfig(method)
+
+ var mc serviceconfig.MethodConfig
+ var onCommit func()
+ var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
+ }
+
+ rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
+ rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+
+ if rpcConfig != nil {
+ if rpcConfig.Context != nil {
+ ctx = rpcConfig.Context
+ }
+ mc = rpcConfig.MethodConfig
+ onCommit = rpcConfig.OnCommitted
+ if rpcConfig.Interceptor != nil {
+ rpcInfo.Context = nil
+ ns := newStream
+ newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
+ cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+ return cs, nil
+ }
+ }
+ }
+
+ return newStream(ctx, func() {})
+}
+
+func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
+ c := defaultCallInfo()
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
}
@@ -207,6 +250,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
+ DoneFunc: doneFunc,
}
// Set our outgoing compression according to the UseCompressor CallOption, if
@@ -272,6 +316,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cancel: cancel,
beginTime: beginTime,
firstAttempt: true,
+ onCommit: onCommit,
}
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
@@ -432,7 +477,8 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
- committed bool // active attempt committed for retry?
+ committed bool // active attempt committed for retry?
+ onCommit func()
buffer []func(a *csAttempt) error // operations to replay on retry
bufferSize int // current size of buffer
}
@@ -461,6 +507,9 @@ type csAttempt struct {
}
func (cs *clientStream) commitAttemptLocked() {
+ if !cs.committed && cs.onCommit != nil {
+ cs.onCommit()
+ }
cs.committed = true
cs.buffer = nil
}
@@ -539,8 +588,8 @@ func (cs *clientStream) shouldRetry(err error) error {
code = status.Convert(err).Code()
}
- rp := cs.methodConfig.retryPolicy
- if rp == nil || !rp.retryableStatusCodes[code] {
+ rp := cs.methodConfig.RetryPolicy
+ if rp == nil || !rp.RetryableStatusCodes[code] {
return err
}
@@ -549,7 +598,7 @@ func (cs *clientStream) shouldRetry(err error) error {
if cs.retryThrottler.throttle() {
return err
}
- if cs.numRetries+1 >= rp.maxAttempts {
+ if cs.numRetries+1 >= rp.MaxAttempts {
return err
}
@@ -558,9 +607,9 @@ func (cs *clientStream) shouldRetry(err error) error {
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
- fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
- cur := float64(rp.initialBackoff) * fact
- if max := float64(rp.maxBackoff); cur > max {
+ fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.InitialBackoff) * fact
+ if max := float64(rp.MaxBackoff); cur > max {
cur = max
}
dur = time.Duration(grpcrand.Int63n(int64(cur)))