summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go64
1 files changed, 34 insertions, 30 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 236fc17ec..6d82e0d7c 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -374,9 +374,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
- sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time
- if sh != nil {
+ shs := cs.cc.dopts.copts.StatsHandlers
+ for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
@@ -414,12 +414,12 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
}
return &csAttempt{
- ctx: ctx,
- beginTime: beginTime,
- cs: cs,
- dc: cs.cc.dopts.dc,
- statsHandler: sh,
- trInfo: trInfo,
+ ctx: ctx,
+ beginTime: beginTime,
+ cs: cs,
+ dc: cs.cc.dopts.dc,
+ statsHandlers: shs,
+ trInfo: trInfo,
}, nil
}
@@ -536,8 +536,8 @@ type csAttempt struct {
// and cleared when the finish method is called.
trInfo *traceInfo
- statsHandler stats.Handler
- beginTime time.Time
+ statsHandlers []stats.Handler
+ beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
@@ -960,8 +960,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
}
return io.EOF
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -971,7 +971,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
cs := a.cs
- if a.statsHandler != nil && payInfo == nil {
+ if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
}
@@ -1008,8 +1008,8 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
}
a.mu.Unlock()
}
- if a.statsHandler != nil {
- a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@@ -1068,7 +1068,7 @@ func (a *csAttempt) finish(err error) {
ServerLoad: balancerload.Parse(tr),
})
}
- if a.statsHandler != nil {
+ for _, sh := range a.statsHandlers {
end := &stats.End{
Client: true,
BeginTime: a.beginTime,
@@ -1076,7 +1076,7 @@ func (a *csAttempt) finish(err error) {
Trailer: tr,
Error: err,
}
- a.statsHandler.HandleRPC(a.ctx, end)
+ sh.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
@@ -1445,7 +1445,7 @@ type serverStream struct {
maxSendMessageSize int
trInfo *traceInfo
- statsHandler stats.Handler
+ statsHandler []stats.Handler
binlog binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
@@ -1555,8 +1555,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
Message: data,
})
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ }
}
return nil
}
@@ -1590,7 +1592,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var payInfo *payloadInfo
- if ss.statsHandler != nil || ss.binlog != nil {
+ if len(ss.statsHandler) != 0 || ss.binlog != nil {
payInfo = &payloadInfo{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
@@ -1605,15 +1607,17 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
- if ss.statsHandler != nil {
- ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: m,
- // TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
- })
+ if len(ss.statsHandler) != 0 {
+ for _, sh := range ss.statsHandler {
+ sh.HandleRPC(ss.s.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: m,
+ // TODO truncate large payload.
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.wireLength + headerLen,
+ Length: len(payInfo.uncompressedBytes),
+ })
+ }
}
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientMessage{