diff options
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/sampler.go')
-rw-r--r-- | vendor/github.com/uber/jaeger-client-go/sampler.go | 446 |
1 files changed, 189 insertions, 257 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/sampler.go b/vendor/github.com/uber/jaeger-client-go/sampler.go index ea6984e02..6195d59c5 100644 --- a/vendor/github.com/uber/jaeger-client-go/sampler.go +++ b/vendor/github.com/uber/jaeger-client-go/sampler.go @@ -17,19 +17,14 @@ package jaeger import ( "fmt" "math" - "net/url" "sync" - "sync/atomic" - "time" - "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift-gen/sampling" "github.com/uber/jaeger-client-go/utils" ) const ( - defaultSamplingRefreshInterval = time.Minute - defaultMaxOperations = 2000 + defaultMaxOperations = 2000 ) // Sampler decides whether a new trace should be sampled or not. @@ -47,9 +42,7 @@ type Sampler interface { // Equal checks if the `other` sampler is functionally equivalent // to this sampler. - // TODO remove this function. This function is used to determine if 2 samplers are equivalent - // which does not bode well with the adaptive sampler which has to create all the composite samplers - // for the comparison to occur. This is expensive to do if only one sampler has changed. + // TODO (breaking change) remove this function. See PerOperationSampler.Equals for explanation. Equal(other Sampler) bool } @@ -57,17 +50,23 @@ type Sampler interface { // ConstSampler is a sampler that always makes the same decision. type ConstSampler struct { + legacySamplerV1Base Decision bool tags []Tag } // NewConstSampler creates a ConstSampler. -func NewConstSampler(sample bool) Sampler { +func NewConstSampler(sample bool) *ConstSampler { tags := []Tag{ {key: SamplerTypeTagKey, value: SamplerTypeConst}, {key: SamplerParamTagKey, value: sample}, } - return &ConstSampler{Decision: sample, tags: tags} + s := &ConstSampler{ + Decision: sample, + tags: tags, + } + s.delegate = s.IsSampled + return s } // IsSampled implements IsSampled() of Sampler. @@ -88,11 +87,17 @@ func (s *ConstSampler) Equal(other Sampler) bool { return false } +// String is used to log sampler details. +func (s *ConstSampler) String() string { + return fmt.Sprintf("ConstSampler(decision=%t)", s.Decision) +} + // ----------------------- // ProbabilisticSampler is a sampler that randomly samples a certain percentage // of traces. type ProbabilisticSampler struct { + legacySamplerV1Base samplingRate float64 samplingBoundary uint64 tags []Tag @@ -114,16 +119,19 @@ func NewProbabilisticSampler(samplingRate float64) (*ProbabilisticSampler, error } func newProbabilisticSampler(samplingRate float64) *ProbabilisticSampler { - samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0)) - tags := []Tag{ + s := new(ProbabilisticSampler) + s.delegate = s.IsSampled + return s.init(samplingRate) +} + +func (s *ProbabilisticSampler) init(samplingRate float64) *ProbabilisticSampler { + s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0)) + s.samplingBoundary = uint64(float64(maxRandomNumber) * s.samplingRate) + s.tags = []Tag{ {key: SamplerTypeTagKey, value: SamplerTypeProbabilistic}, - {key: SamplerParamTagKey, value: samplingRate}, - } - return &ProbabilisticSampler{ - samplingRate: samplingRate, - samplingBoundary: uint64(float64(maxRandomNumber) * samplingRate), - tags: tags, + {key: SamplerParamTagKey, value: s.samplingRate}, } + return s } // SamplingRate returns the sampling probability this sampled was constructed with. @@ -149,65 +157,104 @@ func (s *ProbabilisticSampler) Equal(other Sampler) bool { return false } +// Update modifies in-place the sampling rate. Locking must be done externally. +func (s *ProbabilisticSampler) Update(samplingRate float64) error { + if samplingRate < 0.0 || samplingRate > 1.0 { + return fmt.Errorf("Sampling Rate must be between 0.0 and 1.0, received %f", samplingRate) + } + s.init(samplingRate) + return nil +} + +// String is used to log sampler details. +func (s *ProbabilisticSampler) String() string { + return fmt.Sprintf("ProbabilisticSampler(samplingRate=%v)", s.samplingRate) +} + // ----------------------- -type rateLimitingSampler struct { +// RateLimitingSampler samples at most maxTracesPerSecond. The distribution of sampled traces follows +// burstiness of the service, i.e. a service with uniformly distributed requests will have those +// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a +// number of sequential requests can be sampled each second. +type RateLimitingSampler struct { + legacySamplerV1Base maxTracesPerSecond float64 - rateLimiter utils.RateLimiter + rateLimiter *utils.ReconfigurableRateLimiter tags []Tag } -// NewRateLimitingSampler creates a sampler that samples at most maxTracesPerSecond. The distribution of sampled -// traces follows burstiness of the service, i.e. a service with uniformly distributed requests will have those -// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a number of -// sequential requests can be sampled each second. -func NewRateLimitingSampler(maxTracesPerSecond float64) Sampler { - tags := []Tag{ +// NewRateLimitingSampler creates new RateLimitingSampler. +func NewRateLimitingSampler(maxTracesPerSecond float64) *RateLimitingSampler { + s := new(RateLimitingSampler) + s.delegate = s.IsSampled + return s.init(maxTracesPerSecond) +} + +func (s *RateLimitingSampler) init(maxTracesPerSecond float64) *RateLimitingSampler { + if s.rateLimiter == nil { + s.rateLimiter = utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)) + } else { + s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)) + } + s.maxTracesPerSecond = maxTracesPerSecond + s.tags = []Tag{ {key: SamplerTypeTagKey, value: SamplerTypeRateLimiting}, {key: SamplerParamTagKey, value: maxTracesPerSecond}, } - return &rateLimitingSampler{ - maxTracesPerSecond: maxTracesPerSecond, - rateLimiter: utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)), - tags: tags, - } + return s } // IsSampled implements IsSampled() of Sampler. -func (s *rateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { +func (s *RateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { return s.rateLimiter.CheckCredit(1.0), s.tags } -func (s *rateLimitingSampler) Close() { +// Update reconfigures the rate limiter, while preserving its accumulated balance. +// Locking must be done externally. +func (s *RateLimitingSampler) Update(maxTracesPerSecond float64) { + if s.maxTracesPerSecond != maxTracesPerSecond { + s.init(maxTracesPerSecond) + } +} + +// Close does nothing. +func (s *RateLimitingSampler) Close() { // nothing to do } -func (s *rateLimitingSampler) Equal(other Sampler) bool { - if o, ok := other.(*rateLimitingSampler); ok { +// Equal compares with another sampler. +func (s *RateLimitingSampler) Equal(other Sampler) bool { + if o, ok := other.(*RateLimitingSampler); ok { return s.maxTracesPerSecond == o.maxTracesPerSecond } return false } +// String is used to log sampler details. +func (s *RateLimitingSampler) String() string { + return fmt.Sprintf("RateLimitingSampler(maxTracesPerSecond=%v)", s.maxTracesPerSecond) +} + // ----------------------- -// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both probabilisticSampler and -// rateLimitingSampler. The rateLimitingSampler is used as a guaranteed lower bound sampler such that +// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both ProbabilisticSampler and +// RateLimitingSampler. The RateLimitingSampler is used as a guaranteed lower bound sampler such that // every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound // of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes. // -// The probabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both -// samplers return true, the tags for probabilisticSampler will be used. +// The ProbabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both +// samplers return true, the tags for ProbabilisticSampler will be used. type GuaranteedThroughputProbabilisticSampler struct { probabilisticSampler *ProbabilisticSampler - lowerBoundSampler Sampler + lowerBoundSampler *RateLimitingSampler tags []Tag samplingRate float64 lowerBound float64 } // NewGuaranteedThroughputProbabilisticSampler returns a delegating sampler that applies both -// probabilisticSampler and rateLimitingSampler. +// ProbabilisticSampler and RateLimitingSampler. func NewGuaranteedThroughputProbabilisticSampler( lowerBound, samplingRate float64, ) (*GuaranteedThroughputProbabilisticSampler, error) { @@ -224,8 +271,14 @@ func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float6 } func (s *GuaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) { - if s.probabilisticSampler == nil || s.samplingRate != samplingRate { + if s.probabilisticSampler == nil { s.probabilisticSampler = newProbabilisticSampler(samplingRate) + } else if s.samplingRate != samplingRate { + s.probabilisticSampler.init(samplingRate) + } + // since we don't validate samplingRate, sampler may have clamped it to [0, 1] interval + samplingRate = s.probabilisticSampler.SamplingRate() + if s.samplingRate != samplingRate || s.tags == nil { s.samplingRate = s.probabilisticSampler.SamplingRate() s.tags = []Tag{ {key: SamplerTypeTagKey, value: SamplerTypeLowerBound}, @@ -252,7 +305,7 @@ func (s *GuaranteedThroughputProbabilisticSampler) Close() { // Equal implements Equal() of Sampler. func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool { - // NB The Equal() function is expensive and will be removed. See adaptiveSampler.Equal() for + // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for // more information. return false } @@ -261,52 +314,116 @@ func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool { func (s *GuaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRate float64) { s.setProbabilisticSampler(samplingRate) if s.lowerBound != lowerBound { - s.lowerBoundSampler = NewRateLimitingSampler(lowerBound) + s.lowerBoundSampler.Update(lowerBound) s.lowerBound = lowerBound } } // ----------------------- -type adaptiveSampler struct { +// PerOperationSampler is a delegating sampler that applies GuaranteedThroughputProbabilisticSampler +// on a per-operation basis. +type PerOperationSampler struct { sync.RWMutex samplers map[string]*GuaranteedThroughputProbabilisticSampler defaultSampler *ProbabilisticSampler lowerBound float64 maxOperations int + + // see description in PerOperationSamplerParams + operationNameLateBinding bool } -// NewAdaptiveSampler returns a delegating sampler that applies both probabilisticSampler and -// rateLimitingSampler via the guaranteedThroughputProbabilisticSampler. This sampler keeps track of all -// operations and delegates calls to the respective guaranteedThroughputProbabilisticSampler. -func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (Sampler, error) { - return newAdaptiveSampler(strategies, maxOperations), nil +// NewAdaptiveSampler returns a new PerOperationSampler. +// Deprecated: please use NewPerOperationSampler. +func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (*PerOperationSampler, error) { + return NewPerOperationSampler(PerOperationSamplerParams{ + MaxOperations: maxOperations, + Strategies: strategies, + }), nil } -func newAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) Sampler { +// PerOperationSamplerParams defines parameters when creating PerOperationSampler. +type PerOperationSamplerParams struct { + // Max number of operations that will be tracked. Other operations will be given default strategy. + MaxOperations int + + // Opt-in feature for applications that require late binding of span name via explicit call to SetOperationName. + // When this feature is enabled, the sampler will return retryable=true from OnCreateSpan(), thus leaving + // the sampling decision as non-final (and the span as writeable). This may lead to degraded performance + // in applications that always provide the correct span name on trace creation. + // + // For backwards compatibility this option is off by default. + OperationNameLateBinding bool + + // Initial configuration of the sampling strategies (usually retrieved from the backend by Remote Sampler). + Strategies *sampling.PerOperationSamplingStrategies +} + +// NewPerOperationSampler returns a new PerOperationSampler. +func NewPerOperationSampler(params PerOperationSamplerParams) *PerOperationSampler { samplers := make(map[string]*GuaranteedThroughputProbabilisticSampler) - for _, strategy := range strategies.PerOperationStrategies { + for _, strategy := range params.Strategies.PerOperationStrategies { sampler := newGuaranteedThroughputProbabilisticSampler( - strategies.DefaultLowerBoundTracesPerSecond, + params.Strategies.DefaultLowerBoundTracesPerSecond, strategy.ProbabilisticSampling.SamplingRate, ) samplers[strategy.Operation] = sampler } - return &adaptiveSampler{ - samplers: samplers, - defaultSampler: newProbabilisticSampler(strategies.DefaultSamplingProbability), - lowerBound: strategies.DefaultLowerBoundTracesPerSecond, - maxOperations: maxOperations, + return &PerOperationSampler{ + samplers: samplers, + defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability), + lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond, + maxOperations: params.MaxOperations, + operationNameLateBinding: params.OperationNameLateBinding, } } -func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { +// IsSampled is not used and only exists to match Sampler V1 API. +// TODO (breaking change) remove when upgrading everything to SamplerV2 +func (s *PerOperationSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { + return false, nil +} + +func (s *PerOperationSampler) trySampling(span *Span, operationName string) (bool, []Tag) { + samplerV1 := s.getSamplerForOperation(operationName) + var sampled bool + var tags []Tag + if span.context.samplingState.isLocalRootSpan(span.context.spanID) { + sampled, tags = samplerV1.IsSampled(span.context.TraceID(), operationName) + } + return sampled, tags +} + +// OnCreateSpan implements OnCreateSpan of SamplerV2. +func (s *PerOperationSampler) OnCreateSpan(span *Span) SamplingDecision { + sampled, tags := s.trySampling(span, span.OperationName()) + return SamplingDecision{Sample: sampled, Retryable: s.operationNameLateBinding, Tags: tags} +} + +// OnSetOperationName implements OnSetOperationName of SamplerV2. +func (s *PerOperationSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision { + sampled, tags := s.trySampling(span, operationName) + return SamplingDecision{Sample: sampled, Retryable: false, Tags: tags} +} + +// OnSetTag implements OnSetTag of SamplerV2. +func (s *PerOperationSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision { + return SamplingDecision{Sample: false, Retryable: true} +} + +// OnFinishSpan implements OnFinishSpan of SamplerV2. +func (s *PerOperationSampler) OnFinishSpan(span *Span) SamplingDecision { + return SamplingDecision{Sample: false, Retryable: true} +} + +func (s *PerOperationSampler) getSamplerForOperation(operation string) Sampler { s.RLock() sampler, ok := s.samplers[operation] if ok { defer s.RUnlock() - return sampler.IsSampled(id, operation) + return sampler } s.RUnlock() s.Lock() @@ -315,18 +432,19 @@ func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag) // Check if sampler has already been created sampler, ok = s.samplers[operation] if ok { - return sampler.IsSampled(id, operation) + return sampler } // Store only up to maxOperations of unique ops. if len(s.samplers) >= s.maxOperations { - return s.defaultSampler.IsSampled(id, operation) + return s.defaultSampler } newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate()) s.samplers[operation] = newSampler - return newSampler.IsSampled(id, operation) + return newSampler } -func (s *adaptiveSampler) Close() { +// Close invokes Close on all underlying samplers. +func (s *PerOperationSampler) Close() { s.Lock() defer s.Unlock() for _, sampler := range s.samplers { @@ -335,16 +453,18 @@ func (s *adaptiveSampler) Close() { s.defaultSampler.Close() } -func (s *adaptiveSampler) Equal(other Sampler) bool { - // NB The Equal() function is overly expensive for adaptiveSampler since it's composed of multiple +// Equal is not used. +// TODO (breaking change) remove this in the future +func (s *PerOperationSampler) Equal(other Sampler) bool { + // NB The Equal() function is overly expensive for PerOperationSampler since it's composed of multiple // samplers which all need to be initialized before this function can be called for a comparison. - // Therefore, adaptiveSampler uses the update() function to only alter the samplers that need + // Therefore, PerOperationSampler uses the update() function to only alter the samplers that need // changing. Hence this function always returns false so that the update function can be called. // Once the Equal() function is removed from the Sampler API, this will no longer be needed. return false } -func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrategies) { +func (s *PerOperationSampler) update(strategies *sampling.PerOperationSamplingStrategies) { s.Lock() defer s.Unlock() newSamplers := map[string]*GuaranteedThroughputProbabilisticSampler{} @@ -369,191 +489,3 @@ func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrate } s.samplers = newSamplers } - -// ----------------------- - -// RemotelyControlledSampler is a delegating sampler that polls a remote server -// for the appropriate sampling strategy, constructs a corresponding sampler and -// delegates to it for sampling decisions. -type RemotelyControlledSampler struct { - // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment. - // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq - closed int64 // 0 - not closed, 1 - closed - - sync.RWMutex - samplerOptions - - serviceName string - manager sampling.SamplingManager - doneChan chan *sync.WaitGroup -} - -type httpSamplingManager struct { - serverURL string -} - -func (s *httpSamplingManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { - var out sampling.SamplingStrategyResponse - v := url.Values{} - v.Set("service", serviceName) - if err := utils.GetJSON(s.serverURL+"?"+v.Encode(), &out); err != nil { - return nil, err - } - return &out, nil -} - -// NewRemotelyControlledSampler creates a sampler that periodically pulls -// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent). -func NewRemotelyControlledSampler( - serviceName string, - opts ...SamplerOption, -) *RemotelyControlledSampler { - options := applySamplerOptions(opts...) - sampler := &RemotelyControlledSampler{ - samplerOptions: options, - serviceName: serviceName, - manager: &httpSamplingManager{serverURL: options.samplingServerURL}, - doneChan: make(chan *sync.WaitGroup), - } - go sampler.pollController() - return sampler -} - -func applySamplerOptions(opts ...SamplerOption) samplerOptions { - options := samplerOptions{} - for _, option := range opts { - option(&options) - } - if options.sampler == nil { - options.sampler = newProbabilisticSampler(0.001) - } - if options.logger == nil { - options.logger = log.NullLogger - } - if options.maxOperations <= 0 { - options.maxOperations = defaultMaxOperations - } - if options.samplingServerURL == "" { - options.samplingServerURL = DefaultSamplingServerURL - } - if options.metrics == nil { - options.metrics = NewNullMetrics() - } - if options.samplingRefreshInterval <= 0 { - options.samplingRefreshInterval = defaultSamplingRefreshInterval - } - return options -} - -// IsSampled implements IsSampled() of Sampler. -func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) { - s.RLock() - defer s.RUnlock() - return s.sampler.IsSampled(id, operation) -} - -// Close implements Close() of Sampler. -func (s *RemotelyControlledSampler) Close() { - if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped { - s.logger.Error("Repeated attempt to close the sampler is ignored") - return - } - - var wg sync.WaitGroup - wg.Add(1) - s.doneChan <- &wg - wg.Wait() -} - -// Equal implements Equal() of Sampler. -func (s *RemotelyControlledSampler) Equal(other Sampler) bool { - // NB The Equal() function is expensive and will be removed. See adaptiveSampler.Equal() for - // more information. - if o, ok := other.(*RemotelyControlledSampler); ok { - s.RLock() - o.RLock() - defer s.RUnlock() - defer o.RUnlock() - return s.sampler.Equal(o.sampler) - } - return false -} - -func (s *RemotelyControlledSampler) pollController() { - ticker := time.NewTicker(s.samplingRefreshInterval) - defer ticker.Stop() - s.pollControllerWithTicker(ticker) -} - -func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) { - for { - select { - case <-ticker.C: - s.updateSampler() - case wg := <-s.doneChan: - wg.Done() - return - } - } -} - -func (s *RemotelyControlledSampler) getSampler() Sampler { - s.Lock() - defer s.Unlock() - return s.sampler -} - -func (s *RemotelyControlledSampler) setSampler(sampler Sampler) { - s.Lock() - defer s.Unlock() - s.sampler = sampler -} - -func (s *RemotelyControlledSampler) updateSampler() { - res, err := s.manager.GetSamplingStrategy(s.serviceName) - if err != nil { - s.metrics.SamplerQueryFailure.Inc(1) - s.logger.Infof("Unable to query sampling strategy: %v", err) - return - } - s.Lock() - defer s.Unlock() - - s.metrics.SamplerRetrieved.Inc(1) - if strategies := res.GetOperationSampling(); strategies != nil { - s.updateAdaptiveSampler(strategies) - } else { - err = s.updateRateLimitingOrProbabilisticSampler(res) - } - if err != nil { - s.metrics.SamplerUpdateFailure.Inc(1) - s.logger.Infof("Unable to handle sampling strategy response %+v. Got error: %v", res, err) - return - } - s.metrics.SamplerUpdated.Inc(1) -} - -// NB: this function should only be called while holding a Write lock -func (s *RemotelyControlledSampler) updateAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies) { - if adaptiveSampler, ok := s.sampler.(*adaptiveSampler); ok { - adaptiveSampler.update(strategies) - } else { - s.sampler = newAdaptiveSampler(strategies, s.maxOperations) - } -} - -// NB: this function should only be called while holding a Write lock -func (s *RemotelyControlledSampler) updateRateLimitingOrProbabilisticSampler(res *sampling.SamplingStrategyResponse) error { - var newSampler Sampler - if probabilistic := res.GetProbabilisticSampling(); probabilistic != nil { - newSampler = newProbabilisticSampler(probabilistic.SamplingRate) - } else if rateLimiting := res.GetRateLimitingSampling(); rateLimiting != nil { - newSampler = NewRateLimitingSampler(float64(rateLimiting.MaxTracesPerSecond)) - } else { - return fmt.Errorf("Unsupported sampling strategy type %v", res.GetStrategyType()) - } - if !s.sampler.Equal(newSampler) { - s.sampler = newSampler - } - return nil -} |