summaryrefslogtreecommitdiff
path: root/vendor/github.com/uber/jaeger-client-go/reporter.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/reporter.go')
-rw-r--r--vendor/github.com/uber/jaeger-client-go/reporter.go33
1 files changed, 27 insertions, 6 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/reporter.go b/vendor/github.com/uber/jaeger-client-go/reporter.go
index 0b78cec20..830b5a4bb 100644
--- a/vendor/github.com/uber/jaeger-client-go/reporter.go
+++ b/vendor/github.com/uber/jaeger-client-go/reporter.go
@@ -22,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
+ "github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
)
@@ -176,16 +177,31 @@ type reporterQueueItem struct {
close *sync.WaitGroup
}
+// reporterStats implements reporterstats.ReporterStats.
+type reporterStats struct {
+ droppedCount int64 // provided to Transports to report data loss to the backend
+}
+
+// SpansDroppedFromQueue implements reporterstats.ReporterStats.
+func (r *reporterStats) SpansDroppedFromQueue() int64 {
+ return atomic.LoadInt64(&r.droppedCount)
+}
+
+func (r *reporterStats) incDroppedCount() {
+ atomic.AddInt64(&r.droppedCount, 1)
+}
+
type remoteReporter struct {
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
- queueLength int64
+ queueLength int64 // used to update metrics.Gauge
closed int64 // 0 - not closed, 1 - closed
reporterOptions
- sender Transport
- queue chan reporterQueueItem
+ sender Transport
+ queue chan reporterQueueItem
+ reporterStats *reporterStats
}
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
@@ -213,6 +229,10 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
reporterOptions: options,
sender: sender,
queue: make(chan reporterQueueItem, options.queueSize),
+ reporterStats: new(reporterStats),
+ }
+ if receiver, ok := sender.(reporterstats.Receiver); ok {
+ receiver.SetReporterStats(reporter.reporterStats)
}
go reporter.processQueue()
return reporter
@@ -231,6 +251,7 @@ func (r *remoteReporter) Report(span *Span) {
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
+ r.reporterStats.incDroppedCount()
}
}
@@ -241,7 +262,7 @@ func (r *remoteReporter) Close() {
return
}
r.sendCloseEvent()
- r.sender.Close()
+ _ = r.sender.Close()
}
func (r *remoteReporter) sendCloseEvent() {
@@ -263,7 +284,7 @@ func (r *remoteReporter) processQueue() {
flush := func() {
if flushed, err := r.sender.Flush(); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error()))
+ r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
}
@@ -281,7 +302,7 @@ func (r *remoteReporter) processQueue() {
span := item.span
if flushed, err := r.sender.Append(span); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error()))
+ r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
// to reduce the number of gauge stats, we only emit queue length on flush