aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/uber
diff options
context:
space:
mode:
authordependabot-preview[bot] <27856297+dependabot-preview[bot]@users.noreply.github.com>2019-11-07 09:16:59 +0000
committerValentin Rothberg <rothberg@redhat.com>2019-11-07 14:05:10 +0000
commit75d67c49208137652adb13956691d4861b2ac15d (patch)
treeaafc00c7826fca579f7ea97dbd10ccec981dad39 /vendor/github.com/uber
parent2e2d82ce767895e80da0d6cf5e6d391901687deb (diff)
downloadpodman-75d67c49208137652adb13956691d4861b2ac15d.tar.gz
podman-75d67c49208137652adb13956691d4861b2ac15d.tar.bz2
podman-75d67c49208137652adb13956691d4861b2ac15d.zip
Bump github.com/uber/jaeger-client-go
Bumps [github.com/uber/jaeger-client-go](https://github.com/uber/jaeger-client-go) from 2.19.0+incompatible to 2.20.0+incompatible. - [Release notes](https://github.com/uber/jaeger-client-go/releases) - [Changelog](https://github.com/jaegertracing/jaeger-client-go/blob/master/CHANGELOG.md) - [Commits](https://github.com/uber/jaeger-client-go/compare/v2.19.0...v2.20.0) Signed-off-by: dependabot-preview[bot] <support@dependabot.com> Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
Diffstat (limited to 'vendor/github.com/uber')
-rw-r--r--vendor/github.com/uber/jaeger-client-go/.travis.yml9
-rw-r--r--vendor/github.com/uber/jaeger-client-go/CHANGELOG.md39
-rw-r--r--vendor/github.com/uber/jaeger-client-go/Gopkg.lock125
-rw-r--r--vendor/github.com/uber/jaeger-client-go/Gopkg.toml2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/Makefile3
-rw-r--r--vendor/github.com/uber/jaeger-client-go/README.md23
-rw-r--r--vendor/github.com/uber/jaeger-client-go/config/config.go4
-rw-r--r--vendor/github.com/uber/jaeger-client-go/config/config_env.go24
-rw-r--r--vendor/github.com/uber/jaeger-client-go/constants.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/metrics.go20
-rw-r--r--vendor/github.com/uber/jaeger-client-go/propagation.go8
-rw-r--r--vendor/github.com/uber/jaeger-client-go/reporter.go2
-rw-r--r--vendor/github.com/uber/jaeger-client-go/sampler.go446
-rw-r--r--vendor/github.com/uber/jaeger-client-go/sampler_remote.go334
-rw-r--r--vendor/github.com/uber/jaeger-client-go/sampler_remote_options.go (renamed from vendor/github.com/uber/jaeger-client-go/sampler_options.go)71
-rw-r--r--vendor/github.com/uber/jaeger-client-go/sampler_v2.go93
-rw-r--r--vendor/github.com/uber/jaeger-client-go/span.go144
-rw-r--r--vendor/github.com/uber/jaeger-client-go/span_context.go (renamed from vendor/github.com/uber/jaeger-client-go/context.go)161
-rw-r--r--vendor/github.com/uber/jaeger-client-go/tracer.go127
-rw-r--r--vendor/github.com/uber/jaeger-client-go/utils/rate_limiter.go93
-rw-r--r--vendor/github.com/uber/jaeger-client-go/zipkin.go5
22 files changed, 1294 insertions, 443 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/.travis.yml b/vendor/github.com/uber/jaeger-client-go/.travis.yml
index 0d7bdd9ab..e81cc8805 100644
--- a/vendor/github.com/uber/jaeger-client-go/.travis.yml
+++ b/vendor/github.com/uber/jaeger-client-go/.travis.yml
@@ -7,21 +7,22 @@ dist: trusty
matrix:
include:
- - go: 1.12.x
+ - go: 1.13.x
env:
- TESTS=true
- USE_DEP=true
- COVERAGE=true
- - go: 1.12.x
+ - go: 1.13.x
env:
- USE_DEP=true
- CROSSDOCK=true
- - go: 1.12.x
+ - go: 1.13.x
env:
- TESTS=true
- USE_DEP=false
- USE_GLIDE=true
- - go: 1.11.x
+ # test with previous version of Go
+ - go: 1.12.x
env:
- TESTS=true
- USE_DEP=true
diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
index 31b22e40c..c4590bf93 100644
--- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
+++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md
@@ -1,6 +1,45 @@
Changes by Version
==================
+2.20.0 (2019-11-06)
+-------------------
+
+## New Features
+
+- Allow all in-process spans of a trace to share sampling state (#443) -- Prithvi Raj
+
+ Sampling state is shared between all spans of the trace that are still in memory.
+ This allows implementation of delayed sampling decisions (see below).
+
+- Support delayed sampling decisions (#449) -- Yuri Shkuro
+
+ This is a large structural change to how the samplers work.
+ It allows some samplers to be executed multiple times on different
+ span events (like setting a tag) and make a positive sampling decision
+ later in the span life cycle, or even based on children spans.
+ See [README](./README.md#delayed-sampling) for more details.
+
+ There is a related minor change in behavior of the adaptive (per-operation) sampler,
+ which will no longer re-sample the trace when `span.SetOperation()` is called, i.e. the
+ operation used to make the sampling decision is always the one provided at span creation.
+
+- Add experimental tag matching sampler (#452) -- Yuri Shkuro
+
+ A sampler that can sample a trace based on a certain tag added to the root
+ span or one of its local (in-process) children. The sampler can be used with
+ another experimental `PrioritySampler` that allows multiple samplers to try
+ to make a sampling decision, in a certain priority order.
+
+- [log/zap] Report whether a trace was sampled (#445) -- Abhinav Gupta
+- Allow config.FromEnv() to enrich an existing config object (#436) -- Vineeth Reddy
+
+## Minor patches
+
+- Expose Sampler on Tracer and accept sampler options via Configuration (#460) -- Yuri Shkuro
+- Fix github.com/uber-go/atomic import (#464) -- Yuri Shkuro
+- Add nodejs to crossdock tests (#441) -- Bhavin Gandhi
+- Bump Go compiler version to 1.13 (#453) -- Yuri Shkuro
+
2.19.0 (2019-09-23)
-------------------
diff --git a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock
index 1ed86f4a7..5a42ebf16 100644
--- a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock
+++ b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock
@@ -2,6 +2,14 @@
[[projects]]
+ digest = "1:9f3b30d9f8e0d7040f729b82dcbc8f0dead820a133b3147ce355fc451f32d761"
+ name = "github.com/BurntSushi/toml"
+ packages = ["."]
+ pruneopts = "UT"
+ revision = "3012a1dbe2e4bd1391d42b32f0577cb7bbc7f005"
+ version = "v0.3.1"
+
+[[projects]]
digest = "1:d6afaeed1502aa28e80a4ed0981d570ad91b2579193404256ce672ed0a609e0d"
name = "github.com/beorn7/perks"
packages = ["quantile"]
@@ -138,14 +146,6 @@
version = "v1.4.0"
[[projects]]
- digest = "1:a5158647b553c61877aa9ae74f4015000294e47981e6b8b07525edcbb0747c81"
- name = "github.com/uber-go/atomic"
- packages = ["."]
- pruneopts = "UT"
- revision = "df976f2515e274675050de7b3f42545de80594fd"
- version = "v1.4.0"
-
-[[projects]]
digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3"
name = "github.com/uber/jaeger-lib"
packages = [
@@ -158,23 +158,31 @@
version = "v2.2.0"
[[projects]]
- digest = "1:a5158647b553c61877aa9ae74f4015000294e47981e6b8b07525edcbb0747c81"
+ digest = "1:0bdcb0c740d79d400bd3f7946ac22a715c94db62b20bfd2e01cd50693aba0600"
name = "go.uber.org/atomic"
packages = ["."]
pruneopts = "UT"
- revision = "df976f2515e274675050de7b3f42545de80594fd"
- version = "v1.4.0"
+ revision = "9dc4df04d0d1c39369750a9f6c32c39560672089"
+ version = "v1.5.0"
[[projects]]
- digest = "1:60bf2a5e347af463c42ed31a493d817f8a72f102543060ed992754e689805d1a"
+ digest = "1:002ebc50f3ef475ac325e1904be931d9dcba6dc6d73b5682afce0c63436e3902"
name = "go.uber.org/multierr"
packages = ["."]
pruneopts = "UT"
- revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a"
- version = "v1.1.0"
+ revision = "c3fc3d02ec864719d8e25be2d7dde1e35a36aa27"
+ version = "v1.3.0"
+
+[[projects]]
+ branch = "master"
+ digest = "1:3032e90a153750ea149f68bf081f97ca738f041fba45c41c80737f572ffdf2f4"
+ name = "go.uber.org/tools"
+ packages = ["update-license"]
+ pruneopts = "UT"
+ revision = "2cfd321de3ee5d5f8a5fda2521d1703478334d98"
[[projects]]
- digest = "1:676160e6a4722b08e0e26b11521d575c2cb2b6f0c679e1ee6178c5d8dee51e5e"
+ digest = "1:6be13632ab4bd5842a097abb3aabac045a8601e19a10da4239e7d8bd83d4b83c"
name = "go.uber.org/zap"
packages = [
".",
@@ -185,8 +193,19 @@
"zapcore",
]
pruneopts = "UT"
- revision = "27376062155ad36be76b0f12cf1572a221d3a48c"
- version = "v1.10.0"
+ revision = "a6015e13fab9b744d96085308ce4e8f11bad1996"
+ version = "v1.12.0"
+
+[[projects]]
+ branch = "master"
+ digest = "1:21d7bad9b7da270fd2d50aba8971a041bd691165c95096a2a4c68db823cbc86a"
+ name = "golang.org/x/lint"
+ packages = [
+ ".",
+ "golint",
+ ]
+ pruneopts = "UT"
+ revision = "16217165b5de779cb6a5e4fc81fa9c1166fda457"
[[projects]]
branch = "master"
@@ -197,23 +216,81 @@
"context/ctxhttp",
]
pruneopts = "UT"
- revision = "aa69164e4478b84860dc6769c710c699c67058a3"
+ revision = "0deb6923b6d97481cb43bc1043fe5b72a0143032"
[[projects]]
branch = "master"
- digest = "1:712252802d318c8107d8f2136b99aa10feb17eca715245ed915199fbfc260155"
+ digest = "1:5dfb17d45415b7b8927382f53955a66f55f9d9d11557aa82f7f481d642ab247a"
name = "golang.org/x/sys"
packages = ["windows"]
pruneopts = "UT"
- revision = "0a153f010e6963173baba2306531d173aa843137"
+ revision = "f43be2a4598cf3a47be9f94f0c28197ed9eae611"
+
+[[projects]]
+ branch = "master"
+ digest = "1:bae8b3bf837d9d7f601776f37f44e031d46943677beff8fb2eb9c7317d44de2f"
+ name = "golang.org/x/tools"
+ packages = [
+ "go/analysis",
+ "go/analysis/passes/inspect",
+ "go/ast/astutil",
+ "go/ast/inspector",
+ "go/buildutil",
+ "go/gcexportdata",
+ "go/internal/gcimporter",
+ "go/internal/packagesdriver",
+ "go/packages",
+ "go/types/objectpath",
+ "go/types/typeutil",
+ "internal/fastwalk",
+ "internal/gopathwalk",
+ "internal/semver",
+ "internal/span",
+ ]
+ pruneopts = "UT"
+ revision = "8dbcdeb83d3faec5315146800b375c4962a42fc6"
[[projects]]
- digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96"
+ digest = "1:59f10c1537d2199d9115d946927fe31165959a95190849c82ff11e05803528b0"
name = "gopkg.in/yaml.v2"
packages = ["."]
pruneopts = "UT"
- revision = "51d6538a90f86fe93ac480b35f37b2be17fef232"
- version = "v2.2.2"
+ revision = "f221b8435cfb71e54062f6c6e99e9ade30b124d5"
+ version = "v2.2.4"
+
+[[projects]]
+ digest = "1:131158a88aad1f94854d0aa21a64af2802d0a470fb0f01cb33c04fafd2047111"
+ name = "honnef.co/go/tools"
+ packages = [
+ "arg",
+ "cmd/staticcheck",
+ "config",
+ "deprecated",
+ "facts",
+ "functions",
+ "go/types/typeutil",
+ "internal/cache",
+ "internal/passes/buildssa",
+ "internal/renameio",
+ "internal/sharedcheck",
+ "lint",
+ "lint/lintdsl",
+ "lint/lintutil",
+ "lint/lintutil/format",
+ "loader",
+ "printf",
+ "simple",
+ "ssa",
+ "ssautil",
+ "staticcheck",
+ "staticcheck/vrp",
+ "stylecheck",
+ "unused",
+ "version",
+ ]
+ pruneopts = "UT"
+ revision = "afd67930eec2a9ed3e9b19f684d17a062285f16a"
+ version = "2019.2.3"
[solve-meta]
analyzer-name = "dep"
@@ -229,10 +306,10 @@
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
"github.com/stretchr/testify/suite",
- "github.com/uber-go/atomic",
"github.com/uber/jaeger-lib/metrics",
"github.com/uber/jaeger-lib/metrics/metricstest",
"github.com/uber/jaeger-lib/metrics/prometheus",
+ "go.uber.org/atomic",
"go.uber.org/zap",
"go.uber.org/zap/zapcore",
]
diff --git a/vendor/github.com/uber/jaeger-client-go/Gopkg.toml b/vendor/github.com/uber/jaeger-client-go/Gopkg.toml
index 3e6ac35ae..1fed7f814 100644
--- a/vendor/github.com/uber/jaeger-client-go/Gopkg.toml
+++ b/vendor/github.com/uber/jaeger-client-go/Gopkg.toml
@@ -15,7 +15,7 @@
version = "^1.1.3"
[[constraint]]
- name = "github.com/uber-go/atomic"
+ name = "go.uber.org/atomic"
version = "^1"
[[constraint]]
diff --git a/vendor/github.com/uber/jaeger-client-go/Makefile b/vendor/github.com/uber/jaeger-client-go/Makefile
index 74e11787a..0cfe6a5f6 100644
--- a/vendor/github.com/uber/jaeger-client-go/Makefile
+++ b/vendor/github.com/uber/jaeger-client-go/Makefile
@@ -1,5 +1,5 @@
PROJECT_ROOT=github.com/uber/jaeger-client-go
-PACKAGES := $(shell go list ./... | awk -F/ 'NR>1 {print "./"$$4"/..."}' | grep -v -e ./thrift-gen/... -e ./thrift/... | sort -u)
+PACKAGES := . $(shell go list ./... | awk -F/ 'NR>1 {print "./"$$4"/..."}' | grep -v -e ./thrift-gen/... -e ./thrift/... | sort -u)
# all .go files that don't exist in hidden directories
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen -e ./thrift/ \
-e ".*/\..*" \
@@ -125,3 +125,4 @@ ifeq ($(CI_SKIP_LINT),true)
else
make lint
endif
+
diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md
index 604d4b571..a3366114d 100644
--- a/vendor/github.com/uber/jaeger-client-go/README.md
+++ b/vendor/github.com/uber/jaeger-client-go/README.md
@@ -182,6 +182,29 @@ are available:
1. `RateLimitingSampler` can be used to allow only a certain fixed
number of traces to be sampled per second.
+#### Delayed sampling
+
+Version 2.20 introduced the ability to delay sampling decisions in the life cycle
+of the root span. It involves several features and architectural changes:
+ * **Shared sampling state**: the sampling state is shared across all local
+ (i.e. in-process) spans for a given trace.
+ * **New `SamplerV2` API** allows the sampler to be called at multiple points
+ in the life cycle of a span:
+ * on span creation
+ * on overwriting span operation name
+ * on setting span tags
+ * on finishing the span
+ * **Final/non-final sampling state**: the new `SamplerV2` API allows the sampler
+ to indicate if the negative sampling decision is final or not (positive sampling
+ decisions are always final). If the decision is not final, the sampler will be
+ called again on further span life cycle events, like setting tags.
+
+These new features are used in the experimental `x.TagMatchingSampler`, which
+can sample a trace based on a certain tag added to the root
+span or one of its local (in-process) children. The sampler can be used with
+another experimental `x.PrioritySampler` that allows multiple samplers to try
+to make a sampling decision, in a certain priority order.
+
### Baggage Injection
The OpenTracing spec allows for [baggage][baggage], which are key value pairs that are added
diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go
index 6bce1b3b0..965f7c3ee 100644
--- a/vendor/github.com/uber/jaeger-client-go/config/config.go
+++ b/vendor/github.com/uber/jaeger-client-go/config/config.go
@@ -86,6 +86,9 @@ type SamplerConfig struct {
// jaeger-agent for the appropriate sampling strategy.
// Can be set by exporting an environment variable named JAEGER_SAMPLER_REFRESH_INTERVAL
SamplingRefreshInterval time.Duration `yaml:"samplingRefreshInterval"`
+
+ // Options can be used to programmatically pass additional options to the Remote sampler.
+ Options []jaeger.SamplerOption
}
// ReporterConfig configures the reporter. All fields are optional.
@@ -357,6 +360,7 @@ func (sc *SamplerConfig) NewSampler(
if sc.SamplingRefreshInterval != 0 {
options = append(options, jaeger.SamplerOptions.SamplingRefreshInterval(sc.SamplingRefreshInterval))
}
+ options = append(options, sc.Options...)
return jaeger.NewRemotelyControlledSampler(serviceName, options...), nil
}
return nil, fmt.Errorf("Unknown sampler type %v", sc.Type)
diff --git a/vendor/github.com/uber/jaeger-client-go/config/config_env.go b/vendor/github.com/uber/jaeger-client-go/config/config_env.go
index 14d69b11d..a729bd8fe 100644
--- a/vendor/github.com/uber/jaeger-client-go/config/config_env.go
+++ b/vendor/github.com/uber/jaeger-client-go/config/config_env.go
@@ -52,7 +52,11 @@ const (
// FromEnv uses environment variables to set the tracer's Configuration
func FromEnv() (*Configuration, error) {
c := &Configuration{}
+ return c.FromEnv()
+}
+// FromEnv uses environment variables and overrides existing tracer's Configuration
+func (c *Configuration) FromEnv() (*Configuration, error) {
if e := os.Getenv(envServiceName); e != "" {
c.ServiceName = e
}
@@ -77,13 +81,21 @@ func FromEnv() (*Configuration, error) {
c.Tags = parseTags(e)
}
- if s, err := samplerConfigFromEnv(); err == nil {
+ if c.Sampler == nil {
+ c.Sampler = &SamplerConfig{}
+ }
+
+ if s, err := c.Sampler.samplerConfigFromEnv(); err == nil {
c.Sampler = s
} else {
return nil, errors.Wrap(err, "cannot obtain sampler config from env")
}
- if r, err := reporterConfigFromEnv(); err == nil {
+ if c.Reporter == nil {
+ c.Reporter = &ReporterConfig{}
+ }
+
+ if r, err := c.Reporter.reporterConfigFromEnv(); err == nil {
c.Reporter = r
} else {
return nil, errors.Wrap(err, "cannot obtain reporter config from env")
@@ -93,9 +105,7 @@ func FromEnv() (*Configuration, error) {
}
// samplerConfigFromEnv creates a new SamplerConfig based on the environment variables
-func samplerConfigFromEnv() (*SamplerConfig, error) {
- sc := &SamplerConfig{}
-
+func (sc *SamplerConfig) samplerConfigFromEnv() (*SamplerConfig, error) {
if e := os.Getenv(envSamplerType); e != "" {
sc.Type = e
}
@@ -135,9 +145,7 @@ func samplerConfigFromEnv() (*SamplerConfig, error) {
}
// reporterConfigFromEnv creates a new ReporterConfig based on the environment variables
-func reporterConfigFromEnv() (*ReporterConfig, error) {
- rc := &ReporterConfig{}
-
+func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) {
if e := os.Getenv(envReporterMaxQueueSize); e != "" {
if value, err := strconv.ParseInt(e, 10, 0); err == nil {
rc.QueueSize = int(value)
diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go
index e95b2ba09..0da47b02f 100644
--- a/vendor/github.com/uber/jaeger-client-go/constants.go
+++ b/vendor/github.com/uber/jaeger-client-go/constants.go
@@ -22,7 +22,7 @@ import (
const (
// JaegerClientVersion is the version of the client library reported as Span tag.
- JaegerClientVersion = "Go-2.19.0"
+ JaegerClientVersion = "Go-2.20.0"
// JaegerClientVersionTagKey is the name of the tag used to report client version.
JaegerClientVersionTagKey = "jaeger.version"
diff --git a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
index 6ce1caf87..f0f1afe2f 100644
--- a/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
+++ b/vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
@@ -35,7 +35,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
SpanId: int64(span.context.spanID),
ParentSpanId: int64(span.context.parentID),
OperationName: span.operationName,
- Flags: int32(span.context.flags),
+ Flags: int32(span.context.samplingState.flags()),
StartTime: startTime,
Duration: duration,
Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength),
diff --git a/vendor/github.com/uber/jaeger-client-go/metrics.go b/vendor/github.com/uber/jaeger-client-go/metrics.go
index e56db9b73..50e4e22d6 100644
--- a/vendor/github.com/uber/jaeger-client-go/metrics.go
+++ b/vendor/github.com/uber/jaeger-client-go/metrics.go
@@ -26,6 +26,9 @@ type Metrics struct {
// Number of traces started by this tracer as not sampled
TracesStartedNotSampled metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer as not sampled"`
+ // Number of traces started by this tracer with delayed sampling
+ TracesStartedDelayedSampling metrics.Counter `metric:"traces" tags:"state=started,sampled=n" help:"Number of traces started by this tracer with delayed sampling"`
+
// Number of externally started sampled traces this tracer joined
TracesJoinedSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=y" help:"Number of externally started sampled traces this tracer joined"`
@@ -33,13 +36,22 @@ type Metrics struct {
TracesJoinedNotSampled metrics.Counter `metric:"traces" tags:"state=joined,sampled=n" help:"Number of externally started not-sampled traces this tracer joined"`
// Number of sampled spans started by this tracer
- SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of sampled spans started by this tracer"`
+ SpansStartedSampled metrics.Counter `metric:"started_spans" tags:"sampled=y" help:"Number of spans started by this tracer as sampled"`
+
+ // Number of not sampled spans started by this tracer
+ SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of spans started by this tracer as not sampled"`
+
+ // Number of spans with delayed sampling started by this tracer
+ SpansStartedDelayedSampling metrics.Counter `metric:"started_spans" tags:"sampled=delayed" help:"Number of spans started by this tracer with delayed sampling"`
- // Number of unsampled spans started by this tracer
- SpansStartedNotSampled metrics.Counter `metric:"started_spans" tags:"sampled=n" help:"Number of unsampled spans started by this tracer"`
+ // Number of spans finished by this tracer
+ SpansFinishedSampled metrics.Counter `metric:"finished_spans" tags:"sampled=y" help:"Number of sampled spans finished by this tracer"`
+
+ // Number of spans finished by this tracer
+ SpansFinishedNotSampled metrics.Counter `metric:"finished_spans" tags:"sampled=n" help:"Number of not-sampled spans finished by this tracer"`
// Number of spans finished by this tracer
- SpansFinished metrics.Counter `metric:"finished_spans" help:"Number of spans finished by this tracer"`
+ SpansFinishedDelayedSampling metrics.Counter `metric:"finished_spans" tags:"sampled=delayed" help:"Number of spans with delayed sampling finished by this tracer"`
// Number of errors decoding tracing context
DecodingErrors metrics.Counter `metric:"span_context_decoding_errors" help:"Number of errors decoding tracing context"`
diff --git a/vendor/github.com/uber/jaeger-client-go/propagation.go b/vendor/github.com/uber/jaeger-client-go/propagation.go
index 5b50cfb71..42fd64b58 100644
--- a/vendor/github.com/uber/jaeger-client-go/propagation.go
+++ b/vendor/github.com/uber/jaeger-client-go/propagation.go
@@ -193,7 +193,7 @@ func (p *BinaryPropagator) Inject(
if err := binary.Write(carrier, binary.BigEndian, sc.parentID); err != nil {
return err
}
- if err := binary.Write(carrier, binary.BigEndian, sc.flags); err != nil {
+ if err := binary.Write(carrier, binary.BigEndian, sc.samplingState.flags()); err != nil {
return err
}
@@ -222,6 +222,7 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
return emptyContext, opentracing.ErrInvalidCarrier
}
var ctx SpanContext
+ ctx.samplingState = &samplingState{}
if err := binary.Read(carrier, binary.BigEndian, &ctx.traceID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
@@ -232,9 +233,12 @@ func (p *BinaryPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
if err := binary.Read(carrier, binary.BigEndian, &ctx.parentID); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
- if err := binary.Read(carrier, binary.BigEndian, &ctx.flags); err != nil {
+
+ var flags byte
+ if err := binary.Read(carrier, binary.BigEndian, &flags); err != nil {
return emptyContext, opentracing.ErrSpanContextCorrupted
}
+ ctx.samplingState.setFlags(flags)
// Handle the baggage items
var numBaggage int32
diff --git a/vendor/github.com/uber/jaeger-client-go/reporter.go b/vendor/github.com/uber/jaeger-client-go/reporter.go
index 27163ebe4..0b78cec20 100644
--- a/vendor/github.com/uber/jaeger-client-go/reporter.go
+++ b/vendor/github.com/uber/jaeger-client-go/reporter.go
@@ -28,6 +28,8 @@ import (
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
type Reporter interface {
// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
+ // If the reporter is processing Span asynchronously then it needs to Retain() the span,
+ // and then Release() it when no longer needed, to avoid span data corruption.
Report(span *Span)
// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
diff --git a/vendor/github.com/uber/jaeger-client-go/sampler.go b/vendor/github.com/uber/jaeger-client-go/sampler.go
index ea6984e02..6195d59c5 100644
--- a/vendor/github.com/uber/jaeger-client-go/sampler.go
+++ b/vendor/github.com/uber/jaeger-client-go/sampler.go
@@ -17,19 +17,14 @@ package jaeger
import (
"fmt"
"math"
- "net/url"
"sync"
- "sync/atomic"
- "time"
- "github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift-gen/sampling"
"github.com/uber/jaeger-client-go/utils"
)
const (
- defaultSamplingRefreshInterval = time.Minute
- defaultMaxOperations = 2000
+ defaultMaxOperations = 2000
)
// Sampler decides whether a new trace should be sampled or not.
@@ -47,9 +42,7 @@ type Sampler interface {
// Equal checks if the `other` sampler is functionally equivalent
// to this sampler.
- // TODO remove this function. This function is used to determine if 2 samplers are equivalent
- // which does not bode well with the adaptive sampler which has to create all the composite samplers
- // for the comparison to occur. This is expensive to do if only one sampler has changed.
+ // TODO (breaking change) remove this function. See PerOperationSampler.Equals for explanation.
Equal(other Sampler) bool
}
@@ -57,17 +50,23 @@ type Sampler interface {
// ConstSampler is a sampler that always makes the same decision.
type ConstSampler struct {
+ legacySamplerV1Base
Decision bool
tags []Tag
}
// NewConstSampler creates a ConstSampler.
-func NewConstSampler(sample bool) Sampler {
+func NewConstSampler(sample bool) *ConstSampler {
tags := []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeConst},
{key: SamplerParamTagKey, value: sample},
}
- return &ConstSampler{Decision: sample, tags: tags}
+ s := &ConstSampler{
+ Decision: sample,
+ tags: tags,
+ }
+ s.delegate = s.IsSampled
+ return s
}
// IsSampled implements IsSampled() of Sampler.
@@ -88,11 +87,17 @@ func (s *ConstSampler) Equal(other Sampler) bool {
return false
}
+// String is used to log sampler details.
+func (s *ConstSampler) String() string {
+ return fmt.Sprintf("ConstSampler(decision=%t)", s.Decision)
+}
+
// -----------------------
// ProbabilisticSampler is a sampler that randomly samples a certain percentage
// of traces.
type ProbabilisticSampler struct {
+ legacySamplerV1Base
samplingRate float64
samplingBoundary uint64
tags []Tag
@@ -114,16 +119,19 @@ func NewProbabilisticSampler(samplingRate float64) (*ProbabilisticSampler, error
}
func newProbabilisticSampler(samplingRate float64) *ProbabilisticSampler {
- samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
- tags := []Tag{
+ s := new(ProbabilisticSampler)
+ s.delegate = s.IsSampled
+ return s.init(samplingRate)
+}
+
+func (s *ProbabilisticSampler) init(samplingRate float64) *ProbabilisticSampler {
+ s.samplingRate = math.Max(0.0, math.Min(samplingRate, 1.0))
+ s.samplingBoundary = uint64(float64(maxRandomNumber) * s.samplingRate)
+ s.tags = []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeProbabilistic},
- {key: SamplerParamTagKey, value: samplingRate},
- }
- return &ProbabilisticSampler{
- samplingRate: samplingRate,
- samplingBoundary: uint64(float64(maxRandomNumber) * samplingRate),
- tags: tags,
+ {key: SamplerParamTagKey, value: s.samplingRate},
}
+ return s
}
// SamplingRate returns the sampling probability this sampled was constructed with.
@@ -149,65 +157,104 @@ func (s *ProbabilisticSampler) Equal(other Sampler) bool {
return false
}
+// Update modifies in-place the sampling rate. Locking must be done externally.
+func (s *ProbabilisticSampler) Update(samplingRate float64) error {
+ if samplingRate < 0.0 || samplingRate > 1.0 {
+ return fmt.Errorf("Sampling Rate must be between 0.0 and 1.0, received %f", samplingRate)
+ }
+ s.init(samplingRate)
+ return nil
+}
+
+// String is used to log sampler details.
+func (s *ProbabilisticSampler) String() string {
+ return fmt.Sprintf("ProbabilisticSampler(samplingRate=%v)", s.samplingRate)
+}
+
// -----------------------
-type rateLimitingSampler struct {
+// RateLimitingSampler samples at most maxTracesPerSecond. The distribution of sampled traces follows
+// burstiness of the service, i.e. a service with uniformly distributed requests will have those
+// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a
+// number of sequential requests can be sampled each second.
+type RateLimitingSampler struct {
+ legacySamplerV1Base
maxTracesPerSecond float64
- rateLimiter utils.RateLimiter
+ rateLimiter *utils.ReconfigurableRateLimiter
tags []Tag
}
-// NewRateLimitingSampler creates a sampler that samples at most maxTracesPerSecond. The distribution of sampled
-// traces follows burstiness of the service, i.e. a service with uniformly distributed requests will have those
-// requests sampled uniformly as well, but if requests are bursty, especially sub-second, then a number of
-// sequential requests can be sampled each second.
-func NewRateLimitingSampler(maxTracesPerSecond float64) Sampler {
- tags := []Tag{
+// NewRateLimitingSampler creates new RateLimitingSampler.
+func NewRateLimitingSampler(maxTracesPerSecond float64) *RateLimitingSampler {
+ s := new(RateLimitingSampler)
+ s.delegate = s.IsSampled
+ return s.init(maxTracesPerSecond)
+}
+
+func (s *RateLimitingSampler) init(maxTracesPerSecond float64) *RateLimitingSampler {
+ if s.rateLimiter == nil {
+ s.rateLimiter = utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
+ } else {
+ s.rateLimiter.Update(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0))
+ }
+ s.maxTracesPerSecond = maxTracesPerSecond
+ s.tags = []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeRateLimiting},
{key: SamplerParamTagKey, value: maxTracesPerSecond},
}
- return &rateLimitingSampler{
- maxTracesPerSecond: maxTracesPerSecond,
- rateLimiter: utils.NewRateLimiter(maxTracesPerSecond, math.Max(maxTracesPerSecond, 1.0)),
- tags: tags,
- }
+ return s
}
// IsSampled implements IsSampled() of Sampler.
-func (s *rateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
+func (s *RateLimitingSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
return s.rateLimiter.CheckCredit(1.0), s.tags
}
-func (s *rateLimitingSampler) Close() {
+// Update reconfigures the rate limiter, while preserving its accumulated balance.
+// Locking must be done externally.
+func (s *RateLimitingSampler) Update(maxTracesPerSecond float64) {
+ if s.maxTracesPerSecond != maxTracesPerSecond {
+ s.init(maxTracesPerSecond)
+ }
+}
+
+// Close does nothing.
+func (s *RateLimitingSampler) Close() {
// nothing to do
}
-func (s *rateLimitingSampler) Equal(other Sampler) bool {
- if o, ok := other.(*rateLimitingSampler); ok {
+// Equal compares with another sampler.
+func (s *RateLimitingSampler) Equal(other Sampler) bool {
+ if o, ok := other.(*RateLimitingSampler); ok {
return s.maxTracesPerSecond == o.maxTracesPerSecond
}
return false
}
+// String is used to log sampler details.
+func (s *RateLimitingSampler) String() string {
+ return fmt.Sprintf("RateLimitingSampler(maxTracesPerSecond=%v)", s.maxTracesPerSecond)
+}
+
// -----------------------
-// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both probabilisticSampler and
-// rateLimitingSampler. The rateLimitingSampler is used as a guaranteed lower bound sampler such that
+// GuaranteedThroughputProbabilisticSampler is a sampler that leverages both ProbabilisticSampler and
+// RateLimitingSampler. The RateLimitingSampler is used as a guaranteed lower bound sampler such that
// every operation is sampled at least once in a time interval defined by the lowerBound. ie a lowerBound
// of 1.0 / (60 * 10) will sample an operation at least once every 10 minutes.
//
-// The probabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both
-// samplers return true, the tags for probabilisticSampler will be used.
+// The ProbabilisticSampler is given higher priority when tags are emitted, ie. if IsSampled() for both
+// samplers return true, the tags for ProbabilisticSampler will be used.
type GuaranteedThroughputProbabilisticSampler struct {
probabilisticSampler *ProbabilisticSampler
- lowerBoundSampler Sampler
+ lowerBoundSampler *RateLimitingSampler
tags []Tag
samplingRate float64
lowerBound float64
}
// NewGuaranteedThroughputProbabilisticSampler returns a delegating sampler that applies both
-// probabilisticSampler and rateLimitingSampler.
+// ProbabilisticSampler and RateLimitingSampler.
func NewGuaranteedThroughputProbabilisticSampler(
lowerBound, samplingRate float64,
) (*GuaranteedThroughputProbabilisticSampler, error) {
@@ -224,8 +271,14 @@ func newGuaranteedThroughputProbabilisticSampler(lowerBound, samplingRate float6
}
func (s *GuaranteedThroughputProbabilisticSampler) setProbabilisticSampler(samplingRate float64) {
- if s.probabilisticSampler == nil || s.samplingRate != samplingRate {
+ if s.probabilisticSampler == nil {
s.probabilisticSampler = newProbabilisticSampler(samplingRate)
+ } else if s.samplingRate != samplingRate {
+ s.probabilisticSampler.init(samplingRate)
+ }
+ // since we don't validate samplingRate, sampler may have clamped it to [0, 1] interval
+ samplingRate = s.probabilisticSampler.SamplingRate()
+ if s.samplingRate != samplingRate || s.tags == nil {
s.samplingRate = s.probabilisticSampler.SamplingRate()
s.tags = []Tag{
{key: SamplerTypeTagKey, value: SamplerTypeLowerBound},
@@ -252,7 +305,7 @@ func (s *GuaranteedThroughputProbabilisticSampler) Close() {
// Equal implements Equal() of Sampler.
func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool {
- // NB The Equal() function is expensive and will be removed. See adaptiveSampler.Equal() for
+ // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
// more information.
return false
}
@@ -261,52 +314,116 @@ func (s *GuaranteedThroughputProbabilisticSampler) Equal(other Sampler) bool {
func (s *GuaranteedThroughputProbabilisticSampler) update(lowerBound, samplingRate float64) {
s.setProbabilisticSampler(samplingRate)
if s.lowerBound != lowerBound {
- s.lowerBoundSampler = NewRateLimitingSampler(lowerBound)
+ s.lowerBoundSampler.Update(lowerBound)
s.lowerBound = lowerBound
}
}
// -----------------------
-type adaptiveSampler struct {
+// PerOperationSampler is a delegating sampler that applies GuaranteedThroughputProbabilisticSampler
+// on a per-operation basis.
+type PerOperationSampler struct {
sync.RWMutex
samplers map[string]*GuaranteedThroughputProbabilisticSampler
defaultSampler *ProbabilisticSampler
lowerBound float64
maxOperations int
+
+ // see description in PerOperationSamplerParams
+ operationNameLateBinding bool
}
-// NewAdaptiveSampler returns a delegating sampler that applies both probabilisticSampler and
-// rateLimitingSampler via the guaranteedThroughputProbabilisticSampler. This sampler keeps track of all
-// operations and delegates calls to the respective guaranteedThroughputProbabilisticSampler.
-func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (Sampler, error) {
- return newAdaptiveSampler(strategies, maxOperations), nil
+// NewAdaptiveSampler returns a new PerOperationSampler.
+// Deprecated: please use NewPerOperationSampler.
+func NewAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) (*PerOperationSampler, error) {
+ return NewPerOperationSampler(PerOperationSamplerParams{
+ MaxOperations: maxOperations,
+ Strategies: strategies,
+ }), nil
}
-func newAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies, maxOperations int) Sampler {
+// PerOperationSamplerParams defines parameters when creating PerOperationSampler.
+type PerOperationSamplerParams struct {
+ // Max number of operations that will be tracked. Other operations will be given default strategy.
+ MaxOperations int
+
+ // Opt-in feature for applications that require late binding of span name via explicit call to SetOperationName.
+ // When this feature is enabled, the sampler will return retryable=true from OnCreateSpan(), thus leaving
+ // the sampling decision as non-final (and the span as writeable). This may lead to degraded performance
+ // in applications that always provide the correct span name on trace creation.
+ //
+ // For backwards compatibility this option is off by default.
+ OperationNameLateBinding bool
+
+ // Initial configuration of the sampling strategies (usually retrieved from the backend by Remote Sampler).
+ Strategies *sampling.PerOperationSamplingStrategies
+}
+
+// NewPerOperationSampler returns a new PerOperationSampler.
+func NewPerOperationSampler(params PerOperationSamplerParams) *PerOperationSampler {
samplers := make(map[string]*GuaranteedThroughputProbabilisticSampler)
- for _, strategy := range strategies.PerOperationStrategies {
+ for _, strategy := range params.Strategies.PerOperationStrategies {
sampler := newGuaranteedThroughputProbabilisticSampler(
- strategies.DefaultLowerBoundTracesPerSecond,
+ params.Strategies.DefaultLowerBoundTracesPerSecond,
strategy.ProbabilisticSampling.SamplingRate,
)
samplers[strategy.Operation] = sampler
}
- return &adaptiveSampler{
- samplers: samplers,
- defaultSampler: newProbabilisticSampler(strategies.DefaultSamplingProbability),
- lowerBound: strategies.DefaultLowerBoundTracesPerSecond,
- maxOperations: maxOperations,
+ return &PerOperationSampler{
+ samplers: samplers,
+ defaultSampler: newProbabilisticSampler(params.Strategies.DefaultSamplingProbability),
+ lowerBound: params.Strategies.DefaultLowerBoundTracesPerSecond,
+ maxOperations: params.MaxOperations,
+ operationNameLateBinding: params.OperationNameLateBinding,
}
}
-func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
+// IsSampled is not used and only exists to match Sampler V1 API.
+// TODO (breaking change) remove when upgrading everything to SamplerV2
+func (s *PerOperationSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
+ return false, nil
+}
+
+func (s *PerOperationSampler) trySampling(span *Span, operationName string) (bool, []Tag) {
+ samplerV1 := s.getSamplerForOperation(operationName)
+ var sampled bool
+ var tags []Tag
+ if span.context.samplingState.isLocalRootSpan(span.context.spanID) {
+ sampled, tags = samplerV1.IsSampled(span.context.TraceID(), operationName)
+ }
+ return sampled, tags
+}
+
+// OnCreateSpan implements OnCreateSpan of SamplerV2.
+func (s *PerOperationSampler) OnCreateSpan(span *Span) SamplingDecision {
+ sampled, tags := s.trySampling(span, span.OperationName())
+ return SamplingDecision{Sample: sampled, Retryable: s.operationNameLateBinding, Tags: tags}
+}
+
+// OnSetOperationName implements OnSetOperationName of SamplerV2.
+func (s *PerOperationSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
+ sampled, tags := s.trySampling(span, operationName)
+ return SamplingDecision{Sample: sampled, Retryable: false, Tags: tags}
+}
+
+// OnSetTag implements OnSetTag of SamplerV2.
+func (s *PerOperationSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
+ return SamplingDecision{Sample: false, Retryable: true}
+}
+
+// OnFinishSpan implements OnFinishSpan of SamplerV2.
+func (s *PerOperationSampler) OnFinishSpan(span *Span) SamplingDecision {
+ return SamplingDecision{Sample: false, Retryable: true}
+}
+
+func (s *PerOperationSampler) getSamplerForOperation(operation string) Sampler {
s.RLock()
sampler, ok := s.samplers[operation]
if ok {
defer s.RUnlock()
- return sampler.IsSampled(id, operation)
+ return sampler
}
s.RUnlock()
s.Lock()
@@ -315,18 +432,19 @@ func (s *adaptiveSampler) IsSampled(id TraceID, operation string) (bool, []Tag)
// Check if sampler has already been created
sampler, ok = s.samplers[operation]
if ok {
- return sampler.IsSampled(id, operation)
+ return sampler
}
// Store only up to maxOperations of unique ops.
if len(s.samplers) >= s.maxOperations {
- return s.defaultSampler.IsSampled(id, operation)
+ return s.defaultSampler
}
newSampler := newGuaranteedThroughputProbabilisticSampler(s.lowerBound, s.defaultSampler.SamplingRate())
s.samplers[operation] = newSampler
- return newSampler.IsSampled(id, operation)
+ return newSampler
}
-func (s *adaptiveSampler) Close() {
+// Close invokes Close on all underlying samplers.
+func (s *PerOperationSampler) Close() {
s.Lock()
defer s.Unlock()
for _, sampler := range s.samplers {
@@ -335,16 +453,18 @@ func (s *adaptiveSampler) Close() {
s.defaultSampler.Close()
}
-func (s *adaptiveSampler) Equal(other Sampler) bool {
- // NB The Equal() function is overly expensive for adaptiveSampler since it's composed of multiple
+// Equal is not used.
+// TODO (breaking change) remove this in the future
+func (s *PerOperationSampler) Equal(other Sampler) bool {
+ // NB The Equal() function is overly expensive for PerOperationSampler since it's composed of multiple
// samplers which all need to be initialized before this function can be called for a comparison.
- // Therefore, adaptiveSampler uses the update() function to only alter the samplers that need
+ // Therefore, PerOperationSampler uses the update() function to only alter the samplers that need
// changing. Hence this function always returns false so that the update function can be called.
// Once the Equal() function is removed from the Sampler API, this will no longer be needed.
return false
}
-func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrategies) {
+func (s *PerOperationSampler) update(strategies *sampling.PerOperationSamplingStrategies) {
s.Lock()
defer s.Unlock()
newSamplers := map[string]*GuaranteedThroughputProbabilisticSampler{}
@@ -369,191 +489,3 @@ func (s *adaptiveSampler) update(strategies *sampling.PerOperationSamplingStrate
}
s.samplers = newSamplers
}
-
-// -----------------------
-
-// RemotelyControlledSampler is a delegating sampler that polls a remote server
-// for the appropriate sampling strategy, constructs a corresponding sampler and
-// delegates to it for sampling decisions.
-type RemotelyControlledSampler struct {
- // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
- // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
- closed int64 // 0 - not closed, 1 - closed
-
- sync.RWMutex
- samplerOptions
-
- serviceName string
- manager sampling.SamplingManager
- doneChan chan *sync.WaitGroup
-}
-
-type httpSamplingManager struct {
- serverURL string
-}
-
-func (s *httpSamplingManager) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
- var out sampling.SamplingStrategyResponse
- v := url.Values{}
- v.Set("service", serviceName)
- if err := utils.GetJSON(s.serverURL+"?"+v.Encode(), &out); err != nil {
- return nil, err
- }
- return &out, nil
-}
-
-// NewRemotelyControlledSampler creates a sampler that periodically pulls
-// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
-func NewRemotelyControlledSampler(
- serviceName string,
- opts ...SamplerOption,
-) *RemotelyControlledSampler {
- options := applySamplerOptions(opts...)
- sampler := &RemotelyControlledSampler{
- samplerOptions: options,
- serviceName: serviceName,
- manager: &httpSamplingManager{serverURL: options.samplingServerURL},
- doneChan: make(chan *sync.WaitGroup),
- }
- go sampler.pollController()
- return sampler
-}
-
-func applySamplerOptions(opts ...SamplerOption) samplerOptions {
- options := samplerOptions{}
- for _, option := range opts {
- option(&options)
- }
- if options.sampler == nil {
- options.sampler = newProbabilisticSampler(0.001)
- }
- if options.logger == nil {
- options.logger = log.NullLogger
- }
- if options.maxOperations <= 0 {
- options.maxOperations = defaultMaxOperations
- }
- if options.samplingServerURL == "" {
- options.samplingServerURL = DefaultSamplingServerURL
- }
- if options.metrics == nil {
- options.metrics = NewNullMetrics()
- }
- if options.samplingRefreshInterval <= 0 {
- options.samplingRefreshInterval = defaultSamplingRefreshInterval
- }
- return options
-}
-
-// IsSampled implements IsSampled() of Sampler.
-func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
- s.RLock()
- defer s.RUnlock()
- return s.sampler.IsSampled(id, operation)
-}
-
-// Close implements Close() of Sampler.
-func (s *RemotelyControlledSampler) Close() {
- if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
- s.logger.Error("Repeated attempt to close the sampler is ignored")
- return
- }
-
- var wg sync.WaitGroup
- wg.Add(1)
- s.doneChan <- &wg
- wg.Wait()
-}
-
-// Equal implements Equal() of Sampler.
-func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
- // NB The Equal() function is expensive and will be removed. See adaptiveSampler.Equal() for
- // more information.
- if o, ok := other.(*RemotelyControlledSampler); ok {
- s.RLock()
- o.RLock()
- defer s.RUnlock()
- defer o.RUnlock()
- return s.sampler.Equal(o.sampler)
- }
- return false
-}
-
-func (s *RemotelyControlledSampler) pollController() {
- ticker := time.NewTicker(s.samplingRefreshInterval)
- defer ticker.Stop()
- s.pollControllerWithTicker(ticker)
-}
-
-func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
- for {
- select {
- case <-ticker.C:
- s.updateSampler()
- case wg := <-s.doneChan:
- wg.Done()
- return
- }
- }
-}
-
-func (s *RemotelyControlledSampler) getSampler() Sampler {
- s.Lock()
- defer s.Unlock()
- return s.sampler
-}
-
-func (s *RemotelyControlledSampler) setSampler(sampler Sampler) {
- s.Lock()
- defer s.Unlock()
- s.sampler = sampler
-}
-
-func (s *RemotelyControlledSampler) updateSampler() {
- res, err := s.manager.GetSamplingStrategy(s.serviceName)
- if err != nil {
- s.metrics.SamplerQueryFailure.Inc(1)
- s.logger.Infof("Unable to query sampling strategy: %v", err)
- return
- }
- s.Lock()
- defer s.Unlock()
-
- s.metrics.SamplerRetrieved.Inc(1)
- if strategies := res.GetOperationSampling(); strategies != nil {
- s.updateAdaptiveSampler(strategies)
- } else {
- err = s.updateRateLimitingOrProbabilisticSampler(res)
- }
- if err != nil {
- s.metrics.SamplerUpdateFailure.Inc(1)
- s.logger.Infof("Unable to handle sampling strategy response %+v. Got error: %v", res, err)
- return
- }
- s.metrics.SamplerUpdated.Inc(1)
-}
-
-// NB: this function should only be called while holding a Write lock
-func (s *RemotelyControlledSampler) updateAdaptiveSampler(strategies *sampling.PerOperationSamplingStrategies) {
- if adaptiveSampler, ok := s.sampler.(*adaptiveSampler); ok {
- adaptiveSampler.update(strategies)
- } else {
- s.sampler = newAdaptiveSampler(strategies, s.maxOperations)
- }
-}
-
-// NB: this function should only be called while holding a Write lock
-func (s *RemotelyControlledSampler) updateRateLimitingOrProbabilisticSampler(res *sampling.SamplingStrategyResponse) error {
- var newSampler Sampler
- if probabilistic := res.GetProbabilisticSampling(); probabilistic != nil {
- newSampler = newProbabilisticSampler(probabilistic.SamplingRate)
- } else if rateLimiting := res.GetRateLimitingSampling(); rateLimiting != nil {
- newSampler = NewRateLimitingSampler(float64(rateLimiting.MaxTracesPerSecond))
- } else {
- return fmt.Errorf("Unsupported sampling strategy type %v", res.GetStrategyType())
- }
- if !s.sampler.Equal(newSampler) {
- s.sampler = newSampler
- }
- return nil
-}
diff --git a/vendor/github.com/uber/jaeger-client-go/sampler_remote.go b/vendor/github.com/uber/jaeger-client-go/sampler_remote.go
new file mode 100644
index 000000000..9bd0c9822
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/sampler_remote.go
@@ -0,0 +1,334 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jaeger
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/uber/jaeger-client-go/thrift-gen/sampling"
+)
+
+const (
+ defaultSamplingRefreshInterval = time.Minute
+)
+
+// SamplingStrategyFetcher is used to fetch sampling strategy updates from remote server.
+type SamplingStrategyFetcher interface {
+ Fetch(service string) ([]byte, error)
+}
+
+// SamplingStrategyParser is used to parse sampling strategy updates. The output object
+// should be of the type that is recognized by the SamplerUpdaters.
+type SamplingStrategyParser interface {
+ Parse(response []byte) (interface{}, error)
+}
+
+// SamplerUpdater is used by RemotelyControlledSampler to apply sampling strategies,
+// retrieved from remote config server, to the current sampler. The updater can modify
+// the sampler in-place if sampler supports it, or create a new one.
+//
+// If the strategy does not contain configuration for the sampler in question,
+// updater must return modifiedSampler=nil to give other updaters a chance to inspect
+// the sampling strategy response.
+//
+// RemotelyControlledSampler invokes the updaters while holding a lock on the main sampler.
+type SamplerUpdater interface {
+ Update(sampler SamplerV2, strategy interface{}) (modified SamplerV2, err error)
+}
+
+// RemotelyControlledSampler is a delegating sampler that polls a remote server
+// for the appropriate sampling strategy, constructs a corresponding sampler and
+// delegates to it for sampling decisions.
+type RemotelyControlledSampler struct {
+ // These fields must be first in the struct because `sync/atomic` expects 64-bit alignment.
+ // Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
+ closed int64 // 0 - not closed, 1 - closed
+
+ sync.RWMutex
+ samplerOptions
+
+ serviceName string
+ doneChan chan *sync.WaitGroup
+}
+
+// NewRemotelyControlledSampler creates a sampler that periodically pulls
+// the sampling strategy from an HTTP sampling server (e.g. jaeger-agent).
+func NewRemotelyControlledSampler(
+ serviceName string,
+ opts ...SamplerOption,
+) *RemotelyControlledSampler {
+ options := new(samplerOptions).applyOptionsAndDefaults(opts...)
+ sampler := &RemotelyControlledSampler{
+ samplerOptions: *options,
+ serviceName: serviceName,
+ doneChan: make(chan *sync.WaitGroup),
+ }
+ go sampler.pollController()
+ return sampler
+}
+
+// IsSampled implements IsSampled() of Sampler.
+// TODO (breaking change) remove when Sampler V1 is removed
+func (s *RemotelyControlledSampler) IsSampled(id TraceID, operation string) (bool, []Tag) {
+ return false, nil
+}
+
+// OnCreateSpan implements OnCreateSpan of SamplerV2.
+func (s *RemotelyControlledSampler) OnCreateSpan(span *Span) SamplingDecision {
+ return s.sampler.OnCreateSpan(span)
+}
+
+// OnSetOperationName implements OnSetOperationName of SamplerV2.
+func (s *RemotelyControlledSampler) OnSetOperationName(span *Span, operationName string) SamplingDecision {
+ return s.sampler.OnSetOperationName(span, operationName)
+}
+
+// OnSetTag implements OnSetTag of SamplerV2.
+func (s *RemotelyControlledSampler) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
+ return s.sampler.OnSetTag(span, key, value)
+}
+
+// OnFinishSpan implements OnFinishSpan of SamplerV2.
+func (s *RemotelyControlledSampler) OnFinishSpan(span *Span) SamplingDecision {
+ return s.sampler.OnFinishSpan(span)
+}
+
+// Close implements Close() of Sampler.
+func (s *RemotelyControlledSampler) Close() {
+ if swapped := atomic.CompareAndSwapInt64(&s.closed, 0, 1); !swapped {
+ s.logger.Error("Repeated attempt to close the sampler is ignored")
+ return
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ s.doneChan <- &wg
+ wg.Wait()
+}
+
+// Equal implements Equal() of Sampler.
+func (s *RemotelyControlledSampler) Equal(other Sampler) bool {
+ // NB The Equal() function is expensive and will be removed. See PerOperationSampler.Equal() for
+ // more information.
+ return false
+}
+
+func (s *RemotelyControlledSampler) pollController() {
+ ticker := time.NewTicker(s.samplingRefreshInterval)
+ defer ticker.Stop()
+ s.pollControllerWithTicker(ticker)
+}
+
+func (s *RemotelyControlledSampler) pollControllerWithTicker(ticker *time.Ticker) {
+ for {
+ select {
+ case <-ticker.C:
+ s.UpdateSampler()
+ case wg := <-s.doneChan:
+ wg.Done()
+ return
+ }
+ }
+}
+
+// Sampler returns the currently active sampler.
+func (s *RemotelyControlledSampler) Sampler() SamplerV2 {
+ s.Lock()
+ defer s.Unlock()
+ return s.sampler
+}
+
+func (s *RemotelyControlledSampler) setSampler(sampler SamplerV2) {
+ s.Lock()
+ defer s.Unlock()
+ s.sampler = sampler
+}
+
+// UpdateSampler forces the sampler to fetch sampling strategy from backend server.
+// This function is called automatically on a timer, but can also be safely called manually, e.g. from tests.
+func (s *RemotelyControlledSampler) UpdateSampler() {
+ res, err := s.samplingFetcher.Fetch(s.serviceName)
+ if err != nil {
+ s.metrics.SamplerQueryFailure.Inc(1)
+ s.logger.Infof("failed to fetch sampling strategy: %v", err)
+ return
+ }
+ strategy, err := s.samplingParser.Parse(res)
+ if err != nil {
+ s.metrics.SamplerUpdateFailure.Inc(1)
+ s.logger.Infof("failed to parse sampling strategy response: %v", err)
+ return
+ }
+
+ s.Lock()
+ defer s.Unlock()
+
+ s.metrics.SamplerRetrieved.Inc(1)
+ if err := s.updateSamplerViaUpdaters(strategy); err != nil {
+ s.metrics.SamplerUpdateFailure.Inc(1)
+ s.logger.Infof("failed to handle sampling strategy response %+v. Got error: %v", res, err)
+ return
+ }
+ s.metrics.SamplerUpdated.Inc(1)
+}
+
+// NB: this function should only be called while holding a Write lock
+func (s *RemotelyControlledSampler) updateSamplerViaUpdaters(strategy interface{}) error {
+ for _, updater := range s.updaters {
+ sampler, err := updater.Update(s.sampler, strategy)
+ if err != nil {
+ return err
+ }
+ if sampler != nil {
+ s.sampler = sampler
+ return nil
+ }
+ }
+ return fmt.Errorf("unsupported sampling strategy %+v", strategy)
+}
+
+// -----------------------
+
+// ProbabilisticSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
+type ProbabilisticSamplerUpdater struct{}
+
+// Update implements Update of SamplerUpdater.
+func (u *ProbabilisticSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
+ type response interface {
+ GetProbabilisticSampling() *sampling.ProbabilisticSamplingStrategy
+ }
+ var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
+ if resp, ok := strategy.(response); ok {
+ if probabilistic := resp.GetProbabilisticSampling(); probabilistic != nil {
+ if ps, ok := sampler.(*ProbabilisticSampler); ok {
+ if err := ps.Update(probabilistic.SamplingRate); err != nil {
+ return nil, err
+ }
+ return sampler, nil
+ }
+ return newProbabilisticSampler(probabilistic.SamplingRate), nil
+ }
+ }
+ return nil, nil
+}
+
+// -----------------------
+
+// RateLimitingSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
+type RateLimitingSamplerUpdater struct{}
+
+// Update implements Update of SamplerUpdater.
+func (u *RateLimitingSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
+ type response interface {
+ GetRateLimitingSampling() *sampling.RateLimitingSamplingStrategy
+ }
+ var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
+ if resp, ok := strategy.(response); ok {
+ if rateLimiting := resp.GetRateLimitingSampling(); rateLimiting != nil {
+ rateLimit := float64(rateLimiting.MaxTracesPerSecond)
+ if rl, ok := sampler.(*RateLimitingSampler); ok {
+ rl.Update(rateLimit)
+ return rl, nil
+ }
+ return NewRateLimitingSampler(rateLimit), nil
+ }
+ }
+ return nil, nil
+}
+
+// -----------------------
+
+// AdaptiveSamplerUpdater is used by RemotelyControlledSampler to parse sampling configuration.
+type AdaptiveSamplerUpdater struct {
+ MaxOperations int // required
+ OperationNameLateBinding bool
+}
+
+// Update implements Update of SamplerUpdater.
+func (u *AdaptiveSamplerUpdater) Update(sampler SamplerV2, strategy interface{}) (SamplerV2, error) {
+ type response interface {
+ GetOperationSampling() *sampling.PerOperationSamplingStrategies
+ }
+ var _ response = new(sampling.SamplingStrategyResponse) // sanity signature check
+ if p, ok := strategy.(response); ok {
+ if operations := p.GetOperationSampling(); operations != nil {
+ if as, ok := sampler.(*PerOperationSampler); ok {
+ as.update(operations)
+ return as, nil
+ }
+ return NewPerOperationSampler(PerOperationSamplerParams{
+ MaxOperations: u.MaxOperations,
+ OperationNameLateBinding: u.OperationNameLateBinding,
+ Strategies: operations,
+ }), nil
+ }
+ }
+ return nil, nil
+}
+
+// -----------------------
+
+type httpSamplingStrategyFetcher struct {
+ serverURL string
+ logger Logger
+}
+
+func (f *httpSamplingStrategyFetcher) Fetch(serviceName string) ([]byte, error) {
+ v := url.Values{}
+ v.Set("service", serviceName)
+ uri := f.serverURL + "?" + v.Encode()
+
+ // TODO create and reuse http.Client with proper timeout settings, etc.
+ resp, err := http.Get(uri)
+ if err != nil {
+ return nil, err
+ }
+
+ defer func() {
+ if err := resp.Body.Close(); err != nil {
+ f.logger.Error(fmt.Sprintf("failed to close HTTP response body: %+v", err))
+ }
+ }()
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.StatusCode >= 400 {
+ return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
+ }
+
+ return body, nil
+}
+
+// -----------------------
+
+type samplingStrategyParser struct{}
+
+func (p *samplingStrategyParser) Parse(response []byte) (interface{}, error) {
+ strategy := new(sampling.SamplingStrategyResponse)
+ if err := json.Unmarshal(response, strategy); err != nil {
+ return nil, err
+ }
+ return strategy, nil
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/sampler_options.go b/vendor/github.com/uber/jaeger-client-go/sampler_remote_options.go
index 75d28a561..7a292effc 100644
--- a/vendor/github.com/uber/jaeger-client-go/sampler_options.go
+++ b/vendor/github.com/uber/jaeger-client-go/sampler_remote_options.go
@@ -16,6 +16,8 @@ package jaeger
import (
"time"
+
+ "github.com/uber/jaeger-client-go/log"
)
// SamplerOption is a function that sets some option on the sampler
@@ -27,10 +29,13 @@ var SamplerOptions samplerOptions
type samplerOptions struct {
metrics *Metrics
maxOperations int
- sampler Sampler
+ sampler SamplerV2
logger Logger
samplingServerURL string
samplingRefreshInterval time.Duration
+ samplingFetcher SamplingStrategyFetcher
+ samplingParser SamplingStrategyParser
+ updaters []SamplerUpdater
}
// Metrics creates a SamplerOption that initializes Metrics on the sampler,
@@ -53,7 +58,7 @@ func (samplerOptions) MaxOperations(maxOperations int) SamplerOption {
// to use before a remote sampler is created and used.
func (samplerOptions) InitialSampler(sampler Sampler) SamplerOption {
return func(o *samplerOptions) {
- o.sampler = sampler
+ o.sampler = samplerV1toV2(sampler)
}
}
@@ -79,3 +84,65 @@ func (samplerOptions) SamplingRefreshInterval(samplingRefreshInterval time.Durat
o.samplingRefreshInterval = samplingRefreshInterval
}
}
+
+// SamplingStrategyFetcher creates a SamplerOption that initializes sampling strategy fetcher.
+func (samplerOptions) SamplingStrategyFetcher(fetcher SamplingStrategyFetcher) SamplerOption {
+ return func(o *samplerOptions) {
+ o.samplingFetcher = fetcher
+ }
+}
+
+// SamplingStrategyParser creates a SamplerOption that initializes sampling strategy parser.
+func (samplerOptions) SamplingStrategyParser(parser SamplingStrategyParser) SamplerOption {
+ return func(o *samplerOptions) {
+ o.samplingParser = parser
+ }
+}
+
+// Updaters creates a SamplerOption that initializes sampler updaters.
+func (samplerOptions) Updaters(updaters ...SamplerUpdater) SamplerOption {
+ return func(o *samplerOptions) {
+ o.updaters = updaters
+ }
+}
+
+func (o *samplerOptions) applyOptionsAndDefaults(opts ...SamplerOption) *samplerOptions {
+ for _, option := range opts {
+ option(o)
+ }
+ if o.sampler == nil {
+ o.sampler = newProbabilisticSampler(0.001)
+ }
+ if o.logger == nil {
+ o.logger = log.NullLogger
+ }
+ if o.maxOperations <= 0 {
+ o.maxOperations = defaultMaxOperations
+ }
+ if o.samplingServerURL == "" {
+ o.samplingServerURL = DefaultSamplingServerURL
+ }
+ if o.metrics == nil {
+ o.metrics = NewNullMetrics()
+ }
+ if o.samplingRefreshInterval <= 0 {
+ o.samplingRefreshInterval = defaultSamplingRefreshInterval
+ }
+ if o.samplingFetcher == nil {
+ o.samplingFetcher = &httpSamplingStrategyFetcher{
+ serverURL: o.samplingServerURL,
+ logger: o.logger,
+ }
+ }
+ if o.samplingParser == nil {
+ o.samplingParser = new(samplingStrategyParser)
+ }
+ if o.updaters == nil {
+ o.updaters = []SamplerUpdater{
+ &AdaptiveSamplerUpdater{MaxOperations: o.maxOperations},
+ new(ProbabilisticSamplerUpdater),
+ new(RateLimitingSamplerUpdater),
+ }
+ }
+ return o
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/sampler_v2.go b/vendor/github.com/uber/jaeger-client-go/sampler_v2.go
new file mode 100644
index 000000000..a50671a23
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/sampler_v2.go
@@ -0,0 +1,93 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package jaeger
+
+// SamplingDecision is returned by the V2 samplers.
+type SamplingDecision struct {
+ Sample bool
+ Retryable bool
+ Tags []Tag
+}
+
+// SamplerV2 is an extension of the V1 samplers that allows sampling decisions
+// be made at different points of the span lifecycle.
+type SamplerV2 interface {
+ OnCreateSpan(span *Span) SamplingDecision
+ OnSetOperationName(span *Span, operationName string) SamplingDecision
+ OnSetTag(span *Span, key string, value interface{}) SamplingDecision
+ OnFinishSpan(span *Span) SamplingDecision
+
+ // Close does a clean shutdown of the sampler, stopping any background
+ // go-routines it may have started.
+ Close()
+}
+
+// samplerV1toV2 wraps legacy V1 sampler into an adapter that make it look like V2.
+func samplerV1toV2(s Sampler) SamplerV2 {
+ if s2, ok := s.(SamplerV2); ok {
+ return s2
+ }
+ type legacySamplerV1toV2Adapter struct {
+ legacySamplerV1Base
+ }
+ return &legacySamplerV1toV2Adapter{
+ legacySamplerV1Base: legacySamplerV1Base{
+ delegate: s.IsSampled,
+ },
+ }
+}
+
+// SamplerV2Base can be used by V2 samplers to implement dummy V1 methods.
+// Supporting V1 API is required because Tracer configuration only accepts V1 Sampler
+// for backwards compatibility reasons.
+// TODO (breaking change) remove this in the next major release
+type SamplerV2Base struct{}
+
+// IsSampled implements IsSampled of Sampler.
+func (SamplerV2Base) IsSampled(id TraceID, operation string) (sampled bool, tags []Tag) {
+ return false, nil
+}
+
+// Close implements Close of Sampler.
+func (SamplerV2Base) Close() {}
+
+// Equal implements Equal of Sampler.
+func (SamplerV2Base) Equal(other Sampler) bool { return false }
+
+// legacySamplerV1Base is used as a base for simple samplers that only implement
+// the legacy isSampled() function that is not sensitive to its arguments.
+type legacySamplerV1Base struct {
+ delegate func(id TraceID, operation string) (sampled bool, tags []Tag)
+}
+
+func (s *legacySamplerV1Base) OnCreateSpan(span *Span) SamplingDecision {
+ isSampled, tags := s.delegate(span.context.traceID, span.operationName)
+ return SamplingDecision{Sample: isSampled, Retryable: false, Tags: tags}
+}
+
+func (s *legacySamplerV1Base) OnSetOperationName(span *Span, operationName string) SamplingDecision {
+ isSampled, tags := s.delegate(span.context.traceID, span.operationName)
+ return SamplingDecision{Sample: isSampled, Retryable: false, Tags: tags}
+}
+
+func (s *legacySamplerV1Base) OnSetTag(span *Span, key string, value interface{}) SamplingDecision {
+ return SamplingDecision{Sample: false, Retryable: true}
+}
+
+func (s *legacySamplerV1Base) OnFinishSpan(span *Span) SamplingDecision {
+ return SamplingDecision{Sample: false, Retryable: true}
+}
+
+func (s *legacySamplerV1Base) Close() {}
diff --git a/vendor/github.com/uber/jaeger-client-go/span.go b/vendor/github.com/uber/jaeger-client-go/span.go
index 9df8b6017..bbf6fb068 100644
--- a/vendor/github.com/uber/jaeger-client-go/span.go
+++ b/vendor/github.com/uber/jaeger-client-go/span.go
@@ -34,6 +34,7 @@ type Span struct {
tracer *Tracer
+ // TODO: (breaking change) change to use a pointer
context SpanContext
// The name of the "operation" this span is an instance of.
@@ -65,18 +66,26 @@ type Span struct {
}
// Tag is a simple key value wrapper.
-// TODO deprecate in the next major release, use opentracing.Tag instead.
+// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
type Tag struct {
key string
value interface{}
}
+// NewTag creates a new Tag.
+// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
+func NewTag(key string, value interface{}) Tag {
+ return Tag{key: key, value: value}
+}
+
// SetOperationName sets or changes the operation name.
func (s *Span) SetOperationName(operationName string) opentracing.Span {
s.Lock()
- defer s.Unlock()
- if s.context.IsSampled() {
- s.operationName = operationName
+ s.operationName = operationName
+ s.Unlock()
+ if !s.isSamplingFinalized() {
+ decision := s.tracer.sampler.OnSetOperationName(s, operationName)
+ s.applySamplingDecision(decision, true)
}
s.observer.OnSetOperationName(operationName)
return s
@@ -84,14 +93,24 @@ func (s *Span) SetOperationName(operationName string) opentracing.Span {
// SetTag implements SetTag() of opentracing.Span
func (s *Span) SetTag(key string, value interface{}) opentracing.Span {
+ return s.setTagInternal(key, value, true)
+}
+
+func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span {
s.observer.OnSetTag(key, value)
if key == string(ext.SamplingPriority) && !setSamplingPriority(s, value) {
return s
}
- s.Lock()
- defer s.Unlock()
- if s.context.IsSampled() {
- s.setTagNoLocking(key, value)
+ if !s.isSamplingFinalized() {
+ decision := s.tracer.sampler.OnSetTag(s, key, value)
+ s.applySamplingDecision(decision, lock)
+ }
+ if s.isWriteable() {
+ if lock {
+ s.Lock()
+ defer s.Unlock()
+ }
+ s.appendTagNoLocking(key, value)
}
return s
}
@@ -121,14 +140,38 @@ func (s *Span) Duration() time.Duration {
func (s *Span) Tags() opentracing.Tags {
s.Lock()
defer s.Unlock()
- var result = make(opentracing.Tags)
+ var result = make(opentracing.Tags, len(s.tags))
for _, tag := range s.tags {
result[tag.key] = tag.value
}
return result
}
-func (s *Span) setTagNoLocking(key string, value interface{}) {
+// Logs returns micro logs for span
+func (s *Span) Logs() []opentracing.LogRecord {
+ s.Lock()
+ defer s.Unlock()
+
+ return append([]opentracing.LogRecord(nil), s.logs...)
+}
+
+// References returns references for this span
+func (s *Span) References() []opentracing.SpanReference {
+ s.Lock()
+ defer s.Unlock()
+
+ if s.references == nil || len(s.references) == 0 {
+ return nil
+ }
+
+ result := make([]opentracing.SpanReference, len(s.references))
+ for i, r := range s.references {
+ result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context}
+ }
+ return result
+}
+
+func (s *Span) appendTagNoLocking(key string, value interface{}) {
s.tags = append(s.tags, Tag{key: key, value: value})
}
@@ -148,7 +191,7 @@ func (s *Span) logFieldsNoLocking(fields ...log.Field) {
Fields: fields,
Timestamp: time.Now(),
}
- s.appendLog(lr)
+ s.appendLogNoLocking(lr)
}
// LogKV implements opentracing.Span API
@@ -185,12 +228,12 @@ func (s *Span) Log(ld opentracing.LogData) {
if ld.Timestamp.IsZero() {
ld.Timestamp = s.tracer.timeNow()
}
- s.appendLog(ld.ToLogRecord())
+ s.appendLogNoLocking(ld.ToLogRecord())
}
}
// this function should only be called while holding a Write lock
-func (s *Span) appendLog(lr opentracing.LogRecord) {
+func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) {
// TODO add logic to limit number of logs per span (issue #46)
s.logs = append(s.logs, lr)
}
@@ -224,17 +267,25 @@ func (s *Span) FinishWithOptions(options opentracing.FinishOptions) {
}
s.observer.OnFinish(options)
s.Lock()
+ s.duration = options.FinishTime.Sub(s.startTime)
+ s.Unlock()
+ if !s.isSamplingFinalized() {
+ decision := s.tracer.sampler.OnFinishSpan(s)
+ s.applySamplingDecision(decision, true)
+ }
if s.context.IsSampled() {
- s.duration = options.FinishTime.Sub(s.startTime)
- // Note: bulk logs are not subject to maxLogsPerSpan limit
- if options.LogRecords != nil {
- s.logs = append(s.logs, options.LogRecords...)
- }
- for _, ld := range options.BulkLogData {
- s.logs = append(s.logs, ld.ToLogRecord())
+ if len(options.LogRecords) > 0 || len(options.BulkLogData) > 0 {
+ s.Lock()
+ // Note: bulk logs are not subject to maxLogsPerSpan limit
+ if options.LogRecords != nil {
+ s.logs = append(s.logs, options.LogRecords...)
+ }
+ for _, ld := range options.BulkLogData {
+ s.logs = append(s.logs, ld.ToLogRecord())
+ }
+ s.Unlock()
}
}
- s.Unlock()
// call reportSpan even for non-sampled traces, to return span to the pool
// and update metrics counter
s.tracer.reportSpan(s)
@@ -300,23 +351,62 @@ func (s *Span) serviceName() string {
return s.tracer.serviceName
}
+func (s *Span) applySamplingDecision(decision SamplingDecision, lock bool) {
+ if !decision.Retryable {
+ s.context.samplingState.setFinal()
+ }
+ if decision.Sample {
+ s.context.samplingState.setSampled()
+ if len(decision.Tags) > 0 {
+ if lock {
+ s.Lock()
+ defer s.Unlock()
+ }
+ for _, tag := range decision.Tags {
+ s.appendTagNoLocking(tag.key, tag.value)
+ }
+ }
+ }
+}
+
+// Span can be written to if it is sampled or the sampling decision has not been finalized.
+func (s *Span) isWriteable() bool {
+ state := s.context.samplingState
+ return !state.isFinal() || state.isSampled()
+}
+
+func (s *Span) isSamplingFinalized() bool {
+ return s.context.samplingState.isFinal()
+}
+
// setSamplingPriority returns true if the flag was updated successfully, false otherwise.
+// The behavior of setSamplingPriority is surprising
+// If noDebugFlagOnForcedSampling is set
+// setSamplingPriority(span, 1) always sets only flagSampled
+// If noDebugFlagOnForcedSampling is unset, and isDebugAllowed passes
+// setSamplingPriority(span, 1) sets both flagSampled and flagDebug
+// However,
+// setSamplingPriority(span, 0) always only resets flagSampled
+//
+// This means that doing a setSamplingPriority(span, 1) followed by setSamplingPriority(span, 0) can
+// leave flagDebug set
func setSamplingPriority(s *Span, value interface{}) bool {
val, ok := value.(uint16)
if !ok {
return false
}
- s.Lock()
- defer s.Unlock()
if val == 0 {
- s.context.flags = s.context.flags & (^flagSampled)
+ s.context.samplingState.unsetSampled()
+ s.context.samplingState.setFinal()
return true
}
if s.tracer.options.noDebugFlagOnForcedSampling {
- s.context.flags = s.context.flags | flagSampled
+ s.context.samplingState.setSampled()
+ s.context.samplingState.setFinal()
return true
} else if s.tracer.isDebugAllowed(s.operationName) {
- s.context.flags = s.context.flags | flagDebug | flagSampled
+ s.context.samplingState.setDebugAndSampled()
+ s.context.samplingState.setFinal()
return true
}
return false
@@ -326,5 +416,5 @@ func setSamplingPriority(s *Span, value interface{}) bool {
func EnableFirehose(s *Span) {
s.Lock()
defer s.Unlock()
- s.context.flags |= flagFirehose
+ s.context.samplingState.setFirehose()
}
diff --git a/vendor/github.com/uber/jaeger-client-go/context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go
index 43553655a..b7230abfe 100644
--- a/vendor/github.com/uber/jaeger-client-go/context.go
+++ b/vendor/github.com/uber/jaeger-client-go/span_context.go
@@ -19,12 +19,15 @@ import (
"fmt"
"strconv"
"strings"
+ "sync"
+
+ "go.uber.org/atomic"
)
const (
- flagSampled = byte(1)
- flagDebug = byte(2)
- flagFirehose = byte(8)
+ flagSampled = 1
+ flagDebug = 2
+ flagFirehose = 8
)
var (
@@ -56,9 +59,6 @@ type SpanContext struct {
// Should be 0 if the current span is a root span.
parentID SpanID
- // flags is a bitmap containing such bits as 'sampled' and 'debug'.
- flags byte
-
// Distributed Context baggage. The is a snapshot in time.
baggage map[string]string
@@ -67,6 +67,102 @@ type SpanContext struct {
//
// See JaegerDebugHeader in constants.go
debugID string
+
+ // samplingState is shared across all spans
+ samplingState *samplingState
+
+ // remote indicates that span context represents a remote parent
+ remote bool
+}
+
+type samplingState struct {
+ // Span context's state flags that are propagated across processes. Only lower 8 bits are used.
+ // We use an int32 instead of byte to be able to use CAS operations.
+ stateFlags atomic.Int32
+
+ // When state is not final, sampling will be retried on other span write operations,
+ // like SetOperationName / SetTag, and the spans will remain writable.
+ final atomic.Bool
+
+ // localRootSpan stores the SpanID of the first span created in this process for a given trace.
+ localRootSpan SpanID
+
+ // extendedState allows samplers to keep intermediate state.
+ // The keys and values in this map are completely opaque: interface{} -> interface{}.
+ extendedState sync.Map
+}
+
+func (s *samplingState) isLocalRootSpan(id SpanID) bool {
+ return id == s.localRootSpan
+}
+
+func (s *samplingState) setFlag(newFlag int32) {
+ swapped := false
+ for !swapped {
+ old := s.stateFlags.Load()
+ swapped = s.stateFlags.CAS(old, old|newFlag)
+ }
+}
+
+func (s *samplingState) unsetFlag(newFlag int32) {
+ swapped := false
+ for !swapped {
+ old := s.stateFlags.Load()
+ swapped = s.stateFlags.CAS(old, old&^newFlag)
+ }
+}
+
+func (s *samplingState) setSampled() {
+ s.setFlag(flagSampled)
+}
+
+func (s *samplingState) unsetSampled() {
+ s.unsetFlag(flagSampled)
+}
+
+func (s *samplingState) setDebugAndSampled() {
+ s.setFlag(flagDebug | flagSampled)
+}
+
+func (s *samplingState) setFirehose() {
+ s.setFlag(flagFirehose)
+}
+
+func (s *samplingState) setFlags(flags byte) {
+ s.stateFlags.Store(int32(flags))
+}
+
+func (s *samplingState) setFinal() {
+ s.final.Store(true)
+}
+
+func (s *samplingState) flags() byte {
+ return byte(s.stateFlags.Load())
+}
+
+func (s *samplingState) isSampled() bool {
+ return s.stateFlags.Load()&flagSampled == flagSampled
+}
+
+func (s *samplingState) isDebug() bool {
+ return s.stateFlags.Load()&flagDebug == flagDebug
+}
+
+func (s *samplingState) isFirehose() bool {
+ return s.stateFlags.Load()&flagFirehose == flagFirehose
+}
+
+func (s *samplingState) isFinal() bool {
+ return s.final.Load()
+}
+
+func (s *samplingState) extendedStateForKey(key interface{}, initValue func() interface{}) interface{} {
+ if value, ok := s.extendedState.Load(key); ok {
+ return value
+ }
+ value := initValue()
+ value, _ = s.extendedState.LoadOrStore(key, value)
+ return value
}
// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
@@ -81,17 +177,28 @@ func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) {
// IsSampled returns whether this trace was chosen for permanent storage
// by the sampling mechanism of the tracer.
func (c SpanContext) IsSampled() bool {
- return (c.flags & flagSampled) == flagSampled
+ return c.samplingState.isSampled()
}
// IsDebug indicates whether sampling was explicitly requested by the service.
func (c SpanContext) IsDebug() bool {
- return (c.flags & flagDebug) == flagDebug
+ return c.samplingState.isDebug()
+}
+
+// IsSamplingFinalized indicates whether the sampling decision has been finalized.
+func (c SpanContext) IsSamplingFinalized() bool {
+ return c.samplingState.isFinal()
}
// IsFirehose indicates whether the firehose flag was set
func (c SpanContext) IsFirehose() bool {
- return (c.flags & flagFirehose) == flagFirehose
+ return c.samplingState.isFirehose()
+}
+
+// ExtendedSamplingState returns the custom state object for a given key. If the value for this key does not exist,
+// it is initialized via initValue function. This state can be used by samplers (e.g. x.PrioritySampler).
+func (c SpanContext) ExtendedSamplingState(key interface{}, initValue func() interface{}) interface{} {
+ return c.samplingState.extendedStateForKey(key, initValue)
}
// IsValid indicates whether this context actually represents a valid trace.
@@ -99,11 +206,16 @@ func (c SpanContext) IsValid() bool {
return c.traceID.IsValid() && c.spanID != 0
}
+// SetFirehose enables firehose mode for this trace.
+func (c SpanContext) SetFirehose() {
+ c.samplingState.setFirehose()
+}
+
func (c SpanContext) String() string {
if c.traceID.High == 0 {
- return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
+ return fmt.Sprintf("%x:%x:%x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
- return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.flags)
+ return fmt.Sprintf("%x%016x:%x:%x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
}
// ContextFromString reconstructs the Context encoded in a string
@@ -130,7 +242,8 @@ func ContextFromString(value string) (SpanContext, error) {
if err != nil {
return emptyContext, err
}
- context.flags = byte(flags)
+ context.samplingState = &samplingState{}
+ context.samplingState.setFlags(byte(flags))
return context, nil
}
@@ -149,18 +262,24 @@ func (c SpanContext) ParentID() SpanID {
return c.parentID
}
+// Flags returns the bitmap containing such bits as 'sampled' and 'debug'.
+func (c SpanContext) Flags() byte {
+ return c.samplingState.flags()
+}
+
// NewSpanContext creates a new instance of SpanContext
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext {
- flags := byte(0)
+ samplingState := &samplingState{}
if sampled {
- flags = flagSampled
+ samplingState.setSampled()
}
+
return SpanContext{
- traceID: traceID,
- spanID: spanID,
- parentID: parentID,
- flags: flags,
- baggage: baggage}
+ traceID: traceID,
+ spanID: spanID,
+ parentID: parentID,
+ samplingState: samplingState,
+ baggage: baggage}
}
// CopyFrom copies data from ctx into this context, including span identity and baggage.
@@ -169,7 +288,7 @@ func (c *SpanContext) CopyFrom(ctx *SpanContext) {
c.traceID = ctx.traceID
c.spanID = ctx.spanID
c.parentID = ctx.parentID
- c.flags = ctx.flags
+ c.samplingState = ctx.samplingState
if l := len(ctx.baggage); l > 0 {
c.baggage = make(map[string]string, l)
for k, v := range ctx.baggage {
@@ -193,7 +312,7 @@ func (c SpanContext) WithBaggageItem(key, value string) SpanContext {
newBaggage[key] = value
}
// Use positional parameters so the compiler will help catch new fields.
- return SpanContext{c.traceID, c.spanID, c.parentID, c.flags, newBaggage, ""}
+ return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState, c.remote}
}
// isDebugIDContainerOnly returns true when the instance of the context is only
diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go
index 745a0c38a..f03372dc7 100644
--- a/vendor/github.com/uber/jaeger-client-go/tracer.go
+++ b/vendor/github.com/uber/jaeger-client-go/tracer.go
@@ -38,7 +38,7 @@ type Tracer struct {
serviceName string
hostIPv4 uint32 // this is for zipkin endpoint conversion
- sampler Sampler
+ sampler SamplerV2
reporter Reporter
metrics Metrics
logger log.Logger
@@ -74,6 +74,7 @@ type Tracer struct {
// NewTracer creates Tracer implementation that reports tracing to Jaeger.
// The returned io.Closer can be used in shutdown hooks to ensure that the internal
// queue of the Reporter is drained and all buffered spans are submitted to collectors.
+// TODO (breaking change) return *Tracer only, without closer.
func NewTracer(
serviceName string,
sampler Sampler,
@@ -82,7 +83,7 @@ func NewTracer(
) (opentracing.Tracer, io.Closer) {
t := &Tracer{
serviceName: serviceName,
- sampler: sampler,
+ sampler: samplerV1toV2(sampler),
reporter: reporter,
injectors: make(map[interface{}]Injector),
extractors: make(map[interface{}]Extractor),
@@ -261,7 +262,7 @@ func (t *Tracer) startSpanWithOptions(
rpcServer = (v == ext.SpanKindRPCServerEnum || v == string(ext.SpanKindRPCServerEnum))
}
- var samplerTags []Tag
+ var internalTags []Tag
newTrace := false
if !isSelfRef {
if !hasParent || !parent.IsValid() {
@@ -272,13 +273,12 @@ func (t *Tracer) startSpanWithOptions(
}
ctx.spanID = SpanID(ctx.traceID.Low)
ctx.parentID = 0
- ctx.flags = byte(0)
+ ctx.samplingState = &samplingState{
+ localRootSpan: ctx.spanID,
+ }
if hasParent && parent.isDebugIDContainerOnly() && t.isDebugAllowed(operationName) {
- ctx.flags |= (flagSampled | flagDebug)
- samplerTags = []Tag{{key: JaegerDebugHeader, value: parent.debugID}}
- } else if sampled, tags := t.sampler.IsSampled(ctx.traceID, operationName); sampled {
- ctx.flags |= flagSampled
- samplerTags = tags
+ ctx.samplingState.setDebugAndSampled()
+ internalTags = append(internalTags, Tag{key: JaegerDebugHeader, value: parent.debugID})
}
} else {
ctx.traceID = parent.traceID
@@ -290,7 +290,11 @@ func (t *Tracer) startSpanWithOptions(
ctx.spanID = SpanID(t.randomID())
ctx.parentID = parent.spanID
}
- ctx.flags = parent.flags
+ ctx.samplingState = parent.samplingState
+ if parent.remote {
+ ctx.samplingState.setFinal()
+ ctx.samplingState.localRootSpan = ctx.spanID
+ }
}
if hasParent {
// copy baggage items
@@ -305,17 +309,30 @@ func (t *Tracer) startSpanWithOptions(
sp := t.newSpan()
sp.context = ctx
+ sp.tracer = t
+ sp.operationName = operationName
+ sp.startTime = options.StartTime
+ sp.duration = 0
+ sp.references = references
+ sp.firstInProcess = rpcServer || sp.context.parentID == 0
+
+ if !sp.isSamplingFinalized() {
+ decision := t.sampler.OnCreateSpan(sp)
+ sp.applySamplingDecision(decision, false)
+ }
sp.observer = t.observer.OnStartSpan(sp, operationName, options)
- return t.startSpanInternal(
- sp,
- operationName,
- options.StartTime,
- samplerTags,
- options.Tags,
- newTrace,
- rpcServer,
- references,
- )
+
+ if tagsTotalLength := len(options.Tags) + len(internalTags); tagsTotalLength > 0 {
+ if sp.tags == nil || cap(sp.tags) < tagsTotalLength {
+ sp.tags = make([]Tag, 0, tagsTotalLength)
+ }
+ sp.tags = append(sp.tags, internalTags...)
+ for k, v := range options.Tags {
+ sp.setTagInternal(k, v, false)
+ }
+ }
+ t.emitNewSpanMetrics(sp, newTrace)
+ return sp
}
// Inject implements Inject() method of opentracing.Tracer
@@ -340,6 +357,7 @@ func (t *Tracer) Extract(
if err != nil {
return nil, err // ensure returned spanCtx is nil
}
+ spanCtx.remote = true
return spanCtx, nil
}
return nil, opentracing.ErrUnsupportedFormat
@@ -350,10 +368,10 @@ func (t *Tracer) Close() error {
t.reporter.Close()
t.sampler.Close()
if mgr, ok := t.baggageRestrictionManager.(io.Closer); ok {
- mgr.Close()
+ _ = mgr.Close()
}
if throttler, ok := t.debugThrottler.(io.Closer); ok {
- throttler.Close()
+ _ = throttler.Close()
}
return nil
}
@@ -368,6 +386,7 @@ func (t *Tracer) Tags() []opentracing.Tag {
}
// getTag returns the value of specific tag, if not exists, return nil.
+// TODO only used by tests, move there.
func (t *Tracer) getTag(key string) (interface{}, bool) {
for _, tag := range t.tags {
if tag.key == key {
@@ -383,41 +402,21 @@ func (t *Tracer) newSpan() *Span {
return t.spanAllocator.Get()
}
-func (t *Tracer) startSpanInternal(
- sp *Span,
- operationName string,
- startTime time.Time,
- internalTags []Tag,
- tags opentracing.Tags,
- newTrace bool,
- rpcServer bool,
- references []Reference,
-) *Span {
- sp.tracer = t
- sp.operationName = operationName
- sp.startTime = startTime
- sp.duration = 0
- sp.references = references
- sp.firstInProcess = rpcServer || sp.context.parentID == 0
- if len(tags) > 0 || len(internalTags) > 0 {
- sp.tags = make([]Tag, len(internalTags), len(tags)+len(internalTags))
- copy(sp.tags, internalTags)
- for k, v := range tags {
- sp.observer.OnSetTag(k, v)
- if k == string(ext.SamplingPriority) && !setSamplingPriority(sp, v) {
- continue
- }
- sp.setTagNoLocking(k, v)
+// emitNewSpanMetrics generates metrics on the number of started spans and traces.
+// newTrace param: we cannot simply check for parentID==0 because in Zipkin model the
+// server-side RPC span has the exact same trace/span/parent IDs as the
+// calling client-side span, but obviously the server side span is
+// no longer a root span of the trace.
+func (t *Tracer) emitNewSpanMetrics(sp *Span, newTrace bool) {
+ if !sp.isSamplingFinalized() {
+ t.metrics.SpansStartedDelayedSampling.Inc(1)
+ if newTrace {
+ t.metrics.TracesStartedDelayedSampling.Inc(1)
}
- }
- // emit metrics
- if sp.context.IsSampled() {
+ // joining a trace is not possible, because sampling decision inherited from upstream is final
+ } else if sp.context.IsSampled() {
t.metrics.SpansStartedSampled.Inc(1)
if newTrace {
- // We cannot simply check for parentID==0 because in Zipkin model the
- // server-side RPC span has the exact same trace/span/parent IDs as the
- // calling client-side span, but obviously the server side span is
- // no longer a root span of the trace.
t.metrics.TracesStartedSampled.Inc(1)
} else if sp.firstInProcess {
t.metrics.TracesJoinedSampled.Inc(1)
@@ -430,15 +429,20 @@ func (t *Tracer) startSpanInternal(
t.metrics.TracesJoinedNotSampled.Inc(1)
}
}
- return sp
}
func (t *Tracer) reportSpan(sp *Span) {
- t.metrics.SpansFinished.Inc(1)
+ if !sp.isSamplingFinalized() {
+ t.metrics.SpansFinishedDelayedSampling.Inc(1)
+ } else if sp.context.IsSampled() {
+ t.metrics.SpansFinishedSampled.Inc(1)
+ } else {
+ t.metrics.SpansFinishedNotSampled.Inc(1)
+ }
- // Note: if the reporter is processing Span asynchronously need to Retain() it
- // otherwise, in the racing condition will be rewritten span data before it will be sent
- // * To remove object use method span.Release()
+ // Note: if the reporter is processing Span asynchronously then it needs to Retain() the span,
+ // and then Release() it when no longer needed.
+ // Otherwise, the span may be reused for another trace and its data may be overwritten.
if sp.context.IsSampled() {
t.reporter.Report(sp)
}
@@ -466,6 +470,11 @@ func (t *Tracer) isDebugAllowed(operation string) bool {
return t.debugThrottler.IsAllowed(operation)
}
+// Sampler returns the sampler given to the tracer at creation.
+func (t *Tracer) Sampler() SamplerV2 {
+ return t.sampler
+}
+
// SelfRef creates an opentracing compliant SpanReference from a jaeger
// SpanContext. This is a factory function in order to encapsulate jaeger specific
// types.
diff --git a/vendor/github.com/uber/jaeger-client-go/utils/rate_limiter.go b/vendor/github.com/uber/jaeger-client-go/utils/rate_limiter.go
index 1b8db9758..bf2f13165 100644
--- a/vendor/github.com/uber/jaeger-client-go/utils/rate_limiter.go
+++ b/vendor/github.com/uber/jaeger-client-go/utils/rate_limiter.go
@@ -20,22 +20,15 @@ import (
)
// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
+//
+// TODO (breaking change) remove this interface in favor of public struct below
+//
+// Deprecated, use ReconfigurableRateLimiter.
type RateLimiter interface {
CheckCredit(itemCost float64) bool
}
-type rateLimiter struct {
- sync.Mutex
-
- creditsPerSecond float64
- balance float64
- maxBalance float64
- lastTick time.Time
-
- timeNow func() time.Time
-}
-
-// NewRateLimiter creates a new rate limiter based on leaky bucket algorithm, formulated in terms of a
+// ReconfigurableRateLimiter is a rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
// to the time elapsed since the last tick, up to max of creditsPerSecond. A call to CheckCredit() takes a cost
// of an item we want to pay with the balance. If the balance exceeds the cost of the item, the item is "purchased"
@@ -47,31 +40,73 @@ type rateLimiter struct {
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
-func NewRateLimiter(creditsPerSecond, maxBalance float64) RateLimiter {
- return &rateLimiter{
+//
+// TODO (breaking change) rename to RateLimiter once the interface is removed
+type ReconfigurableRateLimiter struct {
+ lock sync.Mutex
+
+ creditsPerSecond float64
+ balance float64
+ maxBalance float64
+ lastTick time.Time
+
+ timeNow func() time.Time
+}
+
+// NewRateLimiter creates a new ReconfigurableRateLimiter.
+func NewRateLimiter(creditsPerSecond, maxBalance float64) *ReconfigurableRateLimiter {
+ return &ReconfigurableRateLimiter{
creditsPerSecond: creditsPerSecond,
balance: maxBalance,
maxBalance: maxBalance,
lastTick: time.Now(),
- timeNow: time.Now}
+ timeNow: time.Now,
+ }
}
-func (b *rateLimiter) CheckCredit(itemCost float64) bool {
- b.Lock()
- defer b.Unlock()
- // calculate how much time passed since the last tick, and update current tick
- currentTime := b.timeNow()
- elapsedTime := currentTime.Sub(b.lastTick)
- b.lastTick = currentTime
- // calculate how much credit have we accumulated since the last tick
- b.balance += elapsedTime.Seconds() * b.creditsPerSecond
- if b.balance > b.maxBalance {
- b.balance = b.maxBalance
- }
+// CheckCredit tries to reduce the current balance by itemCost provided that the current balance
+// is not lest than itemCost.
+func (rl *ReconfigurableRateLimiter) CheckCredit(itemCost float64) bool {
+ rl.lock.Lock()
+ defer rl.lock.Unlock()
+
// if we have enough credits to pay for current item, then reduce balance and allow
- if b.balance >= itemCost {
- b.balance -= itemCost
+ if rl.balance >= itemCost {
+ rl.balance -= itemCost
+ return true
+ }
+ // otherwise check if balance can be increased due to time elapsed, and try again
+ rl.updateBalance()
+ if rl.balance >= itemCost {
+ rl.balance -= itemCost
return true
}
return false
}
+
+// updateBalance recalculates current balance based on time elapsed. Must be called while holding a lock.
+func (rl *ReconfigurableRateLimiter) updateBalance() {
+ // calculate how much time passed since the last tick, and update current tick
+ currentTime := rl.timeNow()
+ elapsedTime := currentTime.Sub(rl.lastTick)
+ rl.lastTick = currentTime
+ // calculate how much credit have we accumulated since the last tick
+ rl.balance += elapsedTime.Seconds() * rl.creditsPerSecond
+ if rl.balance > rl.maxBalance {
+ rl.balance = rl.maxBalance
+ }
+}
+
+// Update changes the main parameters of the rate limiter in-place, while retaining
+// the current accumulated balance (pro-rated to the new maxBalance value). Using this method
+// instead of creating a new rate limiter helps to avoid thundering herd when sampling
+// strategies are updated.
+func (rl *ReconfigurableRateLimiter) Update(creditsPerSecond, maxBalance float64) {
+ rl.lock.Lock()
+ defer rl.lock.Unlock()
+
+ rl.updateBalance() // get up to date balance
+ rl.balance = rl.balance * maxBalance / rl.maxBalance
+ rl.creditsPerSecond = creditsPerSecond
+ rl.maxBalance = maxBalance
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/zipkin.go b/vendor/github.com/uber/jaeger-client-go/zipkin.go
index 636952b7f..98cab4b6e 100644
--- a/vendor/github.com/uber/jaeger-client-go/zipkin.go
+++ b/vendor/github.com/uber/jaeger-client-go/zipkin.go
@@ -55,7 +55,7 @@ func (p *zipkinPropagator) Inject(
carrier.SetTraceID(ctx.TraceID().Low) // TODO this cannot work with 128bit IDs
carrier.SetSpanID(uint64(ctx.SpanID()))
carrier.SetParentID(uint64(ctx.ParentID()))
- carrier.SetFlags(ctx.flags)
+ carrier.SetFlags(ctx.samplingState.flags())
return nil
}
@@ -71,6 +71,7 @@ func (p *zipkinPropagator) Extract(abstractCarrier interface{}) (SpanContext, er
ctx.traceID.Low = carrier.TraceID()
ctx.spanID = SpanID(carrier.SpanID())
ctx.parentID = SpanID(carrier.ParentID())
- ctx.flags = carrier.Flags()
+ ctx.samplingState = &samplingState{}
+ ctx.samplingState.setFlags(carrier.Flags())
return ctx, nil
}