summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/CHANGELOG.md28
-rw-r--r--vendor/github.com/uber/jaeger-client-go/Makefile8
-rw-r--r--vendor/github.com/uber/jaeger-client-go/README.md2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/config/config.go13
-rw-r--r--vendor/github.com/uber/jaeger-client-go/constants.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go25
-rw-r--r--vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/reporter.go33
-rw-r--r--vendor/github.com/uber/jaeger-client-go/span.go77
-rw-r--r--vendor/github.com/uber/jaeger-client-go/span_context.go4
-rw-r--r--vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go272
-rw-r--r--vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go4
-rw-r--r--vendor/github.com/uber/jaeger-client-go/tracer.go1
-rw-r--r--vendor/github.com/uber/jaeger-client-go/tracer_options.go12
-rw-r--r--vendor/github.com/uber/jaeger-client-go/transport/http.go11
-rw-r--r--vendor/github.com/uber/jaeger-client-go/transport_udp.go61
-rw-r--r--vendor/github.com/uber/jaeger-client-go/utils/udp_client.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go1
-rw-r--r--vendor/modules.txt3
21 files changed, 527 insertions, 38 deletions
diff --git a/go.mod b/go.mod
index 54af7fda4..670b26f34 100644
--- a/go.mod
+++ b/go.mod
@@ -61,7 +61,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.4.0
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2
- github.com/uber/jaeger-client-go v2.20.1+incompatible
+ github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 // indirect
github.com/varlink/go v0.0.0-20190502142041-0f1d566d194b
github.com/vishvananda/netlink v1.0.0
diff --git a/go.sum b/go.sum
index de7791f47..519b42d74 100644
--- a/go.sum
+++ b/go.sum
@@ -488,6 +488,8 @@ github.com/theckman/go-flock v0.7.1/go.mod h1:kjuth3y9VJ2aNlkNEO99G/8lp9fMIKaGyB
github.com/u-root/u-root v5.0.0+incompatible/go.mod h1:RYkpo8pTHrNjW08opNd/U6p/RJE7K0D8fXO0d47+3YY=
github.com/uber/jaeger-client-go v2.20.1+incompatible h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU=
github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
+github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
+github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 h1:CwmGyzHTzCqCdZJkWR0A7ucZXgrCY7spRcpvm7ci//s=
github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
index 5e7e9d5e5..818568b28 100644
--- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
+++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
@@ -1,6 +1,34 @@
Changes by Version
==================
+2.22.1 (2020-01-16)
+-------------------
+
+- Increase UDP batch overhead to account for data loss metrics ([#488](https://github.com/jaegertracing/jaeger-client-go/pull/488)) -- Yuri Shkuro
+
+
+2.22.0 (2020-01-15)
+-------------------
+
+- Report data loss stats to Jaeger backend ([#482](https://github.com/jaegertracing/jaeger-client-go/pull/482)) -- Yuri Shkuro
+- Add limit on log records per span ([#483](https://github.com/jaegertracing/jaeger-client-go/pull/483)) -- Sokolov Yura
+
+
+2.21.1 (2019-12-20)
+-------------------
+
+- Update version correctly.
+
+
+2.21.0 (2019-12-20)
+-------------------
+
+- Clarify reporting error logs ([#469](https://github.com/jaegertracing/jaeger-client-go/pull/469)) -- Yuri Shkuro
+- Do not strip leading zeros from trace IDs ([#472](https://github.com/jaegertracing/jaeger-client-go/pull/472)) -- Yuri Shkuro
+- Chore (docs): fixed a couple of typos ([#475](https://github.com/jaegertracing/jaeger-client-go/pull/475)) -- Marc Bramaud
+- Support custom HTTP headers when reporting spans over HTTP ([#479](https://github.com/jaegertracing/jaeger-client-go/pull/479)) -- Albert Teoh
+
+
2.20.1 (2019-11-08)
-------------------
diff --git a/vendor/github.com/uber/jaeger-client-go/Makefile b/vendor/github.com/uber/jaeger-client-go/Makefile
index 0cfe6a5f6..d5e962ccf 100644
--- a/vendor/github.com/uber/jaeger-client-go/Makefile
+++ b/vendor/github.com/uber/jaeger-client-go/Makefile
@@ -83,8 +83,12 @@ cover-html: cover
test-examples:
make -C examples
+.PHONY: thrift
+thrift: idl-submodule thrift-compile
+
# TODO at the moment we're not generating tchan_*.go files
-thrift: idl-submodule thrift-image
+.PHONY: thrift-compile
+thrift-compile: thrift-image
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/agent.thrift
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/sampling.thrift
$(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/jaeger.thrift
@@ -99,10 +103,12 @@ thrift: idl-submodule thrift-image
rm -rf crossdock/thrift/*/*-remote
rm -rf thrift-gen/jaeger/collector.go
+.PHONY: idl-submodule
idl-submodule:
git submodule init
git submodule update
+.PHONY: thrift-image
thrift-image:
$(THRIFT) -version
diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md
index a3366114d..0e4d9fc0b 100644
--- a/vendor/github.com/uber/jaeger-client-go/README.md
+++ b/vendor/github.com/uber/jaeger-client-go/README.md
@@ -45,7 +45,7 @@ and [config/example_test.go](./config/example_test.go).
### Environment variables
The tracer can be initialized with values coming from environment variables. None of the env vars are required
-and all of them can be overriden via direct setting of the property on the configuration object.
+and all of them can be overridden via direct setting of the property on the configuration object.
Property| Description
--- | ---
diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go
index a0c32d804..44e93533c 100644
--- a/vendor/github.com/uber/jaeger-client-go/config/config.go
+++ b/vendor/github.com/uber/jaeger-client-go/config/config.go
@@ -134,6 +134,10 @@ type ReporterConfig struct {
// Password instructs reporter to include a password for basic http authentication when sending spans to
// jaeger-collector. Can be set by exporting an environment variable named JAEGER_PASSWORD
Password string `yaml:"password"`
+
+ // HTTPHeaders instructs the reporter to add these headers to the http request when reporting spans.
+ // This field takes effect only when using HTTPTransport by setting the CollectorEndpoint.
+ HTTPHeaders map[string]string `yaml:"http_headers"`
}
// BaggageRestrictionsConfig configures the baggage restrictions manager which can be used to whitelist
@@ -397,11 +401,12 @@ func (rc *ReporterConfig) NewReporter(
func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
switch {
- case rc.CollectorEndpoint != "" && rc.User != "" && rc.Password != "":
- return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1),
- transport.HTTPBasicAuth(rc.User, rc.Password)), nil
case rc.CollectorEndpoint != "":
- return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1)), nil
+ httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
+ if rc.User != "" && rc.Password != "" {
+ httpOptions = append(httpOptions, transport.HTTPBasicAuth(rc.User, rc.Password))
+ }
+ return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
}
diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go
index 5d27b628d..1702c7de4 100644
--- a/vendor/github.com/uber/jaeger-client-go/constants.go
+++ b/vendor/github.com/uber/jaeger-client-go/constants.go
@@ -22,7 +22,7 @@ import (
const (
// JaegerClientVersion is the version of the client library reported as Span tag.
- JaegerClientVersion = "Go-2.20.1"
+ JaegerClientVersion = "Go-2.22.1"
// JaegerClientVersionTagKey is the name of the tag used to report client version.
JaegerClientVersionTagKey = "jaeger.version"
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go b/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go
new file mode 100644
index 000000000..fe0bef268
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go
@@ -0,0 +1,25 @@
+// Copyright (c) 2020 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package reporterstats
+
+// ReporterStats exposes some metrics from the RemoteReporter.
+type ReporterStats interface {
+ SpansDroppedFromQueue() int64
+}
+
+// Receiver can be implemented by a Transport to be given ReporterStats.
+type Receiver interface {
+ SetReporterStats(ReporterStats)
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
index f0f1afe2f..3ac2f8f94 100644
--- a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
+++ b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
@@ -24,6 +24,7 @@ import (
)
// BuildJaegerThrift builds jaeger span based on internal span.
+// TODO: (breaking change) move to internal package.
func BuildJaegerThrift(span *Span) *j.Span {
span.Lock()
defer span.Unlock()
@@ -46,6 +47,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
}
// BuildJaegerProcessThrift creates a thrift Process type.
+// TODO: (breaking change) move to internal package.
func BuildJaegerProcessThrift(span *Span) *j.Process {
span.Lock()
defer span.Unlock()
diff --git a/vendor/github.com/uber/jaeger-client-go/reporter.go b/vendor/github.com/uber/jaeger-client-go/reporter.go
index 0b78cec20..830b5a4bb 100644
--- a/vendor/github.com/uber/jaeger-client-go/reporter.go
+++ b/vendor/github.com/uber/jaeger-client-go/reporter.go
@@ -22,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
+ "github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
)
@@ -176,16 +177,31 @@ type reporterQueueItem struct {
close *sync.WaitGroup
}
+// reporterStats implements reporterstats.ReporterStats.
+type reporterStats struct {
+ droppedCount int64 // provided to Transports to report data loss to the backend
+}
+
+// SpansDroppedFromQueue implements reporterstats.ReporterStats.
+func (r *reporterStats) SpansDroppedFromQueue() int64 {
+ return atomic.LoadInt64(&r.droppedCount)
+}
+
+func (r *reporterStats) incDroppedCount() {
+ atomic.AddInt64(&r.droppedCount, 1)
+}
+
type remoteReporter struct {
// These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
- queueLength int64
+ queueLength int64 // used to update metrics.Gauge
closed int64 // 0 - not closed, 1 - closed
reporterOptions
- sender Transport
- queue chan reporterQueueItem
+ sender Transport
+ queue chan reporterQueueItem
+ reporterStats *reporterStats
}
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender.
@@ -213,6 +229,10 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
reporterOptions: options,
sender: sender,
queue: make(chan reporterQueueItem, options.queueSize),
+ reporterStats: new(reporterStats),
+ }
+ if receiver, ok := sender.(reporterstats.Receiver); ok {
+ receiver.SetReporterStats(reporter.reporterStats)
}
go reporter.processQueue()
return reporter
@@ -231,6 +251,7 @@ func (r *remoteReporter) Report(span *Span) {
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
+ r.reporterStats.incDroppedCount()
}
}
@@ -241,7 +262,7 @@ func (r *remoteReporter) Close() {
return
}
r.sendCloseEvent()
- r.sender.Close()
+ _ = r.sender.Close()
}
func (r *remoteReporter) sendCloseEvent() {
@@ -263,7 +284,7 @@ func (r *remoteReporter) processQueue() {
flush := func() {
if flushed, err := r.sender.Flush(); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error()))
+ r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
}
@@ -281,7 +302,7 @@ func (r *remoteReporter) processQueue() {
span := item.span
if flushed, err := r.sender.Append(span); err != nil {
r.metrics.ReporterFailure.Inc(int64(flushed))
- r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error()))
+ r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error()))
} else if flushed > 0 {
r.metrics.ReporterSuccess.Inc(int64(flushed))
// to reduce the number of gauge stats, we only emit queue length on flush
diff --git a/vendor/github.com/uber/jaeger-client-go/span.go b/vendor/github.com/uber/jaeger-client-go/span.go
index bbf6fb068..42c9112c0 100644
--- a/vendor/github.com/uber/jaeger-client-go/span.go
+++ b/vendor/github.com/uber/jaeger-client-go/span.go
@@ -59,6 +59,9 @@ type Span struct {
// The span's "micro-log"
logs []opentracing.LogRecord
+ // The number of logs dropped because of MaxLogsPerSpan.
+ numDroppedLogs int
+
// references for this span
references []Reference
@@ -152,7 +155,12 @@ func (s *Span) Logs() []opentracing.LogRecord {
s.Lock()
defer s.Unlock()
- return append([]opentracing.LogRecord(nil), s.logs...)
+ logs := append([]opentracing.LogRecord(nil), s.logs...)
+ if s.numDroppedLogs != 0 {
+ fixLogs(logs, s.numDroppedLogs)
+ }
+
+ return logs
}
// References returns references for this span
@@ -234,8 +242,65 @@ func (s *Span) Log(ld opentracing.LogData) {
// this function should only be called while holding a Write lock
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
- // TODO add logic to limit number of logs per span (issue #46)
- s.logs = append(s.logs, lr)
+ maxLogs := s.tracer.options.maxLogsPerSpan
+ if maxLogs == 0 || len(s.logs) < maxLogs {
+ s.logs = append(s.logs, lr)
+ return
+ }
+
+ // We have too many logs. We don't touch the first numOld logs; we treat the
+ // rest as a circular buffer and overwrite the oldest log among those.
+ numOld := (maxLogs - 1) / 2
+ numNew := maxLogs - numOld
+ s.logs[numOld+s.numDroppedLogs%numNew] = lr
+ s.numDroppedLogs++
+}
+
+// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
+// the end (i.e. pos circular left shifts).
+func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
+ // This algorithm is described in:
+ // http://www.cplusplus.com/reference/algorithm/rotate
+ for first, middle, next := 0, pos, pos; first != middle; {
+ buf[first], buf[next] = buf[next], buf[first]
+ first++
+ next++
+ if next == len(buf) {
+ next = middle
+ } else if first == middle {
+ middle = next
+ }
+ }
+}
+
+func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) {
+ // We dropped some log events, which means that we used part of Logs as a
+ // circular buffer (see appendLog). De-circularize it.
+ numOld := (len(logs) - 1) / 2
+ numNew := len(logs) - numOld
+ rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew)
+
+ // Replace the log in the middle (the oldest "new" log) with information
+ // about the dropped logs. This means that we are effectively dropping one
+ // more "new" log.
+ numDropped := numDroppedLogs + 1
+ logs[numOld] = opentracing.LogRecord{
+ // Keep the timestamp of the last dropped event.
+ Timestamp: logs[numOld].Timestamp,
+ Fields: []log.Field{
+ log.String("event", "dropped Span logs"),
+ log.Int("dropped_log_count", numDropped),
+ log.String("component", "jaeger-client"),
+ },
+ }
+}
+
+func (s *Span) fixLogsIfDropped() {
+ if s.numDroppedLogs == 0 {
+ return
+ }
+ fixLogs(s.logs, s.numDroppedLogs)
+ s.numDroppedLogs = 0
}
// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
@@ -274,8 +339,9 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
s.applySamplingDecision(decision, true)
}
if s.context.IsSampled() {
+ s.Lock()
+ s.fixLogsIfDropped()
if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
- s.Lock()
// Note: bulk logs are not subject to maxLogsPerSpan limit
if options.LogRecords != nil {
s.logs = append(s.logs, options.LogRecords...)
@@ -283,8 +349,8 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
for _, ld := range options.BulkLogData {
s.logs = append(s.logs, ld.ToLogRecord())
}
- s.Unlock()
}
+ s.Unlock()
}
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
@@ -344,6 +410,7 @@ func (s *Span) reset() {
// Note: To reuse memory we can save the pointers on the heap
s.tags = s.tags[:0]
s.logs = s.logs[:0]
+ s.numDroppedLogs = 0
s.references = s.references[:0]
}
diff --git a/vendor/github.com/uber/jaeger-client-go/span_context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go
index b7230abfe..1b44f3f8c 100644
--- a/vendor/github.com/uber/jaeger-client-go/span_context.go
+++ b/vendor/github.com/uber/jaeger-client-go/span_context.go
@@ -213,9 +213,9 @@ func (c SpanContext) SetFirehose() {
func (c SpanContext) String() string {
if c.traceID.High == 0 {
- return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
+ return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
- return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
+ return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
// ContextFromString reconstructs the Context encoded in a string
diff --git a/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go b/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go
index d23ed2fc2..e69c6d603 100644
--- a/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go
+++ b/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go
@@ -1577,11 +1577,192 @@ func (p *Process) String() string {
}
// Attributes:
+// - FullQueueDroppedSpans
+// - TooLargeDroppedSpans
+// - FailedToEmitSpans
+type ClientStats struct {
+ FullQueueDroppedSpans int64 `thrift:"fullQueueDroppedSpans,1,required" json:"fullQueueDroppedSpans"`
+ TooLargeDroppedSpans int64 `thrift:"tooLargeDroppedSpans,2,required" json:"tooLargeDroppedSpans"`
+ FailedToEmitSpans int64 `thrift:"failedToEmitSpans,3,required" json:"failedToEmitSpans"`
+}
+
+func NewClientStats() *ClientStats {
+ return &ClientStats{}
+}
+
+func (p *ClientStats) GetFullQueueDroppedSpans() int64 {
+ return p.FullQueueDroppedSpans
+}
+
+func (p *ClientStats) GetTooLargeDroppedSpans() int64 {
+ return p.TooLargeDroppedSpans
+}
+
+func (p *ClientStats) GetFailedToEmitSpans() int64 {
+ return p.FailedToEmitSpans
+}
+func (p *ClientStats) Read(iprot thrift.TProtocol) error {
+ if _, err := iprot.ReadStructBegin(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
+ }
+
+ var issetFullQueueDroppedSpans bool = false
+ var issetTooLargeDroppedSpans bool = false
+ var issetFailedToEmitSpans bool = false
+
+ for {
+ _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin()
+ if err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err)
+ }
+ if fieldTypeId == thrift.STOP {
+ break
+ }
+ switch fieldId {
+ case 1:
+ if err := p.readField1(iprot); err != nil {
+ return err
+ }
+ issetFullQueueDroppedSpans = true
+ case 2:
+ if err := p.readField2(iprot); err != nil {
+ return err
+ }
+ issetTooLargeDroppedSpans = true
+ case 3:
+ if err := p.readField3(iprot); err != nil {
+ return err
+ }
+ issetFailedToEmitSpans = true
+ default:
+ if err := iprot.Skip(fieldTypeId); err != nil {
+ return err
+ }
+ }
+ if err := iprot.ReadFieldEnd(); err != nil {
+ return err
+ }
+ }
+ if err := iprot.ReadStructEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err)
+ }
+ if !issetFullQueueDroppedSpans {
+ return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FullQueueDroppedSpans is not set"))
+ }
+ if !issetTooLargeDroppedSpans {
+ return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TooLargeDroppedSpans is not set"))
+ }
+ if !issetFailedToEmitSpans {
+ return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FailedToEmitSpans is not set"))
+ }
+ return nil
+}
+
+func (p *ClientStats) readField1(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadI64(); err != nil {
+ return thrift.PrependError("error reading field 1: ", err)
+ } else {
+ p.FullQueueDroppedSpans = v
+ }
+ return nil
+}
+
+func (p *ClientStats) readField2(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadI64(); err != nil {
+ return thrift.PrependError("error reading field 2: ", err)
+ } else {
+ p.TooLargeDroppedSpans = v
+ }
+ return nil
+}
+
+func (p *ClientStats) readField3(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadI64(); err != nil {
+ return thrift.PrependError("error reading field 3: ", err)
+ } else {
+ p.FailedToEmitSpans = v
+ }
+ return nil
+}
+
+func (p *ClientStats) Write(oprot thrift.TProtocol) error {
+ if err := oprot.WriteStructBegin("ClientStats"); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
+ }
+ if err := p.writeField1(oprot); err != nil {
+ return err
+ }
+ if err := p.writeField2(oprot); err != nil {
+ return err
+ }
+ if err := p.writeField3(oprot); err != nil {
+ return err
+ }
+ if err := oprot.WriteFieldStop(); err != nil {
+ return thrift.PrependError("write field stop error: ", err)
+ }
+ if err := oprot.WriteStructEnd(); err != nil {
+ return thrift.PrependError("write struct stop error: ", err)
+ }
+ return nil
+}
+
+func (p *ClientStats) writeField1(oprot thrift.TProtocol) (err error) {
+ if err := oprot.WriteFieldBegin("fullQueueDroppedSpans", thrift.I64, 1); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:fullQueueDroppedSpans: ", p), err)
+ }
+ if err := oprot.WriteI64(int64(p.FullQueueDroppedSpans)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.fullQueueDroppedSpans (1) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 1:fullQueueDroppedSpans: ", p), err)
+ }
+ return err
+}
+
+func (p *ClientStats) writeField2(oprot thrift.TProtocol) (err error) {
+ if err := oprot.WriteFieldBegin("tooLargeDroppedSpans", thrift.I64, 2); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:tooLargeDroppedSpans: ", p), err)
+ }
+ if err := oprot.WriteI64(int64(p.TooLargeDroppedSpans)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.tooLargeDroppedSpans (2) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 2:tooLargeDroppedSpans: ", p), err)
+ }
+ return err
+}
+
+func (p *ClientStats) writeField3(oprot thrift.TProtocol) (err error) {
+ if err := oprot.WriteFieldBegin("failedToEmitSpans", thrift.I64, 3); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:failedToEmitSpans: ", p), err)
+ }
+ if err := oprot.WriteI64(int64(p.FailedToEmitSpans)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.failedToEmitSpans (3) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 3:failedToEmitSpans: ", p), err)
+ }
+ return err
+}
+
+func (p *ClientStats) String() string {
+ if p == nil {
+ return "<nil>"
+ }
+ return fmt.Sprintf("ClientStats(%+v)", *p)
+}
+
+// Attributes:
// - Process
// - Spans
+// - SeqNo
+// - Stats
type Batch struct {
- Process *Process `thrift:"process,1,required" json:"process"`
- Spans []*Span `thrift:"spans,2,required" json:"spans"`
+ Process *Process `thrift:"process,1,required" json:"process"`
+ Spans []*Span `thrift:"spans,2,required" json:"spans"`
+ SeqNo *int64 `thrift:"seqNo,3" json:"seqNo,omitempty"`
+ Stats *ClientStats `thrift:"stats,4" json:"stats,omitempty"`
}
func NewBatch() *Batch {
@@ -1600,10 +1781,36 @@ func (p *Batch) GetProcess() *Process {
func (p *Batch) GetSpans() []*Span {
return p.Spans
}
+
+var Batch_SeqNo_DEFAULT int64
+
+func (p *Batch) GetSeqNo() int64 {
+ if !p.IsSetSeqNo() {
+ return Batch_SeqNo_DEFAULT
+ }
+ return *p.SeqNo
+}
+
+var Batch_Stats_DEFAULT *ClientStats
+
+func (p *Batch) GetStats() *ClientStats {
+ if !p.IsSetStats() {
+ return Batch_Stats_DEFAULT
+ }
+ return p.Stats
+}
func (p *Batch) IsSetProcess() bool {
return p.Process != nil
}
+func (p *Batch) IsSetSeqNo() bool {
+ return p.SeqNo != nil
+}
+
+func (p *Batch) IsSetStats() bool {
+ return p.Stats != nil
+}
+
func (p *Batch) Read(iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -1631,6 +1838,14 @@ func (p *Batch) Read(iprot thrift.TProtocol) error {
return err
}
issetSpans = true
+ case 3:
+ if err := p.readField3(iprot); err != nil {
+ return err
+ }
+ case 4:
+ if err := p.readField4(iprot); err != nil {
+ return err
+ }
default:
if err := iprot.Skip(fieldTypeId); err != nil {
return err
@@ -1680,6 +1895,23 @@ func (p *Batch) readField2(iprot thrift.TProtocol) error {
return nil
}
+func (p *Batch) readField3(iprot thrift.TProtocol) error {
+ if v, err := iprot.ReadI64(); err != nil {
+ return thrift.PrependError("error reading field 3: ", err)
+ } else {
+ p.SeqNo = &v
+ }
+ return nil
+}
+
+func (p *Batch) readField4(iprot thrift.TProtocol) error {
+ p.Stats = &ClientStats{}
+ if err := p.Stats.Read(iprot); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Stats), err)
+ }
+ return nil
+}
+
func (p *Batch) Write(oprot thrift.TProtocol) error {
if err := oprot.WriteStructBegin("Batch"); err != nil {
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err)
@@ -1690,6 +1922,12 @@ func (p *Batch) Write(oprot thrift.TProtocol) error {
if err := p.writeField2(oprot); err != nil {
return err
}
+ if err := p.writeField3(oprot); err != nil {
+ return err
+ }
+ if err := p.writeField4(oprot); err != nil {
+ return err
+ }
if err := oprot.WriteFieldStop(); err != nil {
return thrift.PrependError("write field stop error: ", err)
}
@@ -1733,6 +1971,36 @@ func (p *Batch) writeField2(oprot thrift.TProtocol) (err error) {
return err
}
+func (p *Batch) writeField3(oprot thrift.TProtocol) (err error) {
+ if p.IsSetSeqNo() {
+ if err := oprot.WriteFieldBegin("seqNo", thrift.I64, 3); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:seqNo: ", p), err)
+ }
+ if err := oprot.WriteI64(int64(*p.SeqNo)); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T.seqNo (3) field write error: ", p), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 3:seqNo: ", p), err)
+ }
+ }
+ return err
+}
+
+func (p *Batch) writeField4(oprot thrift.TProtocol) (err error) {
+ if p.IsSetStats() {
+ if err := oprot.WriteFieldBegin("stats", thrift.STRUCT, 4); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:stats: ", p), err)
+ }
+ if err := p.Stats.Write(oprot); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Stats), err)
+ }
+ if err := oprot.WriteFieldEnd(); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error 4:stats: ", p), err)
+ }
+ }
+ return err
+}
+
func (p *Batch) String() string {
if p == nil {
return "<nil>"
diff --git a/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go b/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go
index 2d49e1d5f..15583e56b 100644
--- a/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go
+++ b/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go
@@ -729,7 +729,7 @@ func (p *BinaryAnnotation) String() string {
// precise value possible. For example, gettimeofday or syncing nanoTime
// against a tick of currentTimeMillis.
//
-// For compatibilty with instrumentation that precede this field, collectors
+// For compatibility with instrumentation that precede this field, collectors
// or span stores can derive this via Annotation.timestamp.
// For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp.
//
@@ -741,7 +741,7 @@ func (p *BinaryAnnotation) String() string {
// precise measurement decoupled from problems of clocks, such as skew or NTP
// updates causing time to move backwards.
//
-// For compatibilty with instrumentation that precede this field, collectors
+// For compatibility with instrumentation that precede this field, collectors
// or span stores can derive this by subtracting Annotation.timestamp.
// For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp.
//
diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go
index f03372dc7..da43ec6db 100644
--- a/vendor/github.com/uber/jaeger-client-go/tracer.go
+++ b/vendor/github.com/uber/jaeger-client-go/tracer.go
@@ -52,6 +52,7 @@ type Tracer struct {
highTraceIDGenerator func() uint64 // custom high trace ID generator
maxTagValueLength int
noDebugFlagOnForcedSampling bool
+ maxLogsPerSpan int
// more options to come
}
// allocator of Span objects
diff --git a/vendor/github.com/uber/jaeger-client-go/tracer_options.go b/vendor/github.com/uber/jaeger-client-go/tracer_options.go
index 469685bb4..f016484b9 100644
--- a/vendor/github.com/uber/jaeger-client-go/tracer_options.go
+++ b/vendor/github.com/uber/jaeger-client-go/tracer_options.go
@@ -144,6 +144,18 @@ func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption {
}
}
+// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
+// value). If a span has more logs than this value, logs are dropped as
+// necessary (and replaced with a log describing how many were dropped).
+//
+// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about
+// half are the newest logs.
+func (tracerOptions) MaxLogsPerSpan(maxLogsPerSpan int) TracerOption {
+ return func(tracer *Tracer) {
+ tracer.options.maxLogsPerSpan = maxLogsPerSpan
+ }
+}
+
func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption {
return func(tracer *Tracer) {
tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan
diff --git a/vendor/github.com/uber/jaeger-client-go/transport/http.go b/vendor/github.com/uber/jaeger-client-go/transport/http.go
index bc1b3e6b0..bb7eb00c9 100644
--- a/vendor/github.com/uber/jaeger-client-go/transport/http.go
+++ b/vendor/github.com/uber/jaeger-client-go/transport/http.go
@@ -39,6 +39,7 @@ type HTTPTransport struct {
spans []*j.Span
process *j.Process
httpCredentials *HTTPBasicAuthCredentials
+ headers map[string]string
}
// HTTPBasicAuthCredentials stores credentials for HTTP basic auth.
@@ -76,6 +77,13 @@ func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
}
}
+// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request
+func HTTPHeaders(headers map[string]string) HTTPOption {
+ return func(c *HTTPTransport) {
+ c.headers = headers
+ }
+}
+
// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
// url of the collector to handle POST request, typically something like:
// http://hostname:14268/api/traces?format=jaeger.thrift
@@ -136,6 +144,9 @@ func (c *HTTPTransport) send(spans []*j.Span) error {
return err
}
req.Header.Set("Content-Type", "application/x-thrift")
+ for k, v := range c.headers {
+ req.Header.Set(k, v)
+ }
if c.httpCredentials != nil {
req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password)
diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go
index 7b9ccf937..7370d8007 100644
--- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go
+++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go
@@ -18,8 +18,8 @@ import (
"errors"
"fmt"
+ "github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/thrift"
-
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
)
@@ -27,12 +27,14 @@ import (
// Empirically obtained constant for how many bytes in the message are used for envelope.
// The total datagram size is:
// sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize
-// There is a unit test `TestEmitBatchOverhead` that validates this number.
+//
// Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans
// in the batch, because the length of the list is encoded as varint32, as well as SeqId.
-const emitBatchOverhead = 30
+//
+// There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68.
+const emitBatchOverhead = 70
-var errSpanTooLarge = errors.New("Span is too large")
+var errSpanTooLarge = errors.New("span is too large")
type udpSender struct {
client *utils.AgentClientUDP
@@ -44,9 +46,19 @@ type udpSender struct {
thriftProtocol thrift.TProtocol
process *j.Process
processByteSize int
+
+ // reporterStats provides access to stats that are only known to Reporter
+ reporterStats reporterstats.ReporterStats
+
+ // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields,
+ // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years.
+ batchSeqNo int64
+ tooLargeDroppedSpans int64
+ failedToEmitSpans int64
}
-// NewUDPTransport creates a reporter that submits spans to jaeger-agent
+// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
+// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
@@ -66,17 +78,22 @@ func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return nil, err
}
- sender := &udpSender{
+ return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
- thriftProtocol: thriftProtocol}
- return sender, nil
+ thriftProtocol: thriftProtocol,
+ }, nil
+}
+
+// SetReporterStats implements reporterstats.Receiver.
+func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
+ s.reporterStats = rs
}
func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int {
s.thriftBuffer.Reset()
- thriftStruct.Write(s.thriftProtocol)
+ _ = thriftStruct.Write(s.thriftProtocol)
return s.thriftBuffer.Len()
}
@@ -89,6 +106,7 @@ func (s *udpSender) Append(span *Span) (int, error) {
jSpan := BuildJaegerThrift(span)
spanSize := s.calcSizeOfSerializedThrift(jSpan)
if spanSize > s.maxSpanBytes {
+ s.tooLargeDroppedSpans++
return 1, errSpanTooLarge
}
@@ -112,9 +130,18 @@ func (s *udpSender) Flush() (int, error) {
if n == 0 {
return 0, nil
}
- err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer})
+ s.batchSeqNo++
+ batchSeqNo := int64(s.batchSeqNo)
+ err := s.client.EmitBatch(&j.Batch{
+ Process: s.process,
+ Spans: s.spanBuffer,
+ SeqNo: &batchSeqNo,
+ Stats: s.makeStats(),
+ })
s.resetBuffers()
-
+ if err != nil {
+ s.failedToEmitSpans += int64(n)
+ }
return n, err
}
@@ -129,3 +156,15 @@ func (s *udpSender) resetBuffers() {
s.spanBuffer = s.spanBuffer[:0]
s.byteBufferSize = s.processByteSize
}
+
+func (s *udpSender) makeStats() *j.ClientStats {
+ var dropped int64
+ if s.reporterStats != nil {
+ dropped = s.reporterStats.SpansDroppedFromQueue()
+ }
+ return &j.ClientStats{
+ FullQueueDroppedSpans: dropped,
+ TooLargeDroppedSpans: s.tooLargeDroppedSpans,
+ FailedToEmitSpans: s.failedToEmitSpans,
+ }
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go
index 6f042073d..fadd73e49 100644
--- a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go
+++ b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go
@@ -85,7 +85,7 @@ func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error {
return err
}
if a.thriftBuffer.Len() > a.maxPacketSize {
- return fmt.Errorf("Data does not fit within one UDP packet; size %d, max %d, spans %d",
+ return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d",
a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans))
}
_, err := a.connUDP.Write(a.thriftBuffer.Bytes())
diff --git a/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go b/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go
index eb31c4369..73aeb000f 100644
--- a/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go
+++ b/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go
@@ -40,6 +40,7 @@ var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){
}
// BuildZipkinThrift builds thrift span based on internal span.
+// TODO: (breaking change) move to transport/zipkin and make private.
func BuildZipkinThrift(s *Span) *z.Span {
span := &zipkinSpan{Span: s}
span.handleSpecialTags()
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8fa756fbb..0b4cfefa9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -478,11 +478,12 @@ github.com/stretchr/testify/require
github.com/syndtr/gocapability/capability
# github.com/tchap/go-patricia v2.3.0+incompatible
github.com/tchap/go-patricia/patricia
-# github.com/uber/jaeger-client-go v2.20.1+incompatible
+# github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-client-go
github.com/uber/jaeger-client-go/config
github.com/uber/jaeger-client-go/internal/baggage
github.com/uber/jaeger-client-go/internal/baggage/remote
+github.com/uber/jaeger-client-go/internal/reporterstats
github.com/uber/jaeger-client-go/internal/spanlog
github.com/uber/jaeger-client-go/internal/throttler
github.com/uber/jaeger-client-go/internal/throttler/remote