diff options
21 files changed, 527 insertions, 38 deletions
@@ -61,7 +61,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.4.0 github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 - github.com/uber/jaeger-client-go v2.20.1+incompatible + github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 // indirect github.com/varlink/go v0.0.0-20190502142041-0f1d566d194b github.com/vishvananda/netlink v1.0.0 @@ -488,6 +488,8 @@ github.com/theckman/go-flock v0.7.1/go.mod h1:kjuth3y9VJ2aNlkNEO99G/8lp9fMIKaGyB github.com/u-root/u-root v5.0.0+incompatible/go.mod h1:RYkpo8pTHrNjW08opNd/U6p/RJE7K0D8fXO0d47+3YY= github.com/uber/jaeger-client-go v2.20.1+incompatible h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU= github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM= +github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5 h1:CwmGyzHTzCqCdZJkWR0A7ucZXgrCY7spRcpvm7ci//s= github.com/uber/jaeger-lib v0.0.0-20190122222657-d036253de8f5/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md index 5e7e9d5e5..818568b28 100644 --- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md +++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md @@ -1,6 +1,34 @@ Changes by Version ================== +2.22.1 (2020-01-16) +------------------- + +- Increase UDP batch overhead to account for data loss metrics ([#488](https://github.com/jaegertracing/jaeger-client-go/pull/488)) -- Yuri Shkuro + + +2.22.0 (2020-01-15) +------------------- + +- Report data loss stats to Jaeger backend ([#482](https://github.com/jaegertracing/jaeger-client-go/pull/482)) -- Yuri Shkuro +- Add limit on log records per span ([#483](https://github.com/jaegertracing/jaeger-client-go/pull/483)) -- Sokolov Yura + + +2.21.1 (2019-12-20) +------------------- + +- Update version correctly. + + +2.21.0 (2019-12-20) +------------------- + +- Clarify reporting error logs ([#469](https://github.com/jaegertracing/jaeger-client-go/pull/469)) -- Yuri Shkuro +- Do not strip leading zeros from trace IDs ([#472](https://github.com/jaegertracing/jaeger-client-go/pull/472)) -- Yuri Shkuro +- Chore (docs): fixed a couple of typos ([#475](https://github.com/jaegertracing/jaeger-client-go/pull/475)) -- Marc Bramaud +- Support custom HTTP headers when reporting spans over HTTP ([#479](https://github.com/jaegertracing/jaeger-client-go/pull/479)) -- Albert Teoh + + 2.20.1 (2019-11-08) ------------------- diff --git a/vendor/github.com/uber/jaeger-client-go/Makefile b/vendor/github.com/uber/jaeger-client-go/Makefile index 0cfe6a5f6..d5e962ccf 100644 --- a/vendor/github.com/uber/jaeger-client-go/Makefile +++ b/vendor/github.com/uber/jaeger-client-go/Makefile @@ -83,8 +83,12 @@ cover-html: cover test-examples: make -C examples +.PHONY: thrift +thrift: idl-submodule thrift-compile + # TODO at the moment we're not generating tchan_*.go files -thrift: idl-submodule thrift-image +.PHONY: thrift-compile +thrift-compile: thrift-image $(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/agent.thrift $(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/sampling.thrift $(THRIFT) -o /data --gen go:$(THRIFT_GO_ARGS) --out /data/$(THRIFT_GEN_DIR) /data/idl/thrift/jaeger.thrift @@ -99,10 +103,12 @@ thrift: idl-submodule thrift-image rm -rf crossdock/thrift/*/*-remote rm -rf thrift-gen/jaeger/collector.go +.PHONY: idl-submodule idl-submodule: git submodule init git submodule update +.PHONY: thrift-image thrift-image: $(THRIFT) -version diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md index a3366114d..0e4d9fc0b 100644 --- a/vendor/github.com/uber/jaeger-client-go/README.md +++ b/vendor/github.com/uber/jaeger-client-go/README.md @@ -45,7 +45,7 @@ and [config/example_test.go](./config/example_test.go). ### Environment variables The tracer can be initialized with values coming from environment variables. None of the env vars are required -and all of them can be overriden via direct setting of the property on the configuration object. +and all of them can be overridden via direct setting of the property on the configuration object. Property| Description --- | --- diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go index a0c32d804..44e93533c 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config.go @@ -134,6 +134,10 @@ type ReporterConfig struct { // Password instructs reporter to include a password for basic http authentication when sending spans to // jaeger-collector. Can be set by exporting an environment variable named JAEGER_PASSWORD Password string `yaml:"password"` + + // HTTPHeaders instructs the reporter to add these headers to the http request when reporting spans. + // This field takes effect only when using HTTPTransport by setting the CollectorEndpoint. + HTTPHeaders map[string]string `yaml:"http_headers"` } // BaggageRestrictionsConfig configures the baggage restrictions manager which can be used to whitelist @@ -397,11 +401,12 @@ func (rc *ReporterConfig) NewReporter( func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { switch { - case rc.CollectorEndpoint != "" && rc.User != "" && rc.Password != "": - return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1), - transport.HTTPBasicAuth(rc.User, rc.Password)), nil case rc.CollectorEndpoint != "": - return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1)), nil + httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)} + if rc.User != "" && rc.Password != "" { + httpOptions = append(httpOptions, transport.HTTPBasicAuth(rc.User, rc.Password)) + } + return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil default: return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) } diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go index 5d27b628d..1702c7de4 100644 --- a/vendor/github.com/uber/jaeger-client-go/constants.go +++ b/vendor/github.com/uber/jaeger-client-go/constants.go @@ -22,7 +22,7 @@ import ( const ( // JaegerClientVersion is the version of the client library reported as Span tag. - JaegerClientVersion = "Go-2.20.1" + JaegerClientVersion = "Go-2.22.1" // JaegerClientVersionTagKey is the name of the tag used to report client version. JaegerClientVersionTagKey = "jaeger.version" diff --git a/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go b/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go new file mode 100644 index 000000000..fe0bef268 --- /dev/null +++ b/vendor/github.com/uber/jaeger-client-go/internal/reporterstats/stats.go @@ -0,0 +1,25 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reporterstats + +// ReporterStats exposes some metrics from the RemoteReporter. +type ReporterStats interface { + SpansDroppedFromQueue() int64 +} + +// Receiver can be implemented by a Transport to be given ReporterStats. +type Receiver interface { + SetReporterStats(ReporterStats) +} diff --git a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go index f0f1afe2f..3ac2f8f94 100644 --- a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go +++ b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go @@ -24,6 +24,7 @@ import ( ) // BuildJaegerThrift builds jaeger span based on internal span. +// TODO: (breaking change) move to internal package. func BuildJaegerThrift(span *Span) *j.Span { span.Lock() defer span.Unlock() @@ -46,6 +47,7 @@ func BuildJaegerThrift(span *Span) *j.Span { } // BuildJaegerProcessThrift creates a thrift Process type. +// TODO: (breaking change) move to internal package. func BuildJaegerProcessThrift(span *Span) *j.Process { span.Lock() defer span.Unlock() diff --git a/vendor/github.com/uber/jaeger-client-go/reporter.go b/vendor/github.com/uber/jaeger-client-go/reporter.go index 0b78cec20..830b5a4bb 100644 --- a/vendor/github.com/uber/jaeger-client-go/reporter.go +++ b/vendor/github.com/uber/jaeger-client-go/reporter.go @@ -22,6 +22,7 @@ import ( "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/internal/reporterstats" "github.com/uber/jaeger-client-go/log" ) @@ -176,16 +177,31 @@ type reporterQueueItem struct { close *sync.WaitGroup } +// reporterStats implements reporterstats.ReporterStats. +type reporterStats struct { + droppedCount int64 // provided to Transports to report data loss to the backend +} + +// SpansDroppedFromQueue implements reporterstats.ReporterStats. +func (r *reporterStats) SpansDroppedFromQueue() int64 { + return atomic.LoadInt64(&r.droppedCount) +} + +func (r *reporterStats) incDroppedCount() { + atomic.AddInt64(&r.droppedCount, 1) +} + type remoteReporter struct { // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq - queueLength int64 + queueLength int64 // used to update metrics.Gauge closed int64 // 0 - not closed, 1 - closed reporterOptions - sender Transport - queue chan reporterQueueItem + sender Transport + queue chan reporterQueueItem + reporterStats *reporterStats } // NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender. @@ -213,6 +229,10 @@ func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter { reporterOptions: options, sender: sender, queue: make(chan reporterQueueItem, options.queueSize), + reporterStats: new(reporterStats), + } + if receiver, ok := sender.(reporterstats.Receiver); ok { + receiver.SetReporterStats(reporter.reporterStats) } go reporter.processQueue() return reporter @@ -231,6 +251,7 @@ func (r *remoteReporter) Report(span *Span) { atomic.AddInt64(&r.queueLength, 1) default: r.metrics.ReporterDropped.Inc(1) + r.reporterStats.incDroppedCount() } } @@ -241,7 +262,7 @@ func (r *remoteReporter) Close() { return } r.sendCloseEvent() - r.sender.Close() + _ = r.sender.Close() } func (r *remoteReporter) sendCloseEvent() { @@ -263,7 +284,7 @@ func (r *remoteReporter) processQueue() { flush := func() { if flushed, err := r.sender.Flush(); err != nil { r.metrics.ReporterFailure.Inc(int64(flushed)) - r.logger.Error(fmt.Sprintf("error when flushing the buffer: %s", err.Error())) + r.logger.Error(fmt.Sprintf("failed to flush Jaeger spans to server: %s", err.Error())) } else if flushed > 0 { r.metrics.ReporterSuccess.Inc(int64(flushed)) } @@ -281,7 +302,7 @@ func (r *remoteReporter) processQueue() { span := item.span if flushed, err := r.sender.Append(span); err != nil { r.metrics.ReporterFailure.Inc(int64(flushed)) - r.logger.Error(fmt.Sprintf("error reporting span %q: %s", span.OperationName(), err.Error())) + r.logger.Error(fmt.Sprintf("error reporting Jaeger span %q: %s", span.OperationName(), err.Error())) } else if flushed > 0 { r.metrics.ReporterSuccess.Inc(int64(flushed)) // to reduce the number of gauge stats, we only emit queue length on flush diff --git a/vendor/github.com/uber/jaeger-client-go/span.go b/vendor/github.com/uber/jaeger-client-go/span.go index bbf6fb068..42c9112c0 100644 --- a/vendor/github.com/uber/jaeger-client-go/span.go +++ b/vendor/github.com/uber/jaeger-client-go/span.go @@ -59,6 +59,9 @@ type Span struct { // The span's "micro-log" logs []opentracing.LogRecord + // The number of logs dropped because of MaxLogsPerSpan. + numDroppedLogs int + // references for this span references []Reference @@ -152,7 +155,12 @@ func (s *Span) Logs() []opentracing.LogRecord { s.Lock() defer s.Unlock() - return append([]opentracing.LogRecord(nil), s.logs...) + logs := append([]opentracing.LogRecord(nil), s.logs...) + if s.numDroppedLogs != 0 { + fixLogs(logs, s.numDroppedLogs) + } + + return logs } // References returns references for this span @@ -234,8 +242,65 @@ func (s *Span) Log(ld opentracing.LogData) { // this function should only be called while holding a Write lock func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { - // TODO add logic to limit number of logs per span (issue #46) - s.logs = append(s.logs, lr) + maxLogs := s.tracer.options.maxLogsPerSpan + if maxLogs == 0 || len(s.logs) < maxLogs { + s.logs = append(s.logs, lr) + return + } + + // We have too many logs. We don't touch the first numOld logs; we treat the + // rest as a circular buffer and overwrite the oldest log among those. + numOld := (maxLogs - 1) / 2 + numNew := maxLogs - numOld + s.logs[numOld+s.numDroppedLogs%numNew] = lr + s.numDroppedLogs++ +} + +// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at +// the end (i.e. pos circular left shifts). +func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { + // This algorithm is described in: + // http://www.cplusplus.com/reference/algorithm/rotate + for first, middle, next := 0, pos, pos; first != middle; { + buf[first], buf[next] = buf[next], buf[first] + first++ + next++ + if next == len(buf) { + next = middle + } else if first == middle { + middle = next + } + } +} + +func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) { + // We dropped some log events, which means that we used part of Logs as a + // circular buffer (see appendLog). De-circularize it. + numOld := (len(logs) - 1) / 2 + numNew := len(logs) - numOld + rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew) + + // Replace the log in the middle (the oldest "new" log) with information + // about the dropped logs. This means that we are effectively dropping one + // more "new" log. + numDropped := numDroppedLogs + 1 + logs[numOld] = opentracing.LogRecord{ + // Keep the timestamp of the last dropped event. + Timestamp: logs[numOld].Timestamp, + Fields: []log.Field{ + log.String("event", "dropped Span logs"), + log.Int("dropped_log_count", numDropped), + log.String("component", "jaeger-client"), + }, + } +} + +func (s *Span) fixLogsIfDropped() { + if s.numDroppedLogs == 0 { + return + } + fixLogs(s.logs, s.numDroppedLogs) + s.numDroppedLogs = 0 } // SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext @@ -274,8 +339,9 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { s.applySamplingDecision(decision, true) } if s.context.IsSampled() { + s.Lock() + s.fixLogsIfDropped() if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 { - s.Lock() // Note: bulk logs are not subject to maxLogsPerSpan limit if options.LogRecords != nil { s.logs = append(s.logs, options.LogRecords...) @@ -283,8 +349,8 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { for _, ld := range options.BulkLogData { s.logs = append(s.logs, ld.ToLogRecord()) } - s.Unlock() } + s.Unlock() } // call reportSpan even for non-sampled traces, to return span to the pool // and update metrics counter @@ -344,6 +410,7 @@ func (s *Span) reset() { // Note: To reuse memory we can save the pointers on the heap s.tags = s.tags[:0] s.logs = s.logs[:0] + s.numDroppedLogs = 0 s.references = s.references[:0] } diff --git a/vendor/github.com/uber/jaeger-client-go/span_context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go index b7230abfe..1b44f3f8c 100644 --- a/vendor/github.com/uber/jaeger-client-go/span_context.go +++ b/vendor/github.com/uber/jaeger-client-go/span_context.go @@ -213,9 +213,9 @@ func (c SpanContext) SetFirehose() { func (c SpanContext) String() string { if c.traceID.High == 0 { - return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) } - return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) } // ContextFromString reconstructs the Context encoded in a string diff --git a/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go b/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go index d23ed2fc2..e69c6d603 100644 --- a/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go +++ b/vendor/github.com/uber/jaeger-client-go/thrift-gen/jaeger/ttypes.go @@ -1577,11 +1577,192 @@ func (p *Process) String() string { } // Attributes: +// - FullQueueDroppedSpans +// - TooLargeDroppedSpans +// - FailedToEmitSpans +type ClientStats struct { + FullQueueDroppedSpans int64 `thrift:"fullQueueDroppedSpans,1,required" json:"fullQueueDroppedSpans"` + TooLargeDroppedSpans int64 `thrift:"tooLargeDroppedSpans,2,required" json:"tooLargeDroppedSpans"` + FailedToEmitSpans int64 `thrift:"failedToEmitSpans,3,required" json:"failedToEmitSpans"` +} + +func NewClientStats() *ClientStats { + return &ClientStats{} +} + +func (p *ClientStats) GetFullQueueDroppedSpans() int64 { + return p.FullQueueDroppedSpans +} + +func (p *ClientStats) GetTooLargeDroppedSpans() int64 { + return p.TooLargeDroppedSpans +} + +func (p *ClientStats) GetFailedToEmitSpans() int64 { + return p.FailedToEmitSpans +} +func (p *ClientStats) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetFullQueueDroppedSpans bool = false + var issetTooLargeDroppedSpans bool = false + var issetFailedToEmitSpans bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.readField1(iprot); err != nil { + return err + } + issetFullQueueDroppedSpans = true + case 2: + if err := p.readField2(iprot); err != nil { + return err + } + issetTooLargeDroppedSpans = true + case 3: + if err := p.readField3(iprot); err != nil { + return err + } + issetFailedToEmitSpans = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetFullQueueDroppedSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FullQueueDroppedSpans is not set")) + } + if !issetTooLargeDroppedSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TooLargeDroppedSpans is not set")) + } + if !issetFailedToEmitSpans { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field FailedToEmitSpans is not set")) + } + return nil +} + +func (p *ClientStats) readField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.FullQueueDroppedSpans = v + } + return nil +} + +func (p *ClientStats) readField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 2: ", err) + } else { + p.TooLargeDroppedSpans = v + } + return nil +} + +func (p *ClientStats) readField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.FailedToEmitSpans = v + } + return nil +} + +func (p *ClientStats) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("ClientStats"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *ClientStats) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("fullQueueDroppedSpans", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:fullQueueDroppedSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.FullQueueDroppedSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.fullQueueDroppedSpans (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:fullQueueDroppedSpans: ", p), err) + } + return err +} + +func (p *ClientStats) writeField2(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("tooLargeDroppedSpans", thrift.I64, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:tooLargeDroppedSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.TooLargeDroppedSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.tooLargeDroppedSpans (2) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:tooLargeDroppedSpans: ", p), err) + } + return err +} + +func (p *ClientStats) writeField3(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("failedToEmitSpans", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:failedToEmitSpans: ", p), err) + } + if err := oprot.WriteI64(int64(p.FailedToEmitSpans)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.failedToEmitSpans (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:failedToEmitSpans: ", p), err) + } + return err +} + +func (p *ClientStats) String() string { + if p == nil { + return "<nil>" + } + return fmt.Sprintf("ClientStats(%+v)", *p) +} + +// Attributes: // - Process // - Spans +// - SeqNo +// - Stats type Batch struct { - Process *Process `thrift:"process,1,required" json:"process"` - Spans []*Span `thrift:"spans,2,required" json:"spans"` + Process *Process `thrift:"process,1,required" json:"process"` + Spans []*Span `thrift:"spans,2,required" json:"spans"` + SeqNo *int64 `thrift:"seqNo,3" json:"seqNo,omitempty"` + Stats *ClientStats `thrift:"stats,4" json:"stats,omitempty"` } func NewBatch() *Batch { @@ -1600,10 +1781,36 @@ func (p *Batch) GetProcess() *Process { func (p *Batch) GetSpans() []*Span { return p.Spans } + +var Batch_SeqNo_DEFAULT int64 + +func (p *Batch) GetSeqNo() int64 { + if !p.IsSetSeqNo() { + return Batch_SeqNo_DEFAULT + } + return *p.SeqNo +} + +var Batch_Stats_DEFAULT *ClientStats + +func (p *Batch) GetStats() *ClientStats { + if !p.IsSetStats() { + return Batch_Stats_DEFAULT + } + return p.Stats +} func (p *Batch) IsSetProcess() bool { return p.Process != nil } +func (p *Batch) IsSetSeqNo() bool { + return p.SeqNo != nil +} + +func (p *Batch) IsSetStats() bool { + return p.Stats != nil +} + func (p *Batch) Read(iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -1631,6 +1838,14 @@ func (p *Batch) Read(iprot thrift.TProtocol) error { return err } issetSpans = true + case 3: + if err := p.readField3(iprot); err != nil { + return err + } + case 4: + if err := p.readField4(iprot); err != nil { + return err + } default: if err := iprot.Skip(fieldTypeId); err != nil { return err @@ -1680,6 +1895,23 @@ func (p *Batch) readField2(iprot thrift.TProtocol) error { return nil } +func (p *Batch) readField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.SeqNo = &v + } + return nil +} + +func (p *Batch) readField4(iprot thrift.TProtocol) error { + p.Stats = &ClientStats{} + if err := p.Stats.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Stats), err) + } + return nil +} + func (p *Batch) Write(oprot thrift.TProtocol) error { if err := oprot.WriteStructBegin("Batch"); err != nil { return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) @@ -1690,6 +1922,12 @@ func (p *Batch) Write(oprot thrift.TProtocol) error { if err := p.writeField2(oprot); err != nil { return err } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := p.writeField4(oprot); err != nil { + return err + } if err := oprot.WriteFieldStop(); err != nil { return thrift.PrependError("write field stop error: ", err) } @@ -1733,6 +1971,36 @@ func (p *Batch) writeField2(oprot thrift.TProtocol) (err error) { return err } +func (p *Batch) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetSeqNo() { + if err := oprot.WriteFieldBegin("seqNo", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:seqNo: ", p), err) + } + if err := oprot.WriteI64(int64(*p.SeqNo)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.seqNo (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:seqNo: ", p), err) + } + } + return err +} + +func (p *Batch) writeField4(oprot thrift.TProtocol) (err error) { + if p.IsSetStats() { + if err := oprot.WriteFieldBegin("stats", thrift.STRUCT, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:stats: ", p), err) + } + if err := p.Stats.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Stats), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:stats: ", p), err) + } + } + return err +} + func (p *Batch) String() string { if p == nil { return "<nil>" diff --git a/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go b/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go index 2d49e1d5f..15583e56b 100644 --- a/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go +++ b/vendor/github.com/uber/jaeger-client-go/thrift-gen/zipkincore/ttypes.go @@ -729,7 +729,7 @@ func (p *BinaryAnnotation) String() string { // precise value possible. For example, gettimeofday or syncing nanoTime // against a tick of currentTimeMillis. // -// For compatibilty with instrumentation that precede this field, collectors +// For compatibility with instrumentation that precede this field, collectors // or span stores can derive this via Annotation.timestamp. // For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. // @@ -741,7 +741,7 @@ func (p *BinaryAnnotation) String() string { // precise measurement decoupled from problems of clocks, such as skew or NTP // updates causing time to move backwards. // -// For compatibilty with instrumentation that precede this field, collectors +// For compatibility with instrumentation that precede this field, collectors // or span stores can derive this by subtracting Annotation.timestamp. // For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp. // diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go index f03372dc7..da43ec6db 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer.go @@ -52,6 +52,7 @@ type Tracer struct { highTraceIDGenerator func() uint64 // custom high trace ID generator maxTagValueLength int noDebugFlagOnForcedSampling bool + maxLogsPerSpan int // more options to come } // allocator of Span objects diff --git a/vendor/github.com/uber/jaeger-client-go/tracer_options.go b/vendor/github.com/uber/jaeger-client-go/tracer_options.go index 469685bb4..f016484b9 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer_options.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer_options.go @@ -144,6 +144,18 @@ func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption { } } +// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero +// value). If a span has more logs than this value, logs are dropped as +// necessary (and replaced with a log describing how many were dropped). +// +// About half of the MaxLogsPerSpan logs kept are the oldest logs, and about +// half are the newest logs. +func (tracerOptions) MaxLogsPerSpan(maxLogsPerSpan int) TracerOption { + return func(tracer *Tracer) { + tracer.options.maxLogsPerSpan = maxLogsPerSpan + } +} + func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption { return func(tracer *Tracer) { tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan diff --git a/vendor/github.com/uber/jaeger-client-go/transport/http.go b/vendor/github.com/uber/jaeger-client-go/transport/http.go index bc1b3e6b0..bb7eb00c9 100644 --- a/vendor/github.com/uber/jaeger-client-go/transport/http.go +++ b/vendor/github.com/uber/jaeger-client-go/transport/http.go @@ -39,6 +39,7 @@ type HTTPTransport struct { spans []*j.Span process *j.Process httpCredentials *HTTPBasicAuthCredentials + headers map[string]string } // HTTPBasicAuthCredentials stores credentials for HTTP basic auth. @@ -76,6 +77,13 @@ func HTTPRoundTripper(transport http.RoundTripper) HTTPOption { } } +// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request +func HTTPHeaders(headers map[string]string) HTTPOption { + return func(c *HTTPTransport) { + c.headers = headers + } +} + // NewHTTPTransport returns a new HTTP-backend transport. url should be an http // url of the collector to handle POST request, typically something like: // http://hostname:14268/api/traces?format=jaeger.thrift @@ -136,6 +144,9 @@ func (c *HTTPTransport) send(spans []*j.Span) error { return err } req.Header.Set("Content-Type", "application/x-thrift") + for k, v := range c.headers { + req.Header.Set(k, v) + } if c.httpCredentials != nil { req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password) diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go index 7b9ccf937..7370d8007 100644 --- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go +++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go @@ -18,8 +18,8 @@ import ( "errors" "fmt" + "github.com/uber/jaeger-client-go/internal/reporterstats" "github.com/uber/jaeger-client-go/thrift" - j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" ) @@ -27,12 +27,14 @@ import ( // Empirically obtained constant for how many bytes in the message are used for envelope. // The total datagram size is: // sizeof(Span) * numSpans + processByteSize + emitBatchOverhead <= maxPacketSize -// There is a unit test `TestEmitBatchOverhead` that validates this number. +// // Note that due to the use of Compact Thrift protocol, overhead grows with the number of spans // in the batch, because the length of the list is encoded as varint32, as well as SeqId. -const emitBatchOverhead = 30 +// +// There is a unit test `TestEmitBatchOverhead` that validates this number, it fails at <68. +const emitBatchOverhead = 70 -var errSpanTooLarge = errors.New("Span is too large") +var errSpanTooLarge = errors.New("span is too large") type udpSender struct { client *utils.AgentClientUDP @@ -44,9 +46,19 @@ type udpSender struct { thriftProtocol thrift.TProtocol process *j.Process processByteSize int + + // reporterStats provides access to stats that are only known to Reporter + reporterStats reporterstats.ReporterStats + + // The following counters are always non-negative, but we need to send them in signed i64 Thrift fields, + // so we keep them as signed. At 10k QPS, overflow happens in about 300 million years. + batchSeqNo int64 + tooLargeDroppedSpans int64 + failedToEmitSpans int64 } -// NewUDPTransport creates a reporter that submits spans to jaeger-agent +// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// TODO: (breaking change) move to transport/ package. func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { if len(hostPort) == 0 { hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) @@ -66,17 +78,22 @@ func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { return nil, err } - sender := &udpSender{ + return &udpSender{ client: client, maxSpanBytes: maxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, - thriftProtocol: thriftProtocol} - return sender, nil + thriftProtocol: thriftProtocol, + }, nil +} + +// SetReporterStats implements reporterstats.Receiver. +func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) { + s.reporterStats = rs } func (s *udpSender) calcSizeOfSerializedThrift(thriftStruct thrift.TStruct) int { s.thriftBuffer.Reset() - thriftStruct.Write(s.thriftProtocol) + _ = thriftStruct.Write(s.thriftProtocol) return s.thriftBuffer.Len() } @@ -89,6 +106,7 @@ func (s *udpSender) Append(span *Span) (int, error) { jSpan := BuildJaegerThrift(span) spanSize := s.calcSizeOfSerializedThrift(jSpan) if spanSize > s.maxSpanBytes { + s.tooLargeDroppedSpans++ return 1, errSpanTooLarge } @@ -112,9 +130,18 @@ func (s *udpSender) Flush() (int, error) { if n == 0 { return 0, nil } - err := s.client.EmitBatch(&j.Batch{Process: s.process, Spans: s.spanBuffer}) + s.batchSeqNo++ + batchSeqNo := int64(s.batchSeqNo) + err := s.client.EmitBatch(&j.Batch{ + Process: s.process, + Spans: s.spanBuffer, + SeqNo: &batchSeqNo, + Stats: s.makeStats(), + }) s.resetBuffers() - + if err != nil { + s.failedToEmitSpans += int64(n) + } return n, err } @@ -129,3 +156,15 @@ func (s *udpSender) resetBuffers() { s.spanBuffer = s.spanBuffer[:0] s.byteBufferSize = s.processByteSize } + +func (s *udpSender) makeStats() *j.ClientStats { + var dropped int64 + if s.reporterStats != nil { + dropped = s.reporterStats.SpansDroppedFromQueue() + } + return &j.ClientStats{ + FullQueueDroppedSpans: dropped, + TooLargeDroppedSpans: s.tooLargeDroppedSpans, + FailedToEmitSpans: s.failedToEmitSpans, + } +} diff --git a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go index 6f042073d..fadd73e49 100644 --- a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go +++ b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go @@ -85,7 +85,7 @@ func (a *AgentClientUDP) EmitBatch(batch *jaeger.Batch) error { return err } if a.thriftBuffer.Len() > a.maxPacketSize { - return fmt.Errorf("Data does not fit within one UDP packet; size %d, max %d, spans %d", + return fmt.Errorf("data does not fit within one UDP packet; size %d, max %d, spans %d", a.thriftBuffer.Len(), a.maxPacketSize, len(batch.Spans)) } _, err := a.connUDP.Write(a.thriftBuffer.Bytes()) diff --git a/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go b/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go index eb31c4369..73aeb000f 100644 --- a/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go +++ b/vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go @@ -40,6 +40,7 @@ var specialTagHandlers = map[string]func(*zipkinSpan, interface{}){ } // BuildZipkinThrift builds thrift span based on internal span. +// TODO: (breaking change) move to transport/zipkin and make private. func BuildZipkinThrift(s *Span) *z.Span { span := &zipkinSpan{Span: s} span.handleSpecialTags() diff --git a/vendor/modules.txt b/vendor/modules.txt index 8fa756fbb..0b4cfefa9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -478,11 +478,12 @@ github.com/stretchr/testify/require github.com/syndtr/gocapability/capability # github.com/tchap/go-patricia v2.3.0+incompatible github.com/tchap/go-patricia/patricia -# github.com/uber/jaeger-client-go v2.20.1+incompatible +# github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-client-go github.com/uber/jaeger-client-go/config github.com/uber/jaeger-client-go/internal/baggage github.com/uber/jaeger-client-go/internal/baggage/remote +github.com/uber/jaeger-client-go/internal/reporterstats github.com/uber/jaeger-client-go/internal/spanlog github.com/uber/jaeger-client-go/internal/throttler github.com/uber/jaeger-client-go/internal/throttler/remote |