aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go57
1 files changed, 35 insertions, 22 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 1f3e70d2c..e224af12d 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -295,9 +295,11 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
beginTime = time.Now()
begin := &stats.Begin{
- Client: true,
- BeginTime: beginTime,
- FailFast: c.failFast,
+ Client: true,
+ BeginTime: beginTime,
+ FailFast: c.failFast,
+ IsClientStream: desc.ClientStreams,
+ IsServerStream: desc.ServerStreams,
}
sh.HandleRPC(ctx, begin)
}
@@ -419,12 +421,9 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
if err != nil {
- if _, ok := err.(transport.PerformedIOError); ok {
- // Return without converting to an RPC error so retry code can
- // inspect.
- return err
- }
- return toRPCErr(err)
+ // Return without converting to an RPC error so retry code can
+ // inspect.
+ return err
}
cs.attempt.s = s
cs.attempt.p = &parser{r: s}
@@ -523,19 +522,28 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
- unprocessed := false
if cs.attempt.s == nil {
- pioErr, ok := err.(transport.PerformedIOError)
- if ok {
- // Unwrap error.
- err = toRPCErr(pioErr.Err)
- } else {
- unprocessed = true
+ // Error from NewClientStream.
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected, but assume no I/O was performed and the RPC is not
+ // fatal, so retry indefinitely.
+ return nil
}
- if !ok && !cs.callInfo.failFast {
- // In the event of a non-IO operation error from NewStream, we
- // never attempted to write anything to the wire, so we can retry
- // indefinitely for non-fail-fast RPCs.
+
+ // Unwrap and convert error.
+ err = toRPCErr(nse.Err)
+
+ // Never retry DoNotRetry errors, which indicate the RPC should not be
+ // retried due to max header list size violation, etc.
+ if nse.DoNotRetry {
+ return err
+ }
+
+ // In the event of a non-IO operation error from NewStream, we never
+ // attempted to write anything to the wire, so we can retry
+ // indefinitely.
+ if !nse.PerformedIO {
return nil
}
}
@@ -544,6 +552,7 @@ func (cs *clientStream) shouldRetry(err error) error {
return err
}
// Wait for the trailers.
+ unprocessed := false
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
unprocessed = cs.attempt.s.Unprocessed()
@@ -632,7 +641,7 @@ func (cs *clientStream) shouldRetry(err error) error {
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
for {
- cs.attempt.finish(lastErr)
+ cs.attempt.finish(toRPCErr(lastErr))
if err := cs.shouldRetry(lastErr); err != nil {
cs.commitAttemptLocked()
return err
@@ -659,7 +668,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
for {
if cs.committed {
cs.mu.Unlock()
- return op(cs.attempt)
+ // toRPCErr is used in case the error from the attempt comes from
+ // NewClientStream, which intentionally doesn't return a status
+ // error to allow for further inspection; all other errors should
+ // already be status errors.
+ return toRPCErr(op(cs.attempt))
}
a := cs.attempt
cs.mu.Unlock()