summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go1551
1 files changed, 1551 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
new file mode 100644
index 000000000..5fd856a38
--- /dev/null
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -0,0 +1,1551 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "io"
+ "math"
+ "strconv"
+ "sync"
+ "time"
+
+ "golang.org/x/net/trace"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/internal/balancerload"
+ "google.golang.org/grpc/internal/binarylog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
+)
+
+// StreamHandler defines the handler called by gRPC server to complete the
+// execution of a streaming RPC. If a StreamHandler returns an error, it
+// should be produced by the status package, or else gRPC will use
+// codes.Unknown as the status code and err.Error() as the status message
+// of the RPC.
+type StreamHandler func(srv interface{}, stream ServerStream) error
+
+// StreamDesc represents a streaming RPC service's method specification.
+type StreamDesc struct {
+ StreamName string
+ Handler StreamHandler
+
+ // At least one of these is true.
+ ServerStreams bool
+ ClientStreams bool
+}
+
+// Stream defines the common interface a client or server stream has to satisfy.
+//
+// Deprecated: See ClientStream and ServerStream documentation instead.
+type Stream interface {
+ // Deprecated: See ClientStream and ServerStream documentation instead.
+ Context() context.Context
+ // Deprecated: See ClientStream and ServerStream documentation instead.
+ SendMsg(m interface{}) error
+ // Deprecated: See ClientStream and ServerStream documentation instead.
+ RecvMsg(m interface{}) error
+}
+
+// ClientStream defines the client-side behavior of a streaming RPC.
+//
+// All errors returned from ClientStream methods are compatible with the
+// status package.
+type ClientStream interface {
+ // Header returns the header metadata received from the server if there
+ // is any. It blocks if the metadata is not ready to read.
+ Header() (metadata.MD, error)
+ // Trailer returns the trailer metadata from the server, if there is any.
+ // It must only be called after stream.CloseAndRecv has returned, or
+ // stream.Recv has returned a non-nil error (including io.EOF).
+ Trailer() metadata.MD
+ // CloseSend closes the send direction of the stream. It closes the stream
+ // when non-nil error is met. It is also not safe to call CloseSend
+ // concurrently with SendMsg.
+ CloseSend() error
+ // Context returns the context for this stream.
+ //
+ // It should not be called until after Header or RecvMsg has returned. Once
+ // called, subsequent client-side retries are disabled.
+ Context() context.Context
+ // SendMsg is generally called by generated code. On error, SendMsg aborts
+ // the stream. If the error was generated by the client, the status is
+ // returned directly; otherwise, io.EOF is returned and the status of
+ // the stream may be discovered using RecvMsg.
+ //
+ // SendMsg blocks until:
+ // - There is sufficient flow control to schedule m with the transport, or
+ // - The stream is done, or
+ // - The stream breaks.
+ //
+ // SendMsg does not wait until the message is received by the server. An
+ // untimely stream closure may result in lost messages. To ensure delivery,
+ // users should ensure the RPC completed successfully using RecvMsg.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not safe
+ // to call SendMsg on the same stream in different goroutines. It is also
+ // not safe to call CloseSend concurrently with SendMsg.
+ SendMsg(m interface{}) error
+ // RecvMsg blocks until it receives a message into m or the stream is
+ // done. It returns io.EOF when the stream completes successfully. On
+ // any other error, the stream is aborted and the error contains the RPC
+ // status.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not
+ // safe to call RecvMsg on the same stream in different goroutines.
+ RecvMsg(m interface{}) error
+}
+
+// NewStream creates a new Stream for the client side. This is typically
+// called by generated code. ctx is used for the lifetime of the stream.
+//
+// To ensure resources are not leaked due to the stream returned, one of the following
+// actions must be performed:
+//
+// 1. Call Close on the ClientConn.
+// 2. Cancel the context provided.
+// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
+// client-streaming RPC, for instance, might use the helper function
+// CloseAndRecv (note that CloseSend does not Recv, therefore is not
+// guaranteed to release all resources).
+// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
+//
+// If none of the above happen, a goroutine and a context will be leaked, and grpc
+// will not call the optionally-configured stats handler with a stats.End message.
+func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
+ // allow interceptor to see all applicable call options, which means those
+ // configured as defaults from dial option as well as per-call options
+ opts = combine(cc.dopts.callOptions, opts)
+
+ if cc.dopts.streamInt != nil {
+ return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
+ }
+ return newClientStream(ctx, desc, cc, method, opts...)
+}
+
+// NewClientStream is a wrapper for ClientConn.NewStream.
+func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+ return cc.NewStream(ctx, desc, method, opts...)
+}
+
+func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+ if channelz.IsOn() {
+ cc.incrCallsStarted()
+ defer func() {
+ if err != nil {
+ cc.incrCallsFailed()
+ }
+ }()
+ }
+ c := defaultCallInfo()
+ // Provide an opportunity for the first RPC to see the first service config
+ // provided by the resolver.
+ if err := cc.waitForResolvedAddrs(ctx); err != nil {
+ return nil, err
+ }
+ mc := cc.GetMethodConfig(method)
+ if mc.WaitForReady != nil {
+ c.failFast = !*mc.WaitForReady
+ }
+
+ // Possible context leak:
+ // The cancel function for the child context we create will only be called
+ // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
+ // an error is generated by SendMsg.
+ // https://github.com/grpc/grpc-go/issues/1818.
+ var cancel context.CancelFunc
+ if mc.Timeout != nil && *mc.Timeout >= 0 {
+ ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
+ } else {
+ ctx, cancel = context.WithCancel(ctx)
+ }
+ defer func() {
+ if err != nil {
+ cancel()
+ }
+ }()
+
+ for _, o := range opts {
+ if err := o.before(c); err != nil {
+ return nil, toRPCErr(err)
+ }
+ }
+ c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
+ c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+ if err := setCallInfoCodec(c); err != nil {
+ return nil, err
+ }
+
+ callHdr := &transport.CallHdr{
+ Host: cc.authority,
+ Method: method,
+ ContentSubtype: c.contentSubtype,
+ }
+
+ // Set our outgoing compression according to the UseCompressor CallOption, if
+ // set. In that case, also find the compressor from the encoding package.
+ // Otherwise, use the compressor configured by the WithCompressor DialOption,
+ // if set.
+ var cp Compressor
+ var comp encoding.Compressor
+ if ct := c.compressorType; ct != "" {
+ callHdr.SendCompress = ct
+ if ct != encoding.Identity {
+ comp = encoding.GetCompressor(ct)
+ if comp == nil {
+ return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
+ }
+ }
+ } else if cc.dopts.cp != nil {
+ callHdr.SendCompress = cc.dopts.cp.Type()
+ cp = cc.dopts.cp
+ }
+ if c.creds != nil {
+ callHdr.Creds = c.creds
+ }
+ var trInfo *traceInfo
+ if EnableTracing {
+ trInfo = &traceInfo{
+ tr: trace.New("grpc.Sent."+methodFamily(method), method),
+ firstLine: firstLine{
+ client: true,
+ },
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ trInfo.firstLine.deadline = time.Until(deadline)
+ }
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, trInfo.tr)
+ }
+ ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp)
+ sh := cc.dopts.copts.StatsHandler
+ var beginTime time.Time
+ if sh != nil {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
+ beginTime = time.Now()
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: beginTime,
+ FailFast: c.failFast,
+ }
+ sh.HandleRPC(ctx, begin)
+ }
+
+ cs := &clientStream{
+ callHdr: callHdr,
+ ctx: ctx,
+ methodConfig: &mc,
+ opts: opts,
+ callInfo: c,
+ cc: cc,
+ desc: desc,
+ codec: c.codec,
+ cp: cp,
+ comp: comp,
+ cancel: cancel,
+ beginTime: beginTime,
+ firstAttempt: true,
+ }
+ if !cc.dopts.disableRetry {
+ cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
+ }
+ cs.binlog = binarylog.GetMethodLogger(method)
+
+ // Only this initial attempt has stats/tracing.
+ // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
+ if err := cs.newAttemptLocked(sh, trInfo); err != nil {
+ cs.finish(err)
+ return nil, err
+ }
+
+ op := func(a *csAttempt) error { return a.newStream() }
+ if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
+ cs.finish(err)
+ return nil, err
+ }
+
+ if cs.binlog != nil {
+ md, _ := metadata.FromOutgoingContext(ctx)
+ logEntry := &binarylog.ClientHeader{
+ OnClientSide: true,
+ Header: md,
+ MethodName: method,
+ Authority: cs.cc.authority,
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ logEntry.Timeout = time.Until(deadline)
+ if logEntry.Timeout < 0 {
+ logEntry.Timeout = 0
+ }
+ }
+ cs.binlog.Log(logEntry)
+ }
+
+ if desc != unaryStreamDesc {
+ // Listen on cc and stream contexts to cleanup when the user closes the
+ // ClientConn or cancels the stream context. In all other cases, an error
+ // should already be injected into the recv buffer by the transport, which
+ // the client will eventually receive, and then we will cancel the stream's
+ // context in clientStream.finish.
+ go func() {
+ select {
+ case <-cc.ctx.Done():
+ cs.finish(ErrClientConnClosing)
+ case <-ctx.Done():
+ cs.finish(toRPCErr(ctx.Err()))
+ }
+ }()
+ }
+ return cs, nil
+}
+
+// newAttemptLocked creates a new attempt with a transport.
+// If it succeeds, then it replaces clientStream's attempt with this new attempt.
+func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) {
+ newAttempt := &csAttempt{
+ cs: cs,
+ dc: cs.cc.dopts.dc,
+ statsHandler: sh,
+ trInfo: trInfo,
+ }
+ defer func() {
+ if retErr != nil {
+ // This attempt is not set in the clientStream, so it's finish won't
+ // be called. Call it here for stats and trace in case they are not
+ // nil.
+ newAttempt.finish(retErr)
+ }
+ }()
+
+ if err := cs.ctx.Err(); err != nil {
+ return toRPCErr(err)
+ }
+
+ ctx := cs.ctx
+ if cs.cc.parsedTarget.Scheme == "xds" {
+ // Add extra metadata (metadata that will be added by transport) to context
+ // so the balancer can see them.
+ ctx = grpcutil.WithExtraMetadata(cs.ctx, metadata.Pairs(
+ "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
+ ))
+ }
+ t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
+ if err != nil {
+ return err
+ }
+ if trInfo != nil {
+ trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
+ }
+ newAttempt.t = t
+ newAttempt.done = done
+ cs.attempt = newAttempt
+ return nil
+}
+
+func (a *csAttempt) newStream() error {
+ cs := a.cs
+ cs.callHdr.PreviousAttempts = cs.numRetries
+ s, err := a.t.NewStream(cs.ctx, cs.callHdr)
+ if err != nil {
+ if _, ok := err.(transport.PerformedIOError); ok {
+ // Return without converting to an RPC error so retry code can
+ // inspect.
+ return err
+ }
+ return toRPCErr(err)
+ }
+ cs.attempt.s = s
+ cs.attempt.p = &parser{r: s}
+ return nil
+}
+
+// clientStream implements a client side Stream.
+type clientStream struct {
+ callHdr *transport.CallHdr
+ opts []CallOption
+ callInfo *callInfo
+ cc *ClientConn
+ desc *StreamDesc
+
+ codec baseCodec
+ cp Compressor
+ comp encoding.Compressor
+
+ cancel context.CancelFunc // cancels all attempts
+
+ sentLast bool // sent an end stream
+ beginTime time.Time
+
+ methodConfig *MethodConfig
+
+ ctx context.Context // the application's context, wrapped by stats/tracing
+
+ retryThrottler *retryThrottler // The throttler active when the RPC began.
+
+ binlog *binarylog.MethodLogger // Binary logger, can be nil.
+ // serverHeaderBinlogged is a boolean for whether server header has been
+ // logged. Server header will be logged when the first time one of those
+ // happens: stream.Header(), stream.Recv().
+ //
+ // It's only read and used by Recv() and Header(), so it doesn't need to be
+ // synchronized.
+ serverHeaderBinlogged bool
+
+ mu sync.Mutex
+ firstAttempt bool // if true, transparent retry is valid
+ numRetries int // exclusive of transparent retry attempt(s)
+ numRetriesSincePushback int // retries since pushback; to reset backoff
+ finished bool // TODO: replace with atomic cmpxchg or sync.Once?
+ // attempt is the active client stream attempt.
+ // The only place where it is written is the newAttemptLocked method and this method never writes nil.
+ // So, attempt can be nil only inside newClientStream function when clientStream is first created.
+ // One of the first things done after clientStream's creation, is to call newAttemptLocked which either
+ // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
+ // then newClientStream calls finish on the clientStream and returns. So, finish method is the only
+ // place where we need to check if the attempt is nil.
+ attempt *csAttempt
+ // TODO(hedging): hedging will have multiple attempts simultaneously.
+ committed bool // active attempt committed for retry?
+ buffer []func(a *csAttempt) error // operations to replay on retry
+ bufferSize int // current size of buffer
+}
+
+// csAttempt implements a single transport stream attempt within a
+// clientStream.
+type csAttempt struct {
+ cs *clientStream
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ done func(balancer.DoneInfo)
+
+ finished bool
+ dc Decompressor
+ decomp encoding.Compressor
+ decompSet bool
+
+ mu sync.Mutex // guards trInfo.tr
+ // trInfo may be nil (if EnableTracing is false).
+ // trInfo.tr is set when created (if EnableTracing is true),
+ // and cleared when the finish method is called.
+ trInfo *traceInfo
+
+ statsHandler stats.Handler
+}
+
+func (cs *clientStream) commitAttemptLocked() {
+ cs.committed = true
+ cs.buffer = nil
+}
+
+func (cs *clientStream) commitAttempt() {
+ cs.mu.Lock()
+ cs.commitAttemptLocked()
+ cs.mu.Unlock()
+}
+
+// shouldRetry returns nil if the RPC should be retried; otherwise it returns
+// the error that should be returned by the operation.
+func (cs *clientStream) shouldRetry(err error) error {
+ unprocessed := false
+ if cs.attempt.s == nil {
+ pioErr, ok := err.(transport.PerformedIOError)
+ if ok {
+ // Unwrap error.
+ err = toRPCErr(pioErr.Err)
+ } else {
+ unprocessed = true
+ }
+ if !ok && !cs.callInfo.failFast {
+ // In the event of a non-IO operation error from NewStream, we
+ // never attempted to write anything to the wire, so we can retry
+ // indefinitely for non-fail-fast RPCs.
+ return nil
+ }
+ }
+ if cs.finished || cs.committed {
+ // RPC is finished or committed; cannot retry.
+ return err
+ }
+ // Wait for the trailers.
+ if cs.attempt.s != nil {
+ <-cs.attempt.s.Done()
+ unprocessed = cs.attempt.s.Unprocessed()
+ }
+ if cs.firstAttempt && unprocessed {
+ // First attempt, stream unprocessed: transparently retry.
+ return nil
+ }
+ if cs.cc.dopts.disableRetry {
+ return err
+ }
+
+ pushback := 0
+ hasPushback := false
+ if cs.attempt.s != nil {
+ if !cs.attempt.s.TrailersOnly() {
+ return err
+ }
+
+ // TODO(retry): Move down if the spec changes to not check server pushback
+ // before considering this a failure for throttling.
+ sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
+ if len(sps) == 1 {
+ var e error
+ if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
+ channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
+ cs.retryThrottler.throttle() // This counts as a failure for throttling.
+ return err
+ }
+ hasPushback = true
+ } else if len(sps) > 1 {
+ channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
+ cs.retryThrottler.throttle() // This counts as a failure for throttling.
+ return err
+ }
+ }
+
+ var code codes.Code
+ if cs.attempt.s != nil {
+ code = cs.attempt.s.Status().Code()
+ } else {
+ code = status.Convert(err).Code()
+ }
+
+ rp := cs.methodConfig.retryPolicy
+ if rp == nil || !rp.retryableStatusCodes[code] {
+ return err
+ }
+
+ // Note: the ordering here is important; we count this as a failure
+ // only if the code matched a retryable code.
+ if cs.retryThrottler.throttle() {
+ return err
+ }
+ if cs.numRetries+1 >= rp.maxAttempts {
+ return err
+ }
+
+ var dur time.Duration
+ if hasPushback {
+ dur = time.Millisecond * time.Duration(pushback)
+ cs.numRetriesSincePushback = 0
+ } else {
+ fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
+ cur := float64(rp.initialBackoff) * fact
+ if max := float64(rp.maxBackoff); cur > max {
+ cur = max
+ }
+ dur = time.Duration(grpcrand.Int63n(int64(cur)))
+ cs.numRetriesSincePushback++
+ }
+
+ // TODO(dfawley): we could eagerly fail here if dur puts us past the
+ // deadline, but unsure if it is worth doing.
+ t := time.NewTimer(dur)
+ select {
+ case <-t.C:
+ cs.numRetries++
+ return nil
+ case <-cs.ctx.Done():
+ t.Stop()
+ return status.FromContextError(cs.ctx.Err()).Err()
+ }
+}
+
+// Returns nil if a retry was performed and succeeded; error otherwise.
+func (cs *clientStream) retryLocked(lastErr error) error {
+ for {
+ cs.attempt.finish(lastErr)
+ if err := cs.shouldRetry(lastErr); err != nil {
+ cs.commitAttemptLocked()
+ return err
+ }
+ cs.firstAttempt = false
+ if err := cs.newAttemptLocked(nil, nil); err != nil {
+ return err
+ }
+ if lastErr = cs.replayBufferLocked(); lastErr == nil {
+ return nil
+ }
+ }
+}
+
+func (cs *clientStream) Context() context.Context {
+ cs.commitAttempt()
+ // No need to lock before using attempt, since we know it is committed and
+ // cannot change.
+ return cs.attempt.s.Context()
+}
+
+func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
+ cs.mu.Lock()
+ for {
+ if cs.committed {
+ cs.mu.Unlock()
+ return op(cs.attempt)
+ }
+ a := cs.attempt
+ cs.mu.Unlock()
+ err := op(a)
+ cs.mu.Lock()
+ if a != cs.attempt {
+ // We started another attempt already.
+ continue
+ }
+ if err == io.EOF {
+ <-a.s.Done()
+ }
+ if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
+ onSuccess()
+ cs.mu.Unlock()
+ return err
+ }
+ if err := cs.retryLocked(err); err != nil {
+ cs.mu.Unlock()
+ return err
+ }
+ }
+}
+
+func (cs *clientStream) Header() (metadata.MD, error) {
+ var m metadata.MD
+ err := cs.withRetry(func(a *csAttempt) error {
+ var err error
+ m, err = a.s.Header()
+ return toRPCErr(err)
+ }, cs.commitAttemptLocked)
+ if err != nil {
+ cs.finish(err)
+ return nil, err
+ }
+ if cs.binlog != nil && !cs.serverHeaderBinlogged {
+ // Only log if binary log is on and header has not been logged.
+ logEntry := &binarylog.ServerHeader{
+ OnClientSide: true,
+ Header: m,
+ PeerAddr: nil,
+ }
+ if peer, ok := peer.FromContext(cs.Context()); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ cs.binlog.Log(logEntry)
+ cs.serverHeaderBinlogged = true
+ }
+ return m, err
+}
+
+func (cs *clientStream) Trailer() metadata.MD {
+ // On RPC failure, we never need to retry, because usage requires that
+ // RecvMsg() returned a non-nil error before calling this function is valid.
+ // We would have retried earlier if necessary.
+ //
+ // Commit the attempt anyway, just in case users are not following those
+ // directions -- it will prevent races and should not meaningfully impact
+ // performance.
+ cs.commitAttempt()
+ if cs.attempt.s == nil {
+ return nil
+ }
+ return cs.attempt.s.Trailer()
+}
+
+func (cs *clientStream) replayBufferLocked() error {
+ a := cs.attempt
+ for _, f := range cs.buffer {
+ if err := f(a); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
+ // Note: we still will buffer if retry is disabled (for transparent retries).
+ if cs.committed {
+ return
+ }
+ cs.bufferSize += sz
+ if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
+ cs.commitAttemptLocked()
+ return
+ }
+ cs.buffer = append(cs.buffer, op)
+}
+
+func (cs *clientStream) SendMsg(m interface{}) (err error) {
+ defer func() {
+ if err != nil && err != io.EOF {
+ // Call finish on the client stream for errors generated by this SendMsg
+ // call, as these indicate problems created by this client. (Transport
+ // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
+ // error will be returned from RecvMsg eventually in that case, or be
+ // retried.)
+ cs.finish(err)
+ }
+ }()
+ if cs.sentLast {
+ return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
+ }
+ if !cs.desc.ClientStreams {
+ cs.sentLast = true
+ }
+
+ // load hdr, payload, data
+ hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
+ if err != nil {
+ return err
+ }
+
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > *cs.callInfo.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
+ }
+ msgBytes := data // Store the pointer before setting to nil. For binary logging.
+ op := func(a *csAttempt) error {
+ err := a.sendMsg(m, hdr, payload, data)
+ // nil out the message and uncomp when replaying; they are only needed for
+ // stats which is disabled for subsequent attempts.
+ m, data = nil, nil
+ return err
+ }
+ err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
+ if cs.binlog != nil && err == nil {
+ cs.binlog.Log(&binarylog.ClientMessage{
+ OnClientSide: true,
+ Message: msgBytes,
+ })
+ }
+ return
+}
+
+func (cs *clientStream) RecvMsg(m interface{}) error {
+ if cs.binlog != nil && !cs.serverHeaderBinlogged {
+ // Call Header() to binary log header if it's not already logged.
+ cs.Header()
+ }
+ var recvInfo *payloadInfo
+ if cs.binlog != nil {
+ recvInfo = &payloadInfo{}
+ }
+ err := cs.withRetry(func(a *csAttempt) error {
+ return a.recvMsg(m, recvInfo)
+ }, cs.commitAttemptLocked)
+ if cs.binlog != nil && err == nil {
+ cs.binlog.Log(&binarylog.ServerMessage{
+ OnClientSide: true,
+ Message: recvInfo.uncompressedBytes,
+ })
+ }
+ if err != nil || !cs.desc.ServerStreams {
+ // err != nil or non-server-streaming indicates end of stream.
+ cs.finish(err)
+
+ if cs.binlog != nil {
+ // finish will not log Trailer. Log Trailer here.
+ logEntry := &binarylog.ServerTrailer{
+ OnClientSide: true,
+ Trailer: cs.Trailer(),
+ Err: err,
+ }
+ if logEntry.Err == io.EOF {
+ logEntry.Err = nil
+ }
+ if peer, ok := peer.FromContext(cs.Context()); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ cs.binlog.Log(logEntry)
+ }
+ }
+ return err
+}
+
+func (cs *clientStream) CloseSend() error {
+ if cs.sentLast {
+ // TODO: return an error and finish the stream instead, due to API misuse?
+ return nil
+ }
+ cs.sentLast = true
+ op := func(a *csAttempt) error {
+ a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
+ // Always return nil; io.EOF is the only error that might make sense
+ // instead, but there is no need to signal the client to call RecvMsg
+ // as the only use left for the stream after CloseSend is to call
+ // RecvMsg. This also matches historical behavior.
+ return nil
+ }
+ cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
+ if cs.binlog != nil {
+ cs.binlog.Log(&binarylog.ClientHalfClose{
+ OnClientSide: true,
+ })
+ }
+ // We never returned an error here for reasons.
+ return nil
+}
+
+func (cs *clientStream) finish(err error) {
+ if err == io.EOF {
+ // Ending a stream with EOF indicates a success.
+ err = nil
+ }
+ cs.mu.Lock()
+ if cs.finished {
+ cs.mu.Unlock()
+ return
+ }
+ cs.finished = true
+ cs.commitAttemptLocked()
+ if cs.attempt != nil {
+ cs.attempt.finish(err)
+ // after functions all rely upon having a stream.
+ if cs.attempt.s != nil {
+ for _, o := range cs.opts {
+ o.after(cs.callInfo, cs.attempt)
+ }
+ }
+ }
+ cs.mu.Unlock()
+ // For binary logging. only log cancel in finish (could be caused by RPC ctx
+ // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
+ //
+ // Only one of cancel or trailer needs to be logged. In the cases where
+ // users don't call RecvMsg, users must have already canceled the RPC.
+ if cs.binlog != nil && status.Code(err) == codes.Canceled {
+ cs.binlog.Log(&binarylog.Cancel{
+ OnClientSide: true,
+ })
+ }
+ if err == nil {
+ cs.retryThrottler.successfulRPC()
+ }
+ if channelz.IsOn() {
+ if err != nil {
+ cs.cc.incrCallsFailed()
+ } else {
+ cs.cc.incrCallsSucceeded()
+ }
+ }
+ cs.cancel()
+}
+
+func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
+ cs := a.cs
+ if a.trInfo != nil {
+ a.mu.Lock()
+ if a.trInfo.tr != nil {
+ a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
+ }
+ a.mu.Unlock()
+ }
+ if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
+ if !cs.desc.ClientStreams {
+ // For non-client-streaming RPCs, we return nil instead of EOF on error
+ // because the generated code requires it. finish is not called; RecvMsg()
+ // will call it with the stream's status independently.
+ return nil
+ }
+ return io.EOF
+ }
+ if a.statsHandler != nil {
+ a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
+ }
+ if channelz.IsOn() {
+ a.t.IncrMsgSent()
+ }
+ return nil
+}
+
+func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
+ cs := a.cs
+ if a.statsHandler != nil && payInfo == nil {
+ payInfo = &payloadInfo{}
+ }
+
+ if !a.decompSet {
+ // Block until we receive headers containing received message encoding.
+ if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
+ if a.dc == nil || a.dc.Type() != ct {
+ // No configured decompressor, or it does not match the incoming
+ // message encoding; attempt to find a registered compressor that does.
+ a.dc = nil
+ a.decomp = encoding.GetCompressor(ct)
+ }
+ } else {
+ // No compression is used; disable our decompressor.
+ a.dc = nil
+ }
+ // Only initialize this state once per stream.
+ a.decompSet = true
+ }
+ err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
+ if err != nil {
+ if err == io.EOF {
+ if statusErr := a.s.Status().Err(); statusErr != nil {
+ return statusErr
+ }
+ return io.EOF // indicates successful end of stream.
+ }
+ return toRPCErr(err)
+ }
+ if a.trInfo != nil {
+ a.mu.Lock()
+ if a.trInfo.tr != nil {
+ a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
+ }
+ a.mu.Unlock()
+ }
+ if a.statsHandler != nil {
+ a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
+ Client: true,
+ RecvTime: time.Now(),
+ Payload: m,
+ // TODO truncate large payload.
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength + headerLen,
+ Length: len(payInfo.uncompressedBytes),
+ })
+ }
+ if channelz.IsOn() {
+ a.t.IncrMsgRecv()
+ }
+ if cs.desc.ServerStreams {
+ // Subsequent messages should be received by subsequent RecvMsg calls.
+ return nil
+ }
+ // Special handling for non-server-stream rpcs.
+ // This recv expects EOF or errors, so we don't collect inPayload.
+ err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
+ if err == nil {
+ return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
+ }
+ if err == io.EOF {
+ return a.s.Status().Err() // non-server streaming Recv returns nil on success
+ }
+ return toRPCErr(err)
+}
+
+func (a *csAttempt) finish(err error) {
+ a.mu.Lock()
+ if a.finished {
+ a.mu.Unlock()
+ return
+ }
+ a.finished = true
+ if err == io.EOF {
+ // Ending a stream with EOF indicates a success.
+ err = nil
+ }
+ var tr metadata.MD
+ if a.s != nil {
+ a.t.CloseStream(a.s, err)
+ tr = a.s.Trailer()
+ }
+
+ if a.done != nil {
+ br := false
+ if a.s != nil {
+ br = a.s.BytesReceived()
+ }
+ a.done(balancer.DoneInfo{
+ Err: err,
+ Trailer: tr,
+ BytesSent: a.s != nil,
+ BytesReceived: br,
+ ServerLoad: balancerload.Parse(tr),
+ })
+ }
+ if a.statsHandler != nil {
+ end := &stats.End{
+ Client: true,
+ BeginTime: a.cs.beginTime,
+ EndTime: time.Now(),
+ Trailer: tr,
+ Error: err,
+ }
+ a.statsHandler.HandleRPC(a.cs.ctx, end)
+ }
+ if a.trInfo != nil && a.trInfo.tr != nil {
+ if err == nil {
+ a.trInfo.tr.LazyPrintf("RPC: [OK]")
+ } else {
+ a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
+ a.trInfo.tr.SetError()
+ }
+ a.trInfo.tr.Finish()
+ a.trInfo.tr = nil
+ }
+ a.mu.Unlock()
+}
+
+// newClientStream creates a ClientStream with the specified transport, on the
+// given addrConn.
+//
+// It's expected that the given transport is either the same one in addrConn, or
+// is already closed. To avoid race, transport is specified separately, instead
+// of using ac.transpot.
+//
+// Main difference between this and ClientConn.NewStream:
+// - no retry
+// - no service config (or wait for service config)
+// - no tracing or stats
+func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
+ if t == nil {
+ // TODO: return RPC error here?
+ return nil, errors.New("transport provided is nil")
+ }
+ // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
+ c := &callInfo{}
+
+ // Possible context leak:
+ // The cancel function for the child context we create will only be called
+ // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
+ // an error is generated by SendMsg.
+ // https://github.com/grpc/grpc-go/issues/1818.
+ ctx, cancel := context.WithCancel(ctx)
+ defer func() {
+ if err != nil {
+ cancel()
+ }
+ }()
+
+ for _, o := range opts {
+ if err := o.before(c); err != nil {
+ return nil, toRPCErr(err)
+ }
+ }
+ c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
+ c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
+ if err := setCallInfoCodec(c); err != nil {
+ return nil, err
+ }
+
+ callHdr := &transport.CallHdr{
+ Host: ac.cc.authority,
+ Method: method,
+ ContentSubtype: c.contentSubtype,
+ }
+
+ // Set our outgoing compression according to the UseCompressor CallOption, if
+ // set. In that case, also find the compressor from the encoding package.
+ // Otherwise, use the compressor configured by the WithCompressor DialOption,
+ // if set.
+ var cp Compressor
+ var comp encoding.Compressor
+ if ct := c.compressorType; ct != "" {
+ callHdr.SendCompress = ct
+ if ct != encoding.Identity {
+ comp = encoding.GetCompressor(ct)
+ if comp == nil {
+ return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
+ }
+ }
+ } else if ac.cc.dopts.cp != nil {
+ callHdr.SendCompress = ac.cc.dopts.cp.Type()
+ cp = ac.cc.dopts.cp
+ }
+ if c.creds != nil {
+ callHdr.Creds = c.creds
+ }
+
+ // Use a special addrConnStream to avoid retry.
+ as := &addrConnStream{
+ callHdr: callHdr,
+ ac: ac,
+ ctx: ctx,
+ cancel: cancel,
+ opts: opts,
+ callInfo: c,
+ desc: desc,
+ codec: c.codec,
+ cp: cp,
+ comp: comp,
+ t: t,
+ }
+
+ s, err := as.t.NewStream(as.ctx, as.callHdr)
+ if err != nil {
+ err = toRPCErr(err)
+ return nil, err
+ }
+ as.s = s
+ as.p = &parser{r: s}
+ ac.incrCallsStarted()
+ if desc != unaryStreamDesc {
+ // Listen on cc and stream contexts to cleanup when the user closes the
+ // ClientConn or cancels the stream context. In all other cases, an error
+ // should already be injected into the recv buffer by the transport, which
+ // the client will eventually receive, and then we will cancel the stream's
+ // context in clientStream.finish.
+ go func() {
+ select {
+ case <-ac.ctx.Done():
+ as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
+ case <-ctx.Done():
+ as.finish(toRPCErr(ctx.Err()))
+ }
+ }()
+ }
+ return as, nil
+}
+
+type addrConnStream struct {
+ s *transport.Stream
+ ac *addrConn
+ callHdr *transport.CallHdr
+ cancel context.CancelFunc
+ opts []CallOption
+ callInfo *callInfo
+ t transport.ClientTransport
+ ctx context.Context
+ sentLast bool
+ desc *StreamDesc
+ codec baseCodec
+ cp Compressor
+ comp encoding.Compressor
+ decompSet bool
+ dc Decompressor
+ decomp encoding.Compressor
+ p *parser
+ mu sync.Mutex
+ finished bool
+}
+
+func (as *addrConnStream) Header() (metadata.MD, error) {
+ m, err := as.s.Header()
+ if err != nil {
+ as.finish(toRPCErr(err))
+ }
+ return m, err
+}
+
+func (as *addrConnStream) Trailer() metadata.MD {
+ return as.s.Trailer()
+}
+
+func (as *addrConnStream) CloseSend() error {
+ if as.sentLast {
+ // TODO: return an error and finish the stream instead, due to API misuse?
+ return nil
+ }
+ as.sentLast = true
+
+ as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
+ // Always return nil; io.EOF is the only error that might make sense
+ // instead, but there is no need to signal the client to call RecvMsg
+ // as the only use left for the stream after CloseSend is to call
+ // RecvMsg. This also matches historical behavior.
+ return nil
+}
+
+func (as *addrConnStream) Context() context.Context {
+ return as.s.Context()
+}
+
+func (as *addrConnStream) SendMsg(m interface{}) (err error) {
+ defer func() {
+ if err != nil && err != io.EOF {
+ // Call finish on the client stream for errors generated by this SendMsg
+ // call, as these indicate problems created by this client. (Transport
+ // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
+ // error will be returned from RecvMsg eventually in that case, or be
+ // retried.)
+ as.finish(err)
+ }
+ }()
+ if as.sentLast {
+ return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
+ }
+ if !as.desc.ClientStreams {
+ as.sentLast = true
+ }
+
+ // load hdr, payload, data
+ hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
+ if err != nil {
+ return err
+ }
+
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payld) > *as.callInfo.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
+ }
+
+ if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
+ if !as.desc.ClientStreams {
+ // For non-client-streaming RPCs, we return nil instead of EOF on error
+ // because the generated code requires it. finish is not called; RecvMsg()
+ // will call it with the stream's status independently.
+ return nil
+ }
+ return io.EOF
+ }
+
+ if channelz.IsOn() {
+ as.t.IncrMsgSent()
+ }
+ return nil
+}
+
+func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
+ defer func() {
+ if err != nil || !as.desc.ServerStreams {
+ // err != nil or non-server-streaming indicates end of stream.
+ as.finish(err)
+ }
+ }()
+
+ if !as.decompSet {
+ // Block until we receive headers containing received message encoding.
+ if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
+ if as.dc == nil || as.dc.Type() != ct {
+ // No configured decompressor, or it does not match the incoming
+ // message encoding; attempt to find a registered compressor that does.
+ as.dc = nil
+ as.decomp = encoding.GetCompressor(ct)
+ }
+ } else {
+ // No compression is used; disable our decompressor.
+ as.dc = nil
+ }
+ // Only initialize this state once per stream.
+ as.decompSet = true
+ }
+ err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
+ if err != nil {
+ if err == io.EOF {
+ if statusErr := as.s.Status().Err(); statusErr != nil {
+ return statusErr
+ }
+ return io.EOF // indicates successful end of stream.
+ }
+ return toRPCErr(err)
+ }
+
+ if channelz.IsOn() {
+ as.t.IncrMsgRecv()
+ }
+ if as.desc.ServerStreams {
+ // Subsequent messages should be received by subsequent RecvMsg calls.
+ return nil
+ }
+
+ // Special handling for non-server-stream rpcs.
+ // This recv expects EOF or errors, so we don't collect inPayload.
+ err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
+ if err == nil {
+ return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
+ }
+ if err == io.EOF {
+ return as.s.Status().Err() // non-server streaming Recv returns nil on success
+ }
+ return toRPCErr(err)
+}
+
+func (as *addrConnStream) finish(err error) {
+ as.mu.Lock()
+ if as.finished {
+ as.mu.Unlock()
+ return
+ }
+ as.finished = true
+ if err == io.EOF {
+ // Ending a stream with EOF indicates a success.
+ err = nil
+ }
+ if as.s != nil {
+ as.t.CloseStream(as.s, err)
+ }
+
+ if err != nil {
+ as.ac.incrCallsFailed()
+ } else {
+ as.ac.incrCallsSucceeded()
+ }
+ as.cancel()
+ as.mu.Unlock()
+}
+
+// ServerStream defines the server-side behavior of a streaming RPC.
+//
+// All errors returned from ServerStream methods are compatible with the
+// status package.
+type ServerStream interface {
+ // SetHeader sets the header metadata. It may be called multiple times.
+ // When call multiple times, all the provided metadata will be merged.
+ // All the metadata will be sent out when one of the following happens:
+ // - ServerStream.SendHeader() is called;
+ // - The first response is sent out;
+ // - An RPC status is sent out (error or success).
+ SetHeader(metadata.MD) error
+ // SendHeader sends the header metadata.
+ // The provided md and headers set by SetHeader() will be sent.
+ // It fails if called multiple times.
+ SendHeader(metadata.MD) error
+ // SetTrailer sets the trailer metadata which will be sent with the RPC status.
+ // When called more than once, all the provided metadata will be merged.
+ SetTrailer(metadata.MD)
+ // Context returns the context for this stream.
+ Context() context.Context
+ // SendMsg sends a message. On error, SendMsg aborts the stream and the
+ // error is returned directly.
+ //
+ // SendMsg blocks until:
+ // - There is sufficient flow control to schedule m with the transport, or
+ // - The stream is done, or
+ // - The stream breaks.
+ //
+ // SendMsg does not wait until the message is received by the client. An
+ // untimely stream closure may result in lost messages.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not safe
+ // to call SendMsg on the same stream in different goroutines.
+ SendMsg(m interface{}) error
+ // RecvMsg blocks until it receives a message into m or the stream is
+ // done. It returns io.EOF when the client has performed a CloseSend. On
+ // any non-EOF error, the stream is aborted and the error contains the
+ // RPC status.
+ //
+ // It is safe to have a goroutine calling SendMsg and another goroutine
+ // calling RecvMsg on the same stream at the same time, but it is not
+ // safe to call RecvMsg on the same stream in different goroutines.
+ RecvMsg(m interface{}) error
+}
+
+// serverStream implements a server side Stream.
+type serverStream struct {
+ ctx context.Context
+ t transport.ServerTransport
+ s *transport.Stream
+ p *parser
+ codec baseCodec
+
+ cp Compressor
+ dc Decompressor
+ comp encoding.Compressor
+ decomp encoding.Compressor
+
+ maxReceiveMessageSize int
+ maxSendMessageSize int
+ trInfo *traceInfo
+
+ statsHandler stats.Handler
+
+ binlog *binarylog.MethodLogger
+ // serverHeaderBinlogged indicates whether server header has been logged. It
+ // will happen when one of the following two happens: stream.SendHeader(),
+ // stream.Send().
+ //
+ // It's only checked in send and sendHeader, doesn't need to be
+ // synchronized.
+ serverHeaderBinlogged bool
+
+ mu sync.Mutex // protects trInfo.tr after the service handler runs.
+}
+
+func (ss *serverStream) Context() context.Context {
+ return ss.ctx
+}
+
+func (ss *serverStream) SetHeader(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ return ss.s.SetHeader(md)
+}
+
+func (ss *serverStream) SendHeader(md metadata.MD) error {
+ err := ss.t.WriteHeader(ss.s, md)
+ if ss.binlog != nil && !ss.serverHeaderBinlogged {
+ h, _ := ss.s.Header()
+ ss.binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ ss.serverHeaderBinlogged = true
+ }
+ return err
+}
+
+func (ss *serverStream) SetTrailer(md metadata.MD) {
+ if md.Len() == 0 {
+ return
+ }
+ ss.s.SetTrailer(md)
+}
+
+func (ss *serverStream) SendMsg(m interface{}) (err error) {
+ defer func() {
+ if ss.trInfo != nil {
+ ss.mu.Lock()
+ if ss.trInfo.tr != nil {
+ if err == nil {
+ ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
+ } else {
+ ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ ss.trInfo.tr.SetError()
+ }
+ }
+ ss.mu.Unlock()
+ }
+ if err != nil && err != io.EOF {
+ st, _ := status.FromError(toRPCErr(err))
+ ss.t.WriteStatus(ss.s, st)
+ // Non-user specified status was sent out. This should be an error
+ // case (as a server side Cancel maybe).
+ //
+ // This is not handled specifically now. User will return a final
+ // status from the service handler, we will log that error instead.
+ // This behavior is similar to an interceptor.
+ }
+ if channelz.IsOn() && err == nil {
+ ss.t.IncrMsgSent()
+ }
+ }()
+
+ // load hdr, payload, data
+ hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
+ if err != nil {
+ return err
+ }
+
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > ss.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
+ }
+ if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
+ return toRPCErr(err)
+ }
+ if ss.binlog != nil {
+ if !ss.serverHeaderBinlogged {
+ h, _ := ss.s.Header()
+ ss.binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ ss.serverHeaderBinlogged = true
+ }
+ ss.binlog.Log(&binarylog.ServerMessage{
+ Message: data,
+ })
+ }
+ if ss.statsHandler != nil {
+ ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ }
+ return nil
+}
+
+func (ss *serverStream) RecvMsg(m interface{}) (err error) {
+ defer func() {
+ if ss.trInfo != nil {
+ ss.mu.Lock()
+ if ss.trInfo.tr != nil {
+ if err == nil {
+ ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
+ } else if err != io.EOF {
+ ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ ss.trInfo.tr.SetError()
+ }
+ }
+ ss.mu.Unlock()
+ }
+ if err != nil && err != io.EOF {
+ st, _ := status.FromError(toRPCErr(err))
+ ss.t.WriteStatus(ss.s, st)
+ // Non-user specified status was sent out. This should be an error
+ // case (as a server side Cancel maybe).
+ //
+ // This is not handled specifically now. User will return a final
+ // status from the service handler, we will log that error instead.
+ // This behavior is similar to an interceptor.
+ }
+ if channelz.IsOn() && err == nil {
+ ss.t.IncrMsgRecv()
+ }
+ }()
+ var payInfo *payloadInfo
+ if ss.statsHandler != nil || ss.binlog != nil {
+ payInfo = &payloadInfo{}
+ }
+ if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
+ if err == io.EOF {
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ClientHalfClose{})
+ }
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ return toRPCErr(err)
+ }
+ if ss.statsHandler != nil {
+ ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: m,
+ // TODO truncate large payload.
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength + headerLen,
+ Length: len(payInfo.uncompressedBytes),
+ })
+ }
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ClientMessage{
+ Message: payInfo.uncompressedBytes,
+ })
+ }
+ return nil
+}
+
+// MethodFromServerStream returns the method string for the input stream.
+// The returned string is in the format of "/service/method".
+func MethodFromServerStream(stream ServerStream) (string, bool) {
+ return Method(stream.Context())
+}
+
+// prepareMsg returns the hdr, payload and data
+// using the compressors passed or using the
+// passed preparedmsg
+func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
+ if preparedMsg, ok := m.(*PreparedMsg); ok {
+ return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
+ }
+ // The input interface is not a prepared msg.
+ // Marshal and Compress the data at this point
+ data, err = encode(codec, m)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ compData, err := compress(data, cp, comp)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ hdr, payload = msgHeader(data, compData)
+ return hdr, payload, data, nil
+}