aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/uber/jaeger-client-go/internal
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/uber/jaeger-client-go/internal')
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/options.go101
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/restriction_manager.go157
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/baggage/restriction_manager.go71
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/spanlog/json.go81
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/options.go99
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/throttler.go216
-rw-r--r--vendor/github.com/uber/jaeger-client-go/internal/throttler/throttler.go32
7 files changed, 757 insertions, 0 deletions
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/options.go b/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/options.go
new file mode 100644
index 000000000..745729319
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/options.go
@@ -0,0 +1,101 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+ "time"
+
+ "github.com/uber/jaeger-client-go"
+)
+
+const (
+ defaultMaxValueLength = 2048
+ defaultRefreshInterval = time.Minute
+ defaultHostPort = "localhost:5778"
+)
+
+// Option is a function that sets some option on the RestrictionManager
+type Option func(options *options)
+
+// Options is a factory for all available options
+var Options options
+
+type options struct {
+ denyBaggageOnInitializationFailure bool
+ metrics *jaeger.Metrics
+ logger jaeger.Logger
+ hostPort string
+ refreshInterval time.Duration
+}
+
+// DenyBaggageOnInitializationFailure creates an Option that determines the startup failure mode of RestrictionManager.
+// If DenyBaggageOnInitializationFailure is true, RestrictionManager will not allow any baggage to be written until baggage
+// restrictions have been retrieved from agent.
+// If DenyBaggageOnInitializationFailure is false, RestrictionManager will allow any baggage to be written until baggage
+// restrictions have been retrieved from agent.
+func (options) DenyBaggageOnInitializationFailure(b bool) Option {
+ return func(o *options) {
+ o.denyBaggageOnInitializationFailure = b
+ }
+}
+
+// Metrics creates an Option that initializes Metrics on the RestrictionManager, which is used to emit statistics.
+func (options) Metrics(m *jaeger.Metrics) Option {
+ return func(o *options) {
+ o.metrics = m
+ }
+}
+
+// Logger creates an Option that sets the logger used by the RestrictionManager.
+func (options) Logger(logger jaeger.Logger) Option {
+ return func(o *options) {
+ o.logger = logger
+ }
+}
+
+// HostPort creates an Option that sets the hostPort of the local agent that contains the baggage restrictions.
+func (options) HostPort(hostPort string) Option {
+ return func(o *options) {
+ o.hostPort = hostPort
+ }
+}
+
+// RefreshInterval creates an Option that sets how often the RestrictionManager will poll local agent for
+// the baggage restrictions.
+func (options) RefreshInterval(refreshInterval time.Duration) Option {
+ return func(o *options) {
+ o.refreshInterval = refreshInterval
+ }
+}
+
+func applyOptions(o ...Option) options {
+ opts := options{}
+ for _, option := range o {
+ option(&opts)
+ }
+ if opts.metrics == nil {
+ opts.metrics = jaeger.NewNullMetrics()
+ }
+ if opts.logger == nil {
+ opts.logger = jaeger.NullLogger
+ }
+ if opts.hostPort == "" {
+ opts.hostPort = defaultHostPort
+ }
+ if opts.refreshInterval == 0 {
+ opts.refreshInterval = defaultRefreshInterval
+ }
+ return opts
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/restriction_manager.go b/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/restriction_manager.go
new file mode 100644
index 000000000..a56515aca
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/baggage/remote/restriction_manager.go
@@ -0,0 +1,157 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+ "fmt"
+ "net/url"
+ "sync"
+ "time"
+
+ "github.com/uber/jaeger-client-go/internal/baggage"
+ thrift "github.com/uber/jaeger-client-go/thrift-gen/baggage"
+ "github.com/uber/jaeger-client-go/utils"
+)
+
+type httpBaggageRestrictionManagerProxy struct {
+ url string
+}
+
+func newHTTPBaggageRestrictionManagerProxy(hostPort, serviceName string) *httpBaggageRestrictionManagerProxy {
+ v := url.Values{}
+ v.Set("service", serviceName)
+ return &httpBaggageRestrictionManagerProxy{
+ url: fmt.Sprintf("http://%s/baggageRestrictions?%s", hostPort, v.Encode()),
+ }
+}
+
+func (s *httpBaggageRestrictionManagerProxy) GetBaggageRestrictions(serviceName string) ([]*thrift.BaggageRestriction, error) {
+ var out []*thrift.BaggageRestriction
+ if err := utils.GetJSON(s.url, &out); err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+// RestrictionManager manages baggage restrictions by retrieving baggage restrictions from agent
+type RestrictionManager struct {
+ options
+
+ mux sync.RWMutex
+ serviceName string
+ restrictions map[string]*baggage.Restriction
+ thriftProxy thrift.BaggageRestrictionManager
+ pollStopped sync.WaitGroup
+ stopPoll chan struct{}
+ invalidRestriction *baggage.Restriction
+ validRestriction *baggage.Restriction
+
+ // Determines if the manager has successfully retrieved baggage restrictions from agent
+ initialized bool
+}
+
+// NewRestrictionManager returns a BaggageRestrictionManager that polls the agent for the latest
+// baggage restrictions.
+func NewRestrictionManager(serviceName string, options ...Option) *RestrictionManager {
+ // TODO there is a developing use case where a single tracer can generate traces on behalf of many services.
+ // restrictionsMap will need to exist per service
+ opts := applyOptions(options...)
+ m := &RestrictionManager{
+ serviceName: serviceName,
+ options: opts,
+ restrictions: make(map[string]*baggage.Restriction),
+ thriftProxy: newHTTPBaggageRestrictionManagerProxy(opts.hostPort, serviceName),
+ stopPoll: make(chan struct{}),
+ invalidRestriction: baggage.NewRestriction(false, 0),
+ validRestriction: baggage.NewRestriction(true, defaultMaxValueLength),
+ }
+ m.pollStopped.Add(1)
+ go m.pollManager()
+ return m
+}
+
+// isReady returns true if the manager has retrieved baggage restrictions from the remote source.
+func (m *RestrictionManager) isReady() bool {
+ m.mux.RLock()
+ defer m.mux.RUnlock()
+ return m.initialized
+}
+
+// GetRestriction implements RestrictionManager#GetRestriction.
+func (m *RestrictionManager) GetRestriction(service, key string) *baggage.Restriction {
+ m.mux.RLock()
+ defer m.mux.RUnlock()
+ if !m.initialized {
+ if m.denyBaggageOnInitializationFailure {
+ return m.invalidRestriction
+ }
+ return m.validRestriction
+ }
+ if restriction, ok := m.restrictions[key]; ok {
+ return restriction
+ }
+ return m.invalidRestriction
+}
+
+// Close stops remote polling and closes the RemoteRestrictionManager.
+func (m *RestrictionManager) Close() error {
+ close(m.stopPoll)
+ m.pollStopped.Wait()
+ return nil
+}
+
+func (m *RestrictionManager) pollManager() {
+ defer m.pollStopped.Done()
+ // attempt to initialize baggage restrictions
+ if err := m.updateRestrictions(); err != nil {
+ m.logger.Error(fmt.Sprintf("Failed to initialize baggage restrictions: %s", err.Error()))
+ }
+ ticker := time.NewTicker(m.refreshInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ if err := m.updateRestrictions(); err != nil {
+ m.logger.Error(fmt.Sprintf("Failed to update baggage restrictions: %s", err.Error()))
+ }
+ case <-m.stopPoll:
+ return
+ }
+ }
+}
+
+func (m *RestrictionManager) updateRestrictions() error {
+ restrictions, err := m.thriftProxy.GetBaggageRestrictions(m.serviceName)
+ if err != nil {
+ m.metrics.BaggageRestrictionsUpdateFailure.Inc(1)
+ return err
+ }
+ newRestrictions := m.parseRestrictions(restrictions)
+ m.metrics.BaggageRestrictionsUpdateSuccess.Inc(1)
+ m.mux.Lock()
+ defer m.mux.Unlock()
+ m.initialized = true
+ m.restrictions = newRestrictions
+ return nil
+}
+
+func (m *RestrictionManager) parseRestrictions(restrictions []*thrift.BaggageRestriction) map[string]*baggage.Restriction {
+ setters := make(map[string]*baggage.Restriction, len(restrictions))
+ for _, restriction := range restrictions {
+ setters[restriction.BaggageKey] = baggage.NewRestriction(true, int(restriction.MaxValueLength))
+ }
+ return setters
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/baggage/restriction_manager.go b/vendor/github.com/uber/jaeger-client-go/internal/baggage/restriction_manager.go
new file mode 100644
index 000000000..c16a5c566
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/baggage/restriction_manager.go
@@ -0,0 +1,71 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package baggage
+
+const (
+ defaultMaxValueLength = 2048
+)
+
+// Restriction determines whether a baggage key is allowed and contains any restrictions on the baggage value.
+type Restriction struct {
+ keyAllowed bool
+ maxValueLength int
+}
+
+// NewRestriction returns a new Restriction.
+func NewRestriction(keyAllowed bool, maxValueLength int) *Restriction {
+ return &Restriction{
+ keyAllowed: keyAllowed,
+ maxValueLength: maxValueLength,
+ }
+}
+
+// KeyAllowed returns whether the baggage key for this restriction is allowed.
+func (r *Restriction) KeyAllowed() bool {
+ return r.keyAllowed
+}
+
+// MaxValueLength returns the max length for the baggage value.
+func (r *Restriction) MaxValueLength() int {
+ return r.maxValueLength
+}
+
+// RestrictionManager keeps track of valid baggage keys and their restrictions. The manager
+// will return a Restriction for a specific baggage key which will determine whether the baggage
+// key is allowed for the current service and any other applicable restrictions on the baggage
+// value.
+type RestrictionManager interface {
+ GetRestriction(service, key string) *Restriction
+}
+
+// DefaultRestrictionManager allows any baggage key.
+type DefaultRestrictionManager struct {
+ defaultRestriction *Restriction
+}
+
+// NewDefaultRestrictionManager returns a DefaultRestrictionManager.
+func NewDefaultRestrictionManager(maxValueLength int) *DefaultRestrictionManager {
+ if maxValueLength == 0 {
+ maxValueLength = defaultMaxValueLength
+ }
+ return &DefaultRestrictionManager{
+ defaultRestriction: &Restriction{keyAllowed: true, maxValueLength: maxValueLength},
+ }
+}
+
+// GetRestriction implements RestrictionManager#GetRestriction.
+func (m *DefaultRestrictionManager) GetRestriction(service, key string) *Restriction {
+ return m.defaultRestriction
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/spanlog/json.go b/vendor/github.com/uber/jaeger-client-go/internal/spanlog/json.go
new file mode 100644
index 000000000..0e10b8a5a
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/spanlog/json.go
@@ -0,0 +1,81 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package spanlog
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/opentracing/opentracing-go/log"
+)
+
+type fieldsAsMap map[string]string
+
+// MaterializeWithJSON converts log Fields into JSON string
+// TODO refactor into pluggable materializer
+func MaterializeWithJSON(logFields []log.Field) ([]byte, error) {
+ fields := fieldsAsMap(make(map[string]string, len(logFields)))
+ for _, field := range logFields {
+ field.Marshal(fields)
+ }
+ if event, ok := fields["event"]; ok && len(fields) == 1 {
+ return []byte(event), nil
+ }
+ return json.Marshal(fields)
+}
+
+func (ml fieldsAsMap) EmitString(key, value string) {
+ ml[key] = value
+}
+
+func (ml fieldsAsMap) EmitBool(key string, value bool) {
+ ml[key] = fmt.Sprintf("%t", value)
+}
+
+func (ml fieldsAsMap) EmitInt(key string, value int) {
+ ml[key] = fmt.Sprintf("%d", value)
+}
+
+func (ml fieldsAsMap) EmitInt32(key string, value int32) {
+ ml[key] = fmt.Sprintf("%d", value)
+}
+
+func (ml fieldsAsMap) EmitInt64(key string, value int64) {
+ ml[key] = fmt.Sprintf("%d", value)
+}
+
+func (ml fieldsAsMap) EmitUint32(key string, value uint32) {
+ ml[key] = fmt.Sprintf("%d", value)
+}
+
+func (ml fieldsAsMap) EmitUint64(key string, value uint64) {
+ ml[key] = fmt.Sprintf("%d", value)
+}
+
+func (ml fieldsAsMap) EmitFloat32(key string, value float32) {
+ ml[key] = fmt.Sprintf("%f", value)
+}
+
+func (ml fieldsAsMap) EmitFloat64(key string, value float64) {
+ ml[key] = fmt.Sprintf("%f", value)
+}
+
+func (ml fieldsAsMap) EmitObject(key string, value interface{}) {
+ ml[key] = fmt.Sprintf("%+v", value)
+}
+
+func (ml fieldsAsMap) EmitLazyLogger(value log.LazyLogger) {
+ value(ml)
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/options.go b/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/options.go
new file mode 100644
index 000000000..f52c322fb
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/options.go
@@ -0,0 +1,99 @@
+// Copyright (c) 2018 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+ "time"
+
+ "github.com/uber/jaeger-client-go"
+)
+
+const (
+ defaultHostPort = "localhost:5778"
+ defaultRefreshInterval = time.Second * 5
+)
+
+// Option is a function that sets some option on the Throttler
+type Option func(options *options)
+
+// Options is a factory for all available options
+var Options options
+
+type options struct {
+ metrics *jaeger.Metrics
+ logger jaeger.Logger
+ hostPort string
+ refreshInterval time.Duration
+ synchronousInitialization bool
+}
+
+// Metrics creates an Option that initializes Metrics on the Throttler, which is used to emit statistics.
+func (options) Metrics(m *jaeger.Metrics) Option {
+ return func(o *options) {
+ o.metrics = m
+ }
+}
+
+// Logger creates an Option that sets the logger used by the Throttler.
+func (options) Logger(logger jaeger.Logger) Option {
+ return func(o *options) {
+ o.logger = logger
+ }
+}
+
+// HostPort creates an Option that sets the hostPort of the local agent that keeps track of credits.
+func (options) HostPort(hostPort string) Option {
+ return func(o *options) {
+ o.hostPort = hostPort
+ }
+}
+
+// RefreshInterval creates an Option that sets how often the Throttler will poll local agent for
+// credits.
+func (options) RefreshInterval(refreshInterval time.Duration) Option {
+ return func(o *options) {
+ o.refreshInterval = refreshInterval
+ }
+}
+
+// SynchronousInitialization creates an Option that determines whether the throttler should synchronously
+// fetch credits from the agent when an operation is seen for the first time. This should be set to true
+// if the client will be used by a short lived service that needs to ensure that credits are fetched upfront
+// such that sampling or throttling occurs.
+func (options) SynchronousInitialization(b bool) Option {
+ return func(o *options) {
+ o.synchronousInitialization = b
+ }
+}
+
+func applyOptions(o ...Option) options {
+ opts := options{}
+ for _, option := range o {
+ option(&opts)
+ }
+ if opts.metrics == nil {
+ opts.metrics = jaeger.NewNullMetrics()
+ }
+ if opts.logger == nil {
+ opts.logger = jaeger.NullLogger
+ }
+ if opts.hostPort == "" {
+ opts.hostPort = defaultHostPort
+ }
+ if opts.refreshInterval == 0 {
+ opts.refreshInterval = defaultRefreshInterval
+ }
+ return opts
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/throttler.go b/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/throttler.go
new file mode 100644
index 000000000..20f434fe4
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/throttler/remote/throttler.go
@@ -0,0 +1,216 @@
+// Copyright (c) 2018 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+ "fmt"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+
+ "github.com/uber/jaeger-client-go"
+ "github.com/uber/jaeger-client-go/utils"
+)
+
+const (
+ // minimumCredits is the minimum amount of credits necessary to not be throttled.
+ // i.e. if currentCredits > minimumCredits, then the operation will not be throttled.
+ minimumCredits = 1.0
+)
+
+var (
+ errorUUIDNotSet = errors.New("Throttler UUID must be set")
+)
+
+type operationBalance struct {
+ Operation string `json:"operation"`
+ Balance float64 `json:"balance"`
+}
+
+type creditResponse struct {
+ Balances []operationBalance `json:"balances"`
+}
+
+type httpCreditManagerProxy struct {
+ hostPort string
+}
+
+func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
+ return &httpCreditManagerProxy{
+ hostPort: hostPort,
+ }
+}
+
+// N.B. Operations list must not be empty.
+func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
+ params := url.Values{}
+ params.Set("service", serviceName)
+ params.Set("uuid", uuid)
+ for _, op := range operations {
+ params.Add("operations", op)
+ }
+ var resp creditResponse
+ if err := utils.GetJSON(fmt.Sprintf("http://%s/credits?%s", m.hostPort, params.Encode()), &resp); err != nil {
+ return nil, errors.Wrap(err, "Failed to receive credits from agent")
+ }
+ return &resp, nil
+}
+
+// Throttler retrieves credits from agent and uses it to throttle operations.
+type Throttler struct {
+ options
+
+ mux sync.RWMutex
+ service string
+ uuid atomic.Value
+ creditManager *httpCreditManagerProxy
+ credits map[string]float64 // map of operation->credits
+ close chan struct{}
+ stopped sync.WaitGroup
+}
+
+// NewThrottler returns a Throttler that polls agent for credits and uses them to throttle
+// the service.
+func NewThrottler(service string, options ...Option) *Throttler {
+ opts := applyOptions(options...)
+ creditManager := newHTTPCreditManagerProxy(opts.hostPort)
+ t := &Throttler{
+ options: opts,
+ creditManager: creditManager,
+ service: service,
+ credits: make(map[string]float64),
+ close: make(chan struct{}),
+ }
+ t.stopped.Add(1)
+ go t.pollManager()
+ return t
+}
+
+// IsAllowed implements Throttler#IsAllowed.
+func (t *Throttler) IsAllowed(operation string) bool {
+ t.mux.Lock()
+ defer t.mux.Unlock()
+ value, ok := t.credits[operation]
+ if !ok || value == 0 {
+ if !ok {
+ // NOTE: This appears to be a no-op at first glance, but it stores
+ // the operation key in the map. Necessary for functionality of
+ // Throttler#operations method.
+ t.credits[operation] = 0
+ }
+ if !t.synchronousInitialization {
+ t.metrics.ThrottledDebugSpans.Inc(1)
+ return false
+ }
+ // If it is the first time this operation is being checked, synchronously fetch
+ // the credits.
+ credits, err := t.fetchCredits([]string{operation})
+ if err != nil {
+ // Failed to receive credits from agent, try again next time
+ t.logger.Error("Failed to fetch credits: " + err.Error())
+ return false
+ }
+ if len(credits.Balances) == 0 {
+ // This shouldn't happen but just in case
+ return false
+ }
+ for _, opBalance := range credits.Balances {
+ t.credits[opBalance.Operation] += opBalance.Balance
+ }
+ }
+ return t.isAllowed(operation)
+}
+
+// Close stops the throttler from fetching credits from remote.
+func (t *Throttler) Close() error {
+ close(t.close)
+ t.stopped.Wait()
+ return nil
+}
+
+// SetProcess implements ProcessSetter#SetProcess. It's imperative that the UUID is set before any remote
+// requests are made.
+func (t *Throttler) SetProcess(process jaeger.Process) {
+ if process.UUID != "" {
+ t.uuid.Store(process.UUID)
+ }
+}
+
+// N.B. This function must be called with the Write Lock
+func (t *Throttler) isAllowed(operation string) bool {
+ credits := t.credits[operation]
+ if credits < minimumCredits {
+ t.metrics.ThrottledDebugSpans.Inc(1)
+ return false
+ }
+ t.credits[operation] = credits - minimumCredits
+ return true
+}
+
+func (t *Throttler) pollManager() {
+ defer t.stopped.Done()
+ ticker := time.NewTicker(t.refreshInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ t.refreshCredits()
+ case <-t.close:
+ return
+ }
+ }
+}
+
+func (t *Throttler) operations() []string {
+ t.mux.RLock()
+ defer t.mux.RUnlock()
+ operations := make([]string, 0, len(t.credits))
+ for op := range t.credits {
+ operations = append(operations, op)
+ }
+ return operations
+}
+
+func (t *Throttler) refreshCredits() {
+ operations := t.operations()
+ if len(operations) == 0 {
+ return
+ }
+ newCredits, err := t.fetchCredits(operations)
+ if err != nil {
+ t.metrics.ThrottlerUpdateFailure.Inc(1)
+ t.logger.Error("Failed to fetch credits: " + err.Error())
+ return
+ }
+ t.metrics.ThrottlerUpdateSuccess.Inc(1)
+
+ t.mux.Lock()
+ defer t.mux.Unlock()
+ for _, opBalance := range newCredits.Balances {
+ t.credits[opBalance.Operation] += opBalance.Balance
+ }
+}
+
+func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
+ uuid := t.uuid.Load()
+ uuidStr, _ := uuid.(string)
+ if uuid == nil || uuidStr == "" {
+ return nil, errorUUIDNotSet
+ }
+ return t.creditManager.FetchCredits(uuidStr, t.service, operations)
+}
diff --git a/vendor/github.com/uber/jaeger-client-go/internal/throttler/throttler.go b/vendor/github.com/uber/jaeger-client-go/internal/throttler/throttler.go
new file mode 100644
index 000000000..196ed69ca
--- /dev/null
+++ b/vendor/github.com/uber/jaeger-client-go/internal/throttler/throttler.go
@@ -0,0 +1,32 @@
+// Copyright (c) 2018 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package throttler
+
+// Throttler is used to rate limits operations. For example, given how debug spans
+// are always sampled, a throttler can be enabled per client to rate limit the amount
+// of debug spans a client can start.
+type Throttler interface {
+ // IsAllowed determines whether the operation should be allowed and not be
+ // throttled.
+ IsAllowed(operation string) bool
+}
+
+// DefaultThrottler doesn't throttle at all.
+type DefaultThrottler struct{}
+
+// IsAllowed implements Throttler#IsAllowed.
+func (t DefaultThrottler) IsAllowed(operation string) bool {
+ return true
+}