aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/uber/jaeger-client-go/tracer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/tracer.go')
-rw-r--r--vendor/github.com/uber/jaeger-client-go/tracer.go127
1 files changed, 68 insertions, 59 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go
index 745a0c38a..f03372dc7 100644
--- a/vendor/github.com/uber/jaeger-client-go/tracer.go
+++ b/vendor/github.com/uber/jaeger-client-go/tracer.go
@@ -38,7 +38,7 @@ type Tracer struct {
serviceName string
hostIPv4 uint32 // this is for zipkin endpoint conversion
- sampler Sampler
+ sampler SamplerV2
reporter Reporter
metrics Metrics
logger log.Logger
@@ -74,6 +74,7 @@ type Tracer struct {
// NewTracer creates Tracer implementation that reports tracing to Jaeger.
// The returned io.Closer can be used in shutdown hooks to ensure that the internal
// queue of the Reporter is drained and all buffered spans are submitted to collectors.
+// TODO (breaking change) return *Tracer only, without closer.
func NewTracer(
serviceName string,
sampler Sampler,
@@ -82,7 +83,7 @@ func NewTracer(
) (opentracing.Tracer, io.Closer) {
t := &Tracer{
serviceName: serviceName,
- sampler: sampler,
+ sampler: samplerV1toV2(sampler),
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
@@ -261,7 +262,7 @@ func (t *Tracer) startSpanWithOptions(
rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
}
- var samplerTags []Tag
+ var internalTags []Tag
newTrace := false
if !isSelfRef {
if !hasParent || !parent.IsValid() {
@@ -272,13 +273,12 @@ func (t *Tracer) startSpanWithOptions(
}
ctx.spanID = SpanID(ctx.traceID.Low)
ctx.parentID = 0
- ctx.flags = byte(0)
+ ctx.samplingState = &samplingState{
+ localRootSpan: ctx.spanID,
+ }
if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
- ctx.flags |= (flagSampled | flagDebug)
- samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
- } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
- ctx.flags |= flagSampled
- samplerTags = tags
+ ctx.samplingState.setDebugAndSampled()
+ internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
}
} else {
ctx.traceID = parent.traceID
@@ -290,7 +290,11 @@ func (t *Tracer) startSpanWithOptions(
ctx.spanID = SpanID(t.randomID())
ctx.parentID = parent.spanID
}
- ctx.flags = parent.flags
+ ctx.samplingState = parent.samplingState
+ if parent.remote {
+ ctx.samplingState.setFinal()
+ ctx.samplingState.localRootSpan = ctx.spanID
+ }
}
if hasParent {
// copy baggage items
@@ -305,17 +309,30 @@ func (t *Tracer) startSpanWithOptions(
sp := t.newSpan()
sp.context = ctx
+ sp.tracer = t
+ sp.operationName = operationName
+ sp.startTime = options.StartTime
+ sp.duration = 0
+ sp.references = references
+ sp.firstInProcess = rpcServer || sp.context.parentID == 0
+
+ if !sp.isSamplingFinalized() {
+ decision := t.sampler.OnCreateSpan(sp)
+ sp.applySamplingDecision(decision, false)
+ }
sp.observer = t.observer.OnStartSpan(sp, operationName, options)
- return t.startSpanInternal(
- sp,
- operationName,
- options.StartTime,
- samplerTags,
- options.Tags,
- newTrace,
- rpcServer,
- references,
- )
+
+ if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
+ if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
+ sp.tags = make([]Tag, 0, tagsTotalLength)
+ }
+ sp.tags = append(sp.tags, internalTags...)
+ for k, v := range options.Tags {
+ sp.setTagInternal(k, v, false)
+ }
+ }
+ t.emitNewSpanMetrics(sp, newTrace)
+ return sp
}
// Inject implements Inject() method of opentracing.Tracer
@@ -340,6 +357,7 @@ func (t *Tracer) Extract(
if err != nil {
return nil, err // ensure returned spanCtx is nil
}
+ spanCtx.remote = true
return spanCtx, nil
}
return nil, opentracing.ErrUnsupportedFormat
@@ -350,10 +368,10 @@ func (t *Tracer) Close() error {
t.reporter.Close()
t.sampler.Close()
if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
- mgr.Close()
+ _ = mgr.Close()
}
if throttler, ok := t.debugThrottler.(io.Closer); ok {
- throttler.Close()
+ _ = throttler.Close()
}
return nil
}
@@ -368,6 +386,7 @@ func (t *Tracer) Tags() []opentracing.Tag {
}
// getTag returns the value of specific tag, if not exists, return nil.
+// TODO only used by tests, move there.
func (t *Tracer) getTag(key string) (interface{}, bool) {
for _, tag := range t.tags {
if tag.key == key {
@@ -383,41 +402,21 @@ func (t *Tracer) newSpan() *Span {
return t.spanAllocator.Get()
}
-func (t *Tracer) startSpanInternal(
- sp *Span,
- operationName string,
- startTime time.Time,
- internalTags []Tag,
- tags opentracing.Tags,
- newTrace bool,
- rpcServer bool,
- references []Reference,
-) *Span {
- sp.tracer = t
- sp.operationName = operationName
- sp.startTime = startTime
- sp.duration = 0
- sp.references = references
- sp.firstInProcess = rpcServer || sp.context.parentID == 0
- if len(tags) > 0 || len(internalTags) > 0 {
- sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
- copy(sp.tags, internalTags)
- for k, v := range tags {
- sp.observer.OnSetTag(k, v)
- if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) {
- continue
- }
- sp.setTagNoLocking(k, v)
+// emitNewSpanMetrics generates metrics on the number of started spans and traces.
+// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
+// server-side RPC span has the exact same trace/span/parent IDs as the
+// calling client-side span, but obviously the server side span is
+// no longer a root span of the trace.
+func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
+ if !sp.isSamplingFinalized() {
+ t.metrics.SpansStartedDelayedSampling.Inc(1)
+ if newTrace {
+ t.metrics.TracesStartedDelayedSampling.Inc(1)
}
- }
- // emit metrics
- if sp.context.IsSampled() {
+ // joining a trace is not possible, because sampling decision inherited from upstream is final
+ } else if sp.context.IsSampled() {
t.metrics.SpansStartedSampled.Inc(1)
if newTrace {
- // We cannot simply check for parentID==0 because in Zipkin model the
- // server-side RPC span has the exact same trace/span/parent IDs as the
- // calling client-side span, but obviously the server side span is
- // no longer a root span of the trace.
t.metrics.TracesStartedSampled.Inc(1)
} else if sp.firstInProcess {
t.metrics.TracesJoinedSampled.Inc(1)
@@ -430,15 +429,20 @@ func (t *Tracer) startSpanInternal(
t.metrics.TracesJoinedNotSampled.Inc(1)
}
}
- return sp
}
func (t *Tracer) reportSpan(sp *Span) {
- t.metrics.SpansFinished.Inc(1)
+ if !sp.isSamplingFinalized() {
+ t.metrics.SpansFinishedDelayedSampling.Inc(1)
+ } else if sp.context.IsSampled() {
+ t.metrics.SpansFinishedSampled.Inc(1)
+ } else {
+ t.metrics.SpansFinishedNotSampled.Inc(1)
+ }
- // Note: if the reporter is processing Span asynchronously need to Retain() it
- // otherwise, in the racing condition will be rewritten span data before it will be sent
- // * To remove object use method span.Release()
+ // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
+ // and then Release() it when no longer needed.
+ // Otherwise, the span may be reused for another trace and its data may be overwritten.
if sp.context.IsSampled() {
t.reporter.Report(sp)
}
@@ -466,6 +470,11 @@ func (t *Tracer) isDebugAllowed(operation string) bool {
return t.debugThrottler.IsAllowed(operation)
}
+// Sampler returns the sampler given to the tracer at creation.
+func (t *Tracer) Sampler() SamplerV2 {
+ return t.sampler
+}
+
// SelfRef creates an opentracing compliant SpanReference from a jaeger
// SpanContext. This is a factory function in order to encapsulate jaeger specific
// types.