summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go235
1 files changed, 134 insertions, 101 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 625d47b34..236fc17ec 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcutil"
+ imetadata "google.golang.org/grpc/internal/metadata"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/transport"
@@ -46,10 +47,12 @@ import (
)
// StreamHandler defines the handler called by gRPC server to complete the
-// execution of a streaming RPC. If a StreamHandler returns an error, it
-// should be produced by the status package, or else gRPC will use
-// codes.Unknown as the status code and err.Error() as the status message
-// of the RPC.
+// execution of a streaming RPC.
+//
+// If a StreamHandler returns an error, it should either be produced by the
+// status package, or be one of the context errors. Otherwise, gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message of the
+// RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification. Used
@@ -164,6 +167,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+ if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ if err := imetadata.Validate(md); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ }
if channelz.IsOn() {
cc.incrCallsStarted()
defer func() {
@@ -295,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
cs.binlog = binarylog.GetMethodLogger(method)
- if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
+ cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
+ if err != nil {
cs.finish(err)
return nil, err
}
- op := func(a *csAttempt) error { return a.newStream() }
+ // Pick the transport to use and create a new stream on the transport.
+ // Assign cs.attempt upon success.
+ op := func(a *csAttempt) error {
+ if err := a.getTransport(); err != nil {
+ return err
+ }
+ if err := a.newStream(); err != nil {
+ return err
+ }
+ // Because this operation is always called either here (while creating
+ // the clientStream) or by the retry code while locked when replaying
+ // the operation, it is safe to access cs.attempt directly.
+ cs.attempt = a
+ return nil
+ }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
- cs.finish(err)
return nil, err
}
@@ -341,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return cs, nil
}
-// newAttemptLocked creates a new attempt with a transport.
-// If it succeeds, then it replaces clientStream's attempt with this new attempt.
-func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
+// newAttemptLocked creates a new csAttempt without a transport or stream.
+func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
+ if err := cs.ctx.Err(); err != nil {
+ return nil, toRPCErr(err)
+ }
+ if err := cs.cc.ctx.Err(); err != nil {
+ return nil, ErrClientConnClosing
+ }
+
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
@@ -377,44 +405,39 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
ctx = trace.NewContext(ctx, trInfo.tr)
}
- newAttempt := &csAttempt{
+ if cs.cc.parsedTarget.Scheme == "xds" {
+ // Add extra metadata (metadata that will be added by transport) to context
+ // so the balancer can see them.
+ ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
+ "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+ ))
+ }
+
+ return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
- }
- defer func() {
- if retErr != nil {
- // This attempt is not set in the clientStream, so it's finish won't
- // be called. Call it here for stats and trace in case they are not
- // nil.
- newAttempt.finish(retErr)
- }
- }()
+ }, nil
+}
- if err := ctx.Err(); err != nil {
- return toRPCErr(err)
- }
+func (a *csAttempt) getTransport() error {
+ cs := a.cs
- if cs.cc.parsedTarget.Scheme == "xds" {
- // Add extra metadata (metadata that will be added by transport) to context
- // so the balancer can see them.
- ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
- "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
- ))
- }
- t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+ var err error
+ a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
+ if de, ok := err.(dropError); ok {
+ err = de.error
+ a.drop = true
+ }
return err
}
- if trInfo != nil {
- trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+ if a.trInfo != nil {
+ a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
}
- newAttempt.t = t
- newAttempt.done = done
- cs.attempt = newAttempt
return nil
}
@@ -423,12 +446,21 @@ func (a *csAttempt) newStream() error {
cs.callHdr.PreviousAttempts = cs.numRetries
s, err := a.t.NewStream(a.ctx, cs.callHdr)
if err != nil {
- // Return without converting to an RPC error so retry code can
- // inspect.
- return err
+ nse, ok := err.(*transport.NewStreamError)
+ if !ok {
+ // Unexpected.
+ return err
+ }
+
+ if nse.AllowTransparentRetry {
+ a.allowTransparentRetry = true
+ }
+
+ // Unwrap and convert error.
+ return toRPCErr(nse.Err)
}
- cs.attempt.s = s
- cs.attempt.p = &parser{r: s}
+ a.s = s
+ a.p = &parser{r: s}
return nil
}
@@ -454,7 +486,7 @@ type clientStream struct {
retryThrottler *retryThrottler // The throttler active when the RPC began.
- binlog *binarylog.MethodLogger // Binary logger, can be nil.
+ binlog binarylog.MethodLogger // Binary logger, can be nil.
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
@@ -506,6 +538,11 @@ type csAttempt struct {
statsHandler stats.Handler
beginTime time.Time
+
+ // set for newStream errors that may be transparently retried
+ allowTransparentRetry bool
+ // set for pick errors that are returned as a status
+ drop bool
}
func (cs *clientStream) commitAttemptLocked() {
@@ -525,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
-func (cs *clientStream) shouldRetry(err error) (bool, error) {
- if cs.attempt.s == nil {
- // Error from NewClientStream.
- nse, ok := err.(*transport.NewStreamError)
- if !ok {
- // Unexpected, but assume no I/O was performed and the RPC is not
- // fatal, so retry indefinitely.
- return true, nil
- }
-
- // Unwrap and convert error.
- err = toRPCErr(nse.Err)
-
- // Never retry DoNotRetry errors, which indicate the RPC should not be
- // retried due to max header list size violation, etc.
- if nse.DoNotRetry {
- return false, err
- }
+func (a *csAttempt) shouldRetry(err error) (bool, error) {
+ cs := a.cs
- // In the event of a non-IO operation error from NewStream, we never
- // attempted to write anything to the wire, so we can retry
- // indefinitely.
- if !nse.DoNotTransparentRetry {
- return true, nil
- }
- }
- if cs.finished || cs.committed {
- // RPC is finished or committed; cannot retry.
+ if cs.finished || cs.committed || a.drop {
+ // RPC is finished or committed or was dropped by the picker; cannot retry.
return false, err
}
+ if a.s == nil && a.allowTransparentRetry {
+ return true, nil
+ }
// Wait for the trailers.
unprocessed := false
- if cs.attempt.s != nil {
- <-cs.attempt.s.Done()
- unprocessed = cs.attempt.s.Unprocessed()
+ if a.s != nil {
+ <-a.s.Done()
+ unprocessed = a.s.Unprocessed()
}
if cs.firstAttempt && unprocessed {
// First attempt, stream unprocessed: transparently retry.
@@ -571,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
pushback := 0
hasPushback := false
- if cs.attempt.s != nil {
- if !cs.attempt.s.TrailersOnly() {
+ if a.s != nil {
+ if !a.s.TrailersOnly() {
return false, err
}
// TODO(retry): Move down if the spec changes to not check server pushback
// before considering this a failure for throttling.
- sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+ sps := a.s.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
@@ -595,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
var code codes.Code
- if cs.attempt.s != nil {
- code = cs.attempt.s.Status().Code()
+ if a.s != nil {
+ code = a.s.Status().Code()
} else {
- code = status.Convert(err).Code()
+ code = status.Code(err)
}
rp := cs.methodConfig.RetryPolicy
@@ -643,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
}
// Returns nil if a retry was performed and succeeded; error otherwise.
-func (cs *clientStream) retryLocked(lastErr error) error {
+func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
for {
- cs.attempt.finish(toRPCErr(lastErr))
- isTransparent, err := cs.shouldRetry(lastErr)
+ attempt.finish(toRPCErr(lastErr))
+ isTransparent, err := attempt.shouldRetry(lastErr)
if err != nil {
cs.commitAttemptLocked()
return err
}
cs.firstAttempt = false
- if err := cs.newAttemptLocked(isTransparent); err != nil {
+ attempt, err = cs.newAttemptLocked(isTransparent)
+ if err != nil {
+ // Only returns error if the clientconn is closed or the context of
+ // the stream is canceled.
return err
}
- if lastErr = cs.replayBufferLocked(); lastErr == nil {
+ // Note that the first op in the replay buffer always sets cs.attempt
+ // if it is able to pick a transport and create a stream.
+ if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
}
}
@@ -665,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
cs.commitAttempt()
// No need to lock before using attempt, since we know it is committed and
// cannot change.
- return cs.attempt.s.Context()
+ if cs.attempt.s != nil {
+ return cs.attempt.s.Context()
+ }
+ return cs.ctx
}
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
@@ -695,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
cs.mu.Unlock()
return err
}
- if err := cs.retryLocked(err); err != nil {
+ if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
@@ -726,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
}
- return m, err
+ return m, nil
}
func (cs *clientStream) Trailer() metadata.MD {
@@ -744,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
return cs.attempt.s.Trailer()
}
-func (cs *clientStream) replayBufferLocked() error {
- a := cs.attempt
+func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
for _, f := range cs.buffer {
- if err := f(a); err != nil {
+ if err := f(attempt); err != nil {
return err
}
}
@@ -795,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
- msgBytes := data // Store the pointer before setting to nil. For binary logging.
op := func(a *csAttempt) error {
- err := a.sendMsg(m, hdr, payload, data)
- // nil out the message and uncomp when replaying; they are only needed for
- // stats which is disabled for subsequent attempts.
- m, data = nil, nil
- return err
+ return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
- Message: msgBytes,
+ Message: data,
})
}
- return
+ return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
@@ -1362,8 +1381,10 @@ func (as *addrConnStream) finish(err error) {
// ServerStream defines the server-side behavior of a streaming RPC.
//
-// All errors returned from ServerStream methods are compatible with the
-// status package.
+// Errors returned from ServerStream methods are compatible with the status
+// package. However, the status code will often not match the RPC status as
+// seen by the client application, and therefore, should not be relied upon for
+// this purpose.
type ServerStream interface {
// SetHeader sets the header metadata. It may be called multiple times.
// When call multiple times, all the provided metadata will be merged.
@@ -1426,7 +1447,7 @@ type serverStream struct {
statsHandler stats.Handler
- binlog *binarylog.MethodLogger
+ binlog binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
@@ -1446,11 +1467,20 @@ func (ss *serverStream) SetHeader(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
return ss.s.SetHeader(md)
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
- err := ss.t.WriteHeader(ss.s, md)
+ err := imetadata.Validate(md)
+ if err != nil {
+ return status.Error(codes.Internal, err.Error())
+ }
+
+ err = ss.t.WriteHeader(ss.s, md)
if ss.binlog != nil && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
@@ -1465,6 +1495,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
if md.Len() == 0 {
return
}
+ if err := imetadata.Validate(md); err != nil {
+ logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
+ }
ss.s.SetTrailer(md)
}