summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go131
1 files changed, 68 insertions, 63 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index e224af12d..625d47b34 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -274,35 +274,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if c.creds != nil {
callHdr.Creds = c.creds
}
- var trInfo *traceInfo
- if EnableTracing {
- trInfo = &traceInfo{
- tr: trace.New("grpc.Sent."+methodFamily(method), method),
- firstLine: firstLine{
- client: true,
- },
- }
- if deadline, ok := ctx.Deadline(); ok {
- trInfo.firstLine.deadline = time.Until(deadline)
- }
- trInfo.tr.LazyLog(&trInfo.firstLine, false)
- ctx = trace.NewContext(ctx, trInfo.tr)
- }
- ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
- sh := cc.dopts.copts.StatsHandler
- var beginTime time.Time
- if sh != nil {
- ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
- beginTime = time.Now()
- begin := &stats.Begin{
- Client: true,
- BeginTime: beginTime,
- FailFast: c.failFast,
- IsClientStream: desc.ClientStreams,
- IsServerStream: desc.ServerStreams,
- }
- sh.HandleRPC(ctx, begin)
- }
cs := &clientStream{
callHdr: callHdr,
@@ -316,7 +287,6 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cp: cp,
comp: comp,
cancel: cancel,
- beginTime: beginTime,
firstAttempt: true,
onCommit: onCommit,
}
@@ -325,9 +295,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
- // Only this initial attempt has stats/tracing.
- // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
- if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+ if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}
@@ -375,8 +343,43 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+ ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
+ method := cs.callHdr.Method
+ sh := cs.cc.dopts.copts.StatsHandler
+ var beginTime time.Time
+ if sh != nil {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
+ beginTime = time.Now()
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: beginTime,
+ FailFast: cs.callInfo.failFast,
+ IsClientStream: cs.desc.ClientStreams,
+ IsServerStream: cs.desc.ServerStreams,
+ IsTransparentRetryAttempt: isTransparent,
+ }
+ sh.HandleRPC(ctx, begin)
+ }
+
+ var trInfo *traceInfo
+ if EnableTracing {
+ trInfo = &traceInfo{
+ tr: trace.New("grpc.Sent."+methodFamily(method), method),
+ firstLine: firstLine{
+ client: true,
+ },
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ trInfo.firstLine.deadline = time.Until(deadline)
+ }
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, trInfo.tr)
+ }
+
newAttempt := &csAttempt{
+ ctx: ctx,
+ beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
@@ -391,15 +394,14 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
}
}()
- if err := cs.ctx.Err(); err != nil {
+ if err := ctx.Err(); err != nil {
return toRPCErr(err)
}
- ctx := cs.ctx
if cs.cc.parsedTarget.Scheme == "xds" {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
- ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
+ ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
))
}
@@ -419,7 +421,7 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (r
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries
- s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+ s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
// Return without converting to an RPC error so retry code can
// inspect.
@@ -444,8 +446,7 @@ type clientStream struct {
cancel context.CancelFunc // cancels all attempts
- sentLast bool // sent an end stream
- beginTime time.Time
+ sentLast bool // sent an end stream
methodConfig *MethodConfig
@@ -485,6 +486,7 @@ type clientStream struct {
// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
+ ctx context.Context
cs *clientStream
t transport.ClientTransport
s *transport.Stream
@@ -503,6 +505,7 @@ type csAttempt struct {
trInfo *traceInfo
statsHandler stats.Handler
+ beginTime time.Time
}
func (cs *clientStream) commitAttemptLocked() {
@@ -520,15 +523,16 @@ func (cs *clientStream) commitAttempt() {
}
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
-// the error that should be returned by the operation.
-func (cs *clientStream) shouldRetry(err error) error {
+// the error that should be returned by the operation. If the RPC should be
+// retried, the bool indicates whether it is being retried transparently.
+func (cs *clientStream) shouldRetry(err error) (bool, error) {
if cs.attempt.s == nil {
// Error from NewClientStream.
nse, ok := err.(*transport.NewStreamError)
if !ok {
// Unexpected, but assume no I/O was performed and the RPC is not
// fatal, so retry indefinitely.
- return nil
+ return true, nil
}
// Unwrap and convert error.
@@ -537,19 +541,19 @@ func (cs *clientStream) shouldRetry(err error) error {
// Never retry DoNotRetry errors, which indicate the RPC should not be
// retried due to max header list size violation, etc.
if nse.DoNotRetry {
- return err
+ return false, err
}
// In the event of a non-IO operation error from NewStream, we never
// attempted to write anything to the wire, so we can retry
// indefinitely.
- if !nse.PerformedIO {
- return nil
+ if !nse.DoNotTransparentRetry {
+ return true, nil
}
}
if cs.finished || cs.committed {
// RPC is finished or committed; cannot retry.
- return err
+ return false, err
}
// Wait for the trailers.
unprocessed := false
@@ -559,17 +563,17 @@ func (cs *clientStream) shouldRetry(err error) error {
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
- return nil
+ return true, nil
}
if cs.cc.dopts.disableRetry {
- return err
+ return false, err
}
pushback := 0
hasPushback := false
if cs.attempt.s != nil {
if !cs.attempt.s.TrailersOnly() {
- return err
+ return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
@@ -580,13 +584,13 @@ func (cs *clientStream) shouldRetry(err error) error {
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
hasPushback = true
} else if len(sps) > 1 {
channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
cs.retryThrottler.throttle() // This counts as a failure for throttling.
- return err
+ return false, err
}
}
@@ -599,16 +603,16 @@ func (cs *clientStream) shouldRetry(err error) error {
rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
- return err
+ return false, err
}
// Note: the ordering here is important; we count this as a failure
// only if the code matched a retryable code.
if cs.retryThrottler.throttle() {
- return err
+ return false, err
}
if cs.numRetries+1 >= rp.MaxAttempts {
- return err
+ return false, err
}
var dur time.Duration
@@ -631,10 +635,10 @@ func (cs *clientStream) shouldRetry(err error) error {
select {
case <-t.C:
cs.numRetries++
- return nil
+ return false, nil
case <-cs.ctx.Done():
t.Stop()
- return status.FromContextError(cs.ctx.Err()).Err()
+ return false, status.FromContextError(cs.ctx.Err()).Err()
}
}
@@ -642,12 +646,13 @@ func (cs *clientStream) shouldRetry(err error) error {
func (cs *clientStream) retryLocked(lastErr error) error {
for {
cs.attempt.finish(toRPCErr(lastErr))
- if err := cs.shouldRetry(lastErr); err != nil {
+ isTransparent, err := cs.shouldRetry(lastErr)
+ if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
- if err := cs.newAttemptLocked(nil, nil); err != nil {
+ if err := cs.newAttemptLocked(isTransparent); err != nil {
return err
}
if lastErr = cs.replayBufferLocked(); lastErr == nil {
@@ -937,7 +942,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
return io.EOF
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
+ a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -985,7 +990,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
a.mu.Unlock()
}
if a.statsHandler != nil {
- a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
+ a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@@ -1047,12 +1052,12 @@ func (a *csAttempt) finish(err error) {
if a.statsHandler != nil {
end := &stats.End{
Client: true,
- BeginTime: a.cs.beginTime,
+ BeginTime: a.beginTime,
EndTime: time.Now(),
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.cs.ctx, end)
+ a.statsHandler.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {