diff options
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/tracer.go')
-rw-r--r-- | vendor/github.com/uber/jaeger-client-go/tracer.go | 127 |
1 files changed, 68 insertions, 59 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go index 745a0c38a..f03372dc7 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer.go @@ -38,7 +38,7 @@ type Tracer struct { serviceName string hostIPv4 uint32 // this is for zipkin endpoint conversion - sampler Sampler + sampler SamplerV2 reporter Reporter metrics Metrics logger log.Logger @@ -74,6 +74,7 @@ type Tracer struct { // NewTracer creates Tracer implementation that reports tracing to Jaeger. // The returned io.Closer can be used in shutdown hooks to ensure that the internal // queue of the Reporter is drained and all buffered spans are submitted to collectors. +// TODO (breaking change) return *Tracer only, without closer. func NewTracer( serviceName string, sampler Sampler, @@ -82,7 +83,7 @@ func NewTracer( ) (opentracing.Tracer, io.Closer) { t := &Tracer{ serviceName: serviceName, - sampler: sampler, + sampler: samplerV1toV2(sampler), reporter: reporter, injectors: make(map[interface{}]Injector), extractors: make(map[interface{}]Extractor), @@ -261,7 +262,7 @@ func (t *Tracer) startSpanWithOptions( rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum)) } - var samplerTags []Tag + var internalTags []Tag newTrace := false if !isSelfRef { if !hasParent || !parent.IsValid() { @@ -272,13 +273,12 @@ func (t *Tracer) startSpanWithOptions( } ctx.spanID = SpanID(ctx.traceID.Low) ctx.parentID = 0 - ctx.flags = byte(0) + ctx.samplingState = &samplingState{ + localRootSpan: ctx.spanID, + } if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) { - ctx.flags |= (flagSampled | flagDebug) - samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}} - } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled { - ctx.flags |= flagSampled - samplerTags = tags + ctx.samplingState.setDebugAndSampled() + internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID}) } } else { ctx.traceID = parent.traceID @@ -290,7 +290,11 @@ func (t *Tracer) startSpanWithOptions( ctx.spanID = SpanID(t.randomID()) ctx.parentID = parent.spanID } - ctx.flags = parent.flags + ctx.samplingState = parent.samplingState + if parent.remote { + ctx.samplingState.setFinal() + ctx.samplingState.localRootSpan = ctx.spanID + } } if hasParent { // copy baggage items @@ -305,17 +309,30 @@ func (t *Tracer) startSpanWithOptions( sp := t.newSpan() sp.context = ctx + sp.tracer = t + sp.operationName = operationName + sp.startTime = options.StartTime + sp.duration = 0 + sp.references = references + sp.firstInProcess = rpcServer || sp.context.parentID == 0 + + if !sp.isSamplingFinalized() { + decision := t.sampler.OnCreateSpan(sp) + sp.applySamplingDecision(decision, false) + } sp.observer = t.observer.OnStartSpan(sp, operationName, options) - return t.startSpanInternal( - sp, - operationName, - options.StartTime, - samplerTags, - options.Tags, - newTrace, - rpcServer, - references, - ) + + if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 { + if sp.tags == nil || cap(sp.tags) < tagsTotalLength { + sp.tags = make([]Tag, 0, tagsTotalLength) + } + sp.tags = append(sp.tags, internalTags...) + for k, v := range options.Tags { + sp.setTagInternal(k, v, false) + } + } + t.emitNewSpanMetrics(sp, newTrace) + return sp } // Inject implements Inject() method of opentracing.Tracer @@ -340,6 +357,7 @@ func (t *Tracer) Extract( if err != nil { return nil, err // ensure returned spanCtx is nil } + spanCtx.remote = true return spanCtx, nil } return nil, opentracing.ErrUnsupportedFormat @@ -350,10 +368,10 @@ func (t *Tracer) Close() error { t.reporter.Close() t.sampler.Close() if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok { - mgr.Close() + _ = mgr.Close() } if throttler, ok := t.debugThrottler.(io.Closer); ok { - throttler.Close() + _ = throttler.Close() } return nil } @@ -368,6 +386,7 @@ func (t *Tracer) Tags() []opentracing.Tag { } // getTag returns the value of specific tag, if not exists, return nil. +// TODO only used by tests, move there. func (t *Tracer) getTag(key string) (interface{}, bool) { for _, tag := range t.tags { if tag.key == key { @@ -383,41 +402,21 @@ func (t *Tracer) newSpan() *Span { return t.spanAllocator.Get() } -func (t *Tracer) startSpanInternal( - sp *Span, - operationName string, - startTime time.Time, - internalTags []Tag, - tags opentracing.Tags, - newTrace bool, - rpcServer bool, - references []Reference, -) *Span { - sp.tracer = t - sp.operationName = operationName - sp.startTime = startTime - sp.duration = 0 - sp.references = references - sp.firstInProcess = rpcServer || sp.context.parentID == 0 - if len(tags) > 0 || len(internalTags) > 0 { - sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags)) - copy(sp.tags, internalTags) - for k, v := range tags { - sp.observer.OnSetTag(k, v) - if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) { - continue - } - sp.setTagNoLocking(k, v) +// emitNewSpanMetrics generates metrics on the number of started spans and traces. +// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the +// server-side RPC span has the exact same trace/span/parent IDs as the +// calling client-side span, but obviously the server side span is +// no longer a root span of the trace. +func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) { + if !sp.isSamplingFinalized() { + t.metrics.SpansStartedDelayedSampling.Inc(1) + if newTrace { + t.metrics.TracesStartedDelayedSampling.Inc(1) } - } - // emit metrics - if sp.context.IsSampled() { + // joining a trace is not possible, because sampling decision inherited from upstream is final + } else if sp.context.IsSampled() { t.metrics.SpansStartedSampled.Inc(1) if newTrace { - // We cannot simply check for parentID==0 because in Zipkin model the - // server-side RPC span has the exact same trace/span/parent IDs as the - // calling client-side span, but obviously the server side span is - // no longer a root span of the trace. t.metrics.TracesStartedSampled.Inc(1) } else if sp.firstInProcess { t.metrics.TracesJoinedSampled.Inc(1) @@ -430,15 +429,20 @@ func (t *Tracer) startSpanInternal( t.metrics.TracesJoinedNotSampled.Inc(1) } } - return sp } func (t *Tracer) reportSpan(sp *Span) { - t.metrics.SpansFinished.Inc(1) + if !sp.isSamplingFinalized() { + t.metrics.SpansFinishedDelayedSampling.Inc(1) + } else if sp.context.IsSampled() { + t.metrics.SpansFinishedSampled.Inc(1) + } else { + t.metrics.SpansFinishedNotSampled.Inc(1) + } - // Note: if the reporter is processing Span asynchronously need to Retain() it - // otherwise, in the racing condition will be rewritten span data before it will be sent - // * To remove object use method span.Release() + // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span, + // and then Release() it when no longer needed. + // Otherwise, the span may be reused for another trace and its data may be overwritten. if sp.context.IsSampled() { t.reporter.Report(sp) } @@ -466,6 +470,11 @@ func (t *Tracer) isDebugAllowed(operation string) bool { return t.debugThrottler.IsAllowed(operation) } +// Sampler returns the sampler given to the tracer at creation. +func (t *Tracer) Sampler() SamplerV2 { + return t.sampler +} + // SelfRef creates an opentracing compliant SpanReference from a jaeger // SpanContext. This is a factory function in order to encapsulate jaeger specific // types. |