From 4113d2c2986922cb3a89c9a5ca04366110d505a3 Mon Sep 17 00:00:00 2001 From: "dependabot-preview[bot]" <27856297+dependabot-preview[bot]@users.noreply.github.com> Date: Tue, 14 Jul 2020 17:02:43 +0000 Subject: Bump github.com/uber/jaeger-client-go Bumps [github.com/uber/jaeger-client-go](https://github.com/uber/jaeger-client-go) from 2.24.0+incompatible to 2.25.0+incompatible. - [Release notes](https://github.com/uber/jaeger-client-go/releases) - [Changelog](https://github.com/jaegertracing/jaeger-client-go/blob/master/CHANGELOG.md) - [Commits](https://github.com/uber/jaeger-client-go/compare/v2.24.0...v2.25.0) Signed-off-by: dependabot-preview[bot] Signed-off-by: Daniel J Walsh --- .../github.com/uber/jaeger-client-go/CHANGELOG.md | 13 ++ vendor/github.com/uber/jaeger-client-go/Gopkg.lock | 75 +++++++- vendor/github.com/uber/jaeger-client-go/README.md | 2 + .../uber/jaeger-client-go/config/config.go | 25 ++- .../uber/jaeger-client-go/config/config_env.go | 57 ++++--- .../github.com/uber/jaeger-client-go/constants.go | 2 +- .../uber/jaeger-client-go/span_context.go | 8 +- vendor/github.com/uber/jaeger-client-go/tracer.go | 17 +- .../uber/jaeger-client-go/transport_udp.go | 41 ++++- .../utils/reconnecting_udp_conn.go | 189 +++++++++++++++++++++ .../uber/jaeger-client-go/utils/udp_client.go | 87 ++++++++-- 11 files changed, 456 insertions(+), 60 deletions(-) create mode 100644 vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go (limited to 'vendor/github.com/uber/jaeger-client-go') diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md index 6a7e3c5ca..cab87e9d6 100644 --- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md +++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md @@ -1,6 +1,19 @@ Changes by Version ================== +2.25.0 (2020-07-13) +------------------- +## Breaking changes +- [feat] Periodically re-resolve UDP server address, with opt-out (#520) -- Trevor Foster + + The re-resolving of UDP address is now enabled by default, to make the client more robust in Kubernetes deployments. + The old resolve-once behavior can be restored by setting DisableAttemptReconnecting=true in the Configuration struct, + or via JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED=true environment variable. + +## Bug fixes +- Do not add invalid context to references (#521) -- Yuri Shkuro + + 2.24.0 (2020-06-14) ------------------- - Mention FromEnv() in the README, docs, and examples (#518) -- Martin Lercher diff --git a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock index 2a5215a50..387958b12 100644 --- a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock +++ b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock @@ -142,10 +142,19 @@ version = "v0.0.5" [[projects]] - digest = "1:0496f0e99014b7fd0a560c539f51d0882731137b85494142f47e550e4657176a" + digest = "1:ac83cf90d08b63ad5f7e020ef480d319ae890c208f8524622a2f3136e2686b02" + name = "github.com/stretchr/objx" + packages = ["."] + pruneopts = "UT" + revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" + version = "v0.1.1" + +[[projects]] + digest = "1:d88ba57c4e8f5db6ce9ab6605a89f4542ee751b576884ba5271c2ba3d4b6f2d2" name = "github.com/stretchr/testify" packages = [ "assert", + "mock", "require", "suite", ] @@ -153,6 +162,42 @@ revision = "221dbe5ed46703ee255b1da0dec05086f5035f62" version = "v1.4.0" +[[projects]] + digest = "1:5b98956718573850caf7e0fd00b571a6657c4ef1f345ddf0c96b43ce355fe862" + name = "github.com/uber/jaeger-client-go" + packages = [ + ".", + "config", + "crossdock/client", + "crossdock/common", + "crossdock/endtoend", + "crossdock/log", + "crossdock/server", + "crossdock/thrift/tracetest", + "internal/baggage", + "internal/baggage/remote", + "internal/reporterstats", + "internal/spanlog", + "internal/throttler", + "internal/throttler/remote", + "log", + "log/zap/mock_opentracing", + "rpcmetrics", + "testutils", + "thrift", + "thrift-gen/agent", + "thrift-gen/baggage", + "thrift-gen/jaeger", + "thrift-gen/sampling", + "thrift-gen/zipkincore", + "transport", + "transport/zipkin", + "utils", + ] + pruneopts = "UT" + revision = "66c008c3d6ad856cac92a0af53186efbffa8e6a5" + version = "v2.24.0" + [[projects]] digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3" name = "github.com/uber/jaeger-lib" @@ -314,8 +359,36 @@ "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/stretchr/testify/require", "github.com/stretchr/testify/suite", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", + "github.com/uber/jaeger-client-go/crossdock/client", + "github.com/uber/jaeger-client-go/crossdock/common", + "github.com/uber/jaeger-client-go/crossdock/endtoend", + "github.com/uber/jaeger-client-go/crossdock/log", + "github.com/uber/jaeger-client-go/crossdock/server", + "github.com/uber/jaeger-client-go/crossdock/thrift/tracetest", + "github.com/uber/jaeger-client-go/internal/baggage", + "github.com/uber/jaeger-client-go/internal/baggage/remote", + "github.com/uber/jaeger-client-go/internal/reporterstats", + "github.com/uber/jaeger-client-go/internal/spanlog", + "github.com/uber/jaeger-client-go/internal/throttler", + "github.com/uber/jaeger-client-go/internal/throttler/remote", + "github.com/uber/jaeger-client-go/log", + "github.com/uber/jaeger-client-go/log/zap/mock_opentracing", + "github.com/uber/jaeger-client-go/rpcmetrics", + "github.com/uber/jaeger-client-go/testutils", + "github.com/uber/jaeger-client-go/thrift", + "github.com/uber/jaeger-client-go/thrift-gen/agent", + "github.com/uber/jaeger-client-go/thrift-gen/baggage", + "github.com/uber/jaeger-client-go/thrift-gen/jaeger", + "github.com/uber/jaeger-client-go/thrift-gen/sampling", + "github.com/uber/jaeger-client-go/thrift-gen/zipkincore", + "github.com/uber/jaeger-client-go/transport", + "github.com/uber/jaeger-client-go/transport/zipkin", + "github.com/uber/jaeger-client-go/utils", "github.com/uber/jaeger-lib/metrics", "github.com/uber/jaeger-lib/metrics/metricstest", "github.com/uber/jaeger-lib/metrics/prometheus", diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md index e7b13b1c2..687f5780c 100644 --- a/vendor/github.com/uber/jaeger-client-go/README.md +++ b/vendor/github.com/uber/jaeger-client-go/README.md @@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`). JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`). JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`). +JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`). +JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`). JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/. JAEGER_SAMPLER_PARAM | The sampler parameter (number). JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler. diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go index e6ffb987d..bb1228294 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/utils" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/internal/baggage/remote" @@ -124,6 +125,17 @@ type ReporterConfig struct { // Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT LocalAgentHostPort string `yaml:"localAgentHostPort"` + // DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves + // the agent's hostname and reconnects if there was a change. This option only + // applies if LocalAgentHostPort is specified. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED + DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"` + + // AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname + // in order to detect address changes. This option only applies if DisableAttemptReconnecting is false. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL + AttemptReconnectInterval time.Duration + // CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL. // Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT CollectorEndpoint string `yaml:"collectorEndpoint"` @@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter( metrics *jaeger.Metrics, logger jaeger.Logger, ) (jaeger.Reporter, error) { - sender, err := rc.newTransport() + sender, err := rc.newTransport(logger) if err != nil { return nil, err } @@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter( return reporter, err } -func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { +func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) { switch { case rc.CollectorEndpoint != "": httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)} @@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { } return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil default: - return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) + return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: rc.LocalAgentHostPort, + Logger: logger, + DisableAttemptReconnecting: rc.DisableAttemptReconnecting, + AttemptReconnectInterval: rc.AttemptReconnectInterval, + }, + }) } } diff --git a/vendor/github.com/uber/jaeger-client-go/config/config_env.go b/vendor/github.com/uber/jaeger-client-go/config/config_env.go index f38eb9d93..92d60cd59 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config_env.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config_env.go @@ -24,30 +24,31 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/uber/jaeger-client-go" ) const ( // environment variable names - envServiceName = "JAEGER_SERVICE_NAME" - envDisabled = "JAEGER_DISABLED" - envRPCMetrics = "JAEGER_RPC_METRICS" - envTags = "JAEGER_TAGS" - envSamplerType = "JAEGER_SAMPLER_TYPE" - envSamplerParam = "JAEGER_SAMPLER_PARAM" - envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint - envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" - envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" - envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" - envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" - envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" - envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" - envEndpoint = "JAEGER_ENDPOINT" - envUser = "JAEGER_USER" - envPassword = "JAEGER_PASSWORD" - envAgentHost = "JAEGER_AGENT_HOST" - envAgentPort = "JAEGER_AGENT_PORT" + envServiceName = "JAEGER_SERVICE_NAME" + envDisabled = "JAEGER_DISABLED" + envRPCMetrics = "JAEGER_RPC_METRICS" + envTags = "JAEGER_TAGS" + envSamplerType = "JAEGER_SAMPLER_TYPE" + envSamplerParam = "JAEGER_SAMPLER_PARAM" + envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint + envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" + envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" + envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" + envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" + envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" + envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" + envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED" + envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL" + envEndpoint = "JAEGER_ENDPOINT" + envUser = "JAEGER_USER" + envPassword = "JAEGER_PASSWORD" + envAgentHost = "JAEGER_AGENT_HOST" + envAgentPort = "JAEGER_AGENT_PORT" ) // FromEnv uses environment variables to set the tracer's Configuration @@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) { if useEnv || rc.LocalAgentHostPort == "" { rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port) } + + if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" { + if value, err := strconv.ParseBool(e); err == nil { + rc.DisableAttemptReconnecting = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e) + } + } + + if !rc.DisableAttemptReconnecting { + if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" { + if value, err := time.ParseDuration(e); err == nil { + rc.AttemptReconnectInterval = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e) + } + } + } } return rc, nil diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go index feaf344ad..2f63d5909 100644 --- a/vendor/github.com/uber/jaeger-client-go/constants.go +++ b/vendor/github.com/uber/jaeger-client-go/constants.go @@ -22,7 +22,7 @@ import ( const ( // JaegerClientVersion is the version of the client library reported as Span tag. - JaegerClientVersion = "Go-2.24.0" + JaegerClientVersion = "Go-2.25.0" // JaegerClientVersionTagKey is the name of the tag used to report client version. JaegerClientVersionTagKey = "jaeger.version" diff --git a/vendor/github.com/uber/jaeger-client-go/span_context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go index 1b44f3f8c..ae9d94a9a 100644 --- a/vendor/github.com/uber/jaeger-client-go/span_context.go +++ b/vendor/github.com/uber/jaeger-client-go/span_context.go @@ -212,10 +212,14 @@ func (c SpanContext) SetFirehose() { } func (c SpanContext) String() string { + var flags int32 + if c.samplingState != nil { + flags = c.samplingState.stateFlags.Load() + } if c.traceID.High == 0 { - return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } - return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } // ContextFromString reconstructs the Context encoded in a string diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go index 8a3fc97ab..477c6eae3 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer.go @@ -216,10 +216,10 @@ func (t *Tracer) startSpanWithOptions( options.StartTime = t.timeNow() } - // Predicate whether the given span context is a valid reference - // which may be used as parent / debug ID / baggage items source - isValidReference := func(ctx SpanContext) bool { - return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0 + // Predicate whether the given span context is an empty reference + // or may be used as parent / debug ID / baggage items source + isEmptyReference := func(ctx SpanContext) bool { + return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0 } var references []Reference @@ -235,7 +235,7 @@ func (t *Tracer) startSpanWithOptions( reflect.ValueOf(ref.ReferencedContext))) continue } - if !isValidReference(ctxRef) { + if isEmptyReference(ctxRef) { continue } @@ -245,14 +245,17 @@ func (t *Tracer) startSpanWithOptions( continue } - references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + if ctxRef.IsValid() { + // we don't want empty context that contains only debug-id or baggage + references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + } if !hasParent { parent = ctxRef hasParent = ref.Type == opentracing.ChildOfRef } } - if !hasParent && isValidReference(parent) { + if !hasParent && !isEmptyReference(parent) { // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from // the FollowFromRef as the parent hasParent = true diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go index 7370d8007..5734819ab 100644 --- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go +++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/uber/jaeger-client-go/internal/reporterstats" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" @@ -57,35 +58,57 @@ type udpSender struct { failedToEmitSpans int64 } -// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should +// be passed to NewUDPTransportWithParams. +type UDPTransportParams struct { + utils.AgentClientUDPParams +} + +// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent. // TODO: (breaking change) move to transport/ package. -func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { - if len(hostPort) == 0 { - hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) +func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) { + if len(params.HostPort) == 0 { + params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) } - if maxPacketSize == 0 { - maxPacketSize = utils.UDPPacketMaxLength + + if params.Logger == nil { + params.Logger = log.StdLogger + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = utils.UDPPacketMaxLength } protocolFactory := thrift.NewTCompactProtocolFactory() // Each span is first written to thriftBuffer to determine its size in bytes. - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) - client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize) + client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams) if err != nil { return nil, err } return &udpSender{ client: client, - maxSpanBytes: maxPacketSize - emitBatchOverhead, + maxSpanBytes: params.MaxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, thriftProtocol: thriftProtocol, }, nil } +// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// TODO: (breaking change) move to transport/ package. +func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { + return NewUDPTransportWithParams(UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }, + }) +} + // SetReporterStats implements reporterstats.Receiver. func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) { s.reporterStats = rs diff --git a/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go new file mode 100644 index 000000000..0dffc7fa2 --- /dev/null +++ b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go @@ -0,0 +1,189 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/uber/jaeger-client-go/log" +) + +// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +type reconnectingUDPConn struct { + hostPort string + resolveFunc resolveFunc + dialFunc dialFunc + logger log.Logger + bufferBytes int64 + + connMtx sync.RWMutex + conn *net.UDPConn + destAddr *net.UDPAddr + closeChan chan struct{} +} + +type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) +type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) + +// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) { + conn := &reconnectingUDPConn{ + hostPort: hostPort, + resolveFunc: resolveFunc, + dialFunc: dialFunc, + logger: logger, + closeChan: make(chan struct{}), + } + + if err := conn.attemptResolveAndDial(); err != nil { + logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)) + } + + go conn.reconnectLoop(resolveTimeout) + + return conn, nil +} + +func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { + ticker := time.NewTicker(resolveTimeout) + defer ticker.Stop() + + for { + select { + case <-c.closeChan: + return + case <-ticker.C: + if err := c.attemptResolveAndDial(); err != nil { + c.logger.Error(err.Error()) + } + } + } +} + +func (c *reconnectingUDPConn) attemptResolveAndDial() error { + newAddr, err := c.resolveFunc("udp", c.hostPort) + if err != nil { + return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) + } + + c.connMtx.RLock() + curAddr := c.destAddr + c.connMtx.RUnlock() + + // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn + if curAddr != nil && newAddr.String() == curAddr.String() { + return nil + } + + if err := c.attemptDialNewAddr(newAddr); err != nil { + return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) + } + + return nil +} + +func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { + connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) + if err != nil { + return err + } + + if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { + if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { + return err + } + } + + c.connMtx.Lock() + c.destAddr = newAddr + // store prev to close later + prevConn := c.conn + c.conn = connUDP + c.connMtx.Unlock() + + if prevConn != nil { + return prevConn.Close() + } + + return nil +} + +// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning +func (c *reconnectingUDPConn) Write(b []byte) (int, error) { + var bytesWritten int + var err error + + c.connMtx.RLock() + if c.conn == nil { + // if connection is not initialized indicate this with err in order to hook into retry logic + err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") + } else { + bytesWritten, err = c.conn.Write(b) + } + c.connMtx.RUnlock() + + if err == nil { + return bytesWritten, nil + } + + // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again + if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { + c.connMtx.RLock() + defer c.connMtx.RUnlock() + return c.conn.Write(b) + } + + // return original error if reconn fails + return bytesWritten, err +} + +// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation +func (c *reconnectingUDPConn) Close() error { + close(c.closeChan) + + // acquire rw lock before closing conn to ensure calls to Write drain + c.connMtx.Lock() + defer c.connMtx.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + + return nil +} + +// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held +// and SetWriteBuffer is called store bufferBytes to be set for new conns +func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { + var err error + + c.connMtx.RLock() + if c.conn != nil { + err = c.conn.SetWriteBuffer(bytes) + } + c.connMtx.RUnlock() + + if err == nil { + atomic.StoreInt64(&c.bufferBytes, int64(bytes)) + } + + return err +} diff --git a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go index fadd73e49..2352643ce 100644 --- a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go +++ b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go @@ -19,7 +19,9 @@ import ( "fmt" "io" "net" + "time" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" "github.com/uber/jaeger-client-go/thrift-gen/agent" @@ -35,41 +37,90 @@ type AgentClientUDP struct { agent.Agent io.Closer - connUDP *net.UDPConn + connUDP udpConn client *agent.AgentClient maxPacketSize int // max size of datagram in bytes thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span } -// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. -func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { - if maxPacketSize == 0 { - maxPacketSize = UDPPacketMaxLength +type udpConn interface { + Write([]byte) (int, error) + SetWriteBuffer(int) error + Close() error +} + +// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should +// be passed to NewAgentClientUDPWithParams. +type AgentClientUDPParams struct { + HostPort string + MaxPacketSize int + Logger log.Logger + DisableAttemptReconnecting bool + AttemptReconnectInterval time.Duration +} + +// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) { + // validate hostport + if _, _, err := net.SplitHostPort(params.HostPort); err != nil { + return nil, err + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = UDPPacketMaxLength + } + + if params.Logger == nil { + params.Logger = log.StdLogger } - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 { + params.AttemptReconnectInterval = time.Second * 30 + } + + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactory() client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) - destAddr, err := net.ResolveUDPAddr("udp", hostPort) - if err != nil { - return nil, err - } + var connUDP udpConn + var err error - connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) - if err != nil { - return nil, err + if params.DisableAttemptReconnecting { + destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) + if err != nil { + return nil, err + } + + connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) + if err != nil { + return nil, err + } + } else { + // host is hostname, setup resolver loop in case host record changes during operation + connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) + if err != nil { + return nil, err + } } - if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { + + if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { return nil, err } - clientUDP := &AgentClientUDP{ + return &AgentClientUDP{ connUDP: connUDP, client: client, - maxPacketSize: maxPacketSize, - thriftBuffer: thriftBuffer} - return clientUDP, nil + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + }, nil +} + +// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { + return NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }) } // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface -- cgit v1.2.3-54-g00ecf