aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/uber/jaeger-client-go/transport_udp.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/transport_udp.go')
-rw-r--r--vendor/github.com/uber/jaeger-client-go/transport_udp.go61
1 files changed, 50 insertions, 11 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go
index 7b9ccf937..7370d8007 100644
--- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go
+++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go
@@ -18,8 +18,8 @@ import (
"errors"
"fmt"
+ "github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/thrift"
-
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
)
@@ -27,12 +27,14 @@ import (
// Empirically obtained constant for how many bytes in the message are used for envelope.
// The total datagram size is:
// sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
-// There is a unit test `TestEmitBatchOverhead` that validates this number.
+//
// Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
// in the batch, because the length of the list is encoded as varint32, as well as SeqId.
-const emitBatchOverhead = 30
+//
+// There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68.
+const emitBatchOverhead = 70
-var errSpanTooLarge = errors.New("Span is too large")
+var errSpanTooLarge = errors.New("span is too large")
type udpSender struct {
client *utils.AgentClientUDP
@@ -44,9 +46,19 @@ type udpSender struct {
thriftProtocol thrift.TProtocol
process *j.Process
processByteSize int
+
+ // reporterStats provides access to stats that are only known to Reporter
+ reporterStats reporterstats.ReporterStats
+
+ // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
+ // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
+ batchSeqNo int64
+ tooLargeDroppedSpans int64
+ failedToEmitSpans int64
}
-// NewUDPTransport creates a reporter that submits spans to jaeger-agent
+// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
+// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
@@ -66,17 +78,22 @@ func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return nil, err
}
- sender := &udpSender{
+ return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
- thriftProtocol: thriftProtocol}
- return sender, nil
+ thriftProtocol: thriftProtocol,
+ }, nil
+}
+
+// SetReporterStats implements reporterstats.Receiver.
+func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
+ s.reporterStats = rs
}
func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
s.thriftBuffer.Reset()
- thriftStruct.Write(s.thriftProtocol)
+ _ = thriftStruct.Write(s.thriftProtocol)
return s.thriftBuffer.Len()
}
@@ -89,6 +106,7 @@ func (s *udpSender) Append(span *Span) (int, error) {
jSpan := BuildJaegerThrift(span)
spanSize := s.calcSizeOfSerializedThrift(jSpan)
if spanSize > s.maxSpanBytes {
+ s.tooLargeDroppedSpans++
return 1, errSpanTooLarge
}
@@ -112,9 +130,18 @@ func (s *udpSender) Flush() (int, error) {
if n == 0 {
return 0, nil
}
- err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer})
+ s.batchSeqNo++
+ batchSeqNo := int64(s.batchSeqNo)
+ err := s.client.EmitBatch(&j.Batch{
+ Process: s.process,
+ Spans: s.spanBuffer,
+ SeqNo: &batchSeqNo,
+ Stats: s.makeStats(),
+ })
s.resetBuffers()
-
+ if err != nil {
+ s.failedToEmitSpans += int64(n)
+ }
return n, err
}
@@ -129,3 +156,15 @@ func (s *udpSender) resetBuffers() {
s.spanBuffer = s.spanBuffer[:0]
s.byteBufferSize = s.processByteSize
}
+
+func (s *udpSender) makeStats() *j.ClientStats {
+ var dropped int64
+ if s.reporterStats != nil {
+ dropped = s.reporterStats.SpansDroppedFromQueue()
+ }
+ return &j.ClientStats{
+ FullQueueDroppedSpans: dropped,
+ TooLargeDroppedSpans: s.tooLargeDroppedSpans,
+ FailedToEmitSpans: s.failedToEmitSpans,
+ }
+}