diff options
Diffstat (limited to 'vendor/github.com')
25 files changed, 703 insertions, 104 deletions
diff --git a/vendor/github.com/containers/common/pkg/config/config.go b/vendor/github.com/containers/common/pkg/config/config.go index c652a66f2..80c478505 100644 --- a/vendor/github.com/containers/common/pkg/config/config.go +++ b/vendor/github.com/containers/common/pkg/config/config.go @@ -165,9 +165,12 @@ type ContainersConfig struct { // ShmSize holds the size of /dev/shm. ShmSize string `toml:"shm_size,omitempty"` - //TZ sets the timezone inside the container + // TZ sets the timezone inside the container TZ string `toml:"tz,omitempty"` + // Umask is the umask inside the container. + Umask string `toml:"umask,omitempty"` + // UTSNS indicates how to create a UTS namespace for the container UTSNS string `toml:"utsns,omitempty"` @@ -198,7 +201,7 @@ type EngineConfig struct { // The first path pointing to a valid file will be used. ConmonPath []string `toml:"conmon_path,omitempty"` - //DetachKeys is the sequence of keys used to detach a container. + // DetachKeys is the sequence of keys used to detach a container. DetachKeys string `toml:"detach_keys,omitempty"` // EnablePortReservation determines whether engine will reserve ports on the @@ -272,12 +275,20 @@ type EngineConfig struct { // Indicates whether the application should be running in Remote mode Remote bool `toml:"-"` + // RemoteURI is deprecated, see ActiveService // RemoteURI containers connection information used to connect to remote system. RemoteURI string `toml:"remote_uri,omitempty"` - // Identity key file for RemoteURI + // RemoteIdentity is deprecated, ServiceDestinations + // RemoteIdentity key file for RemoteURI RemoteIdentity string `toml:"remote_identity,omitempty"` + // ActiveService index to Destinations added v2.0.3 + ActiveService string `toml:"active_service,omitempty"` + + // Destinations mapped by service Names + ServiceDestinations map[string]Destination `toml:"service_destinations,omitempty"` + // RuntimePath is the path to OCI runtime binary for launching containers. // The first path pointing to a valid file will be used This is used only // when there are no OCIRuntime/OCIRuntimes defined. It is used only to be @@ -393,6 +404,15 @@ type NetworkConfig struct { NetworkConfigDir string `toml:"network_config_dir,omitempty"` } +// Destination represents destination for remote service +type Destination struct { + // URI, required. Example: ssh://root@example.com:22/run/podman/podman.sock + URI string `toml:"uri"` + + // Identity file with ssh key, optional + Identity string `toml:"identity,omitempty"` +} + // NewConfig creates a new Config. It starts with an empty config and, if // specified, merges the config at `userConfigPath` path. Depending if we're // running as root or rootless, we then merge the system configuration followed @@ -582,6 +602,10 @@ func (c *ContainersConfig) Validate() error { return err } + if err := c.validateUmask(); err != nil { + return err + } + if c.LogSizeMax >= 0 && c.LogSizeMax < OCIBufSize { return fmt.Errorf("log size max should be negative or >= %d", OCIBufSize) } @@ -598,9 +622,17 @@ func (c *ContainersConfig) Validate() error { // execution checks. It returns an `error` on validation failure, otherwise // `nil`. func (c *NetworkConfig) Validate() error { - if c.NetworkConfigDir != _cniConfigDir { - err := isDirectory(c.NetworkConfigDir) + expectedConfigDir := _cniConfigDir + if unshare.IsRootless() { + home, err := unshare.HomeDir() if err != nil { + return err + } + expectedConfigDir = filepath.Join(home, _cniConfigDirRootless) + } + if c.NetworkConfigDir != expectedConfigDir { + err := isDirectory(c.NetworkConfigDir) + if err != nil && !os.IsNotExist(err) { return errors.Wrapf(err, "invalid network_config_dir: %s", c.NetworkConfigDir) } } @@ -828,9 +860,9 @@ func stringsEq(a, b []string) bool { } var ( - configOnce sync.Once - configErr error - config *Config + configErr error + configMutex sync.Mutex + config *Config ) // Default returns the default container config. @@ -845,9 +877,16 @@ var ( // The system defaults container config files can be overwritten using the // CONTAINERS_CONF environment variable. This is usually done for testing. func Default() (*Config, error) { - configOnce.Do(func() { - config, configErr = NewConfig("") - }) + configMutex.Lock() + defer configMutex.Unlock() + if config != nil || configErr != nil { + return config, configErr + } + return defConfig() +} + +func defConfig() (*Config, error) { + config, configErr = NewConfig("") return config, configErr } @@ -879,8 +918,8 @@ func customConfigFile() (string, error) { return OverrideContainersConfig, nil } -//ReadCustomConfig reads the custom config and only generates a config based on it -//If the custom config file does not exists, function will return an empty config +// ReadCustomConfig reads the custom config and only generates a config based on it +// If the custom config file does not exists, function will return an empty config func ReadCustomConfig() (*Config, error) { path, err := customConfigFile() if err != nil { @@ -936,3 +975,34 @@ func (c *Config) Write() error { } return nil } + +// Reload clean the cached config and reloads the configuration from containers.conf files +// This function is meant to be used for long-running processes that need to reload potential changes made to +// the cached containers.conf files. +func Reload() (*Config, error) { + configMutex.Lock() + defer configMutex.Unlock() + return defConfig() +} + +func (c *Config) ActiveDestination() (string, string, error) { + if uri, found := os.LookupEnv("CONTAINER_HOST"); found { + var ident string + if v, found := os.LookupEnv("CONTAINER_SSHKEY"); found { + ident = v + } + return uri, ident, nil + } + + switch { + case c.Engine.ActiveService != "": + d, found := c.Engine.ServiceDestinations[c.Engine.ActiveService] + if !found { + return "", "", errors.Errorf("%q service destination not found", c.Engine.ActiveService) + } + return d.URI, d.Identity, nil + case c.Engine.RemoteURI != "": + return c.Engine.RemoteURI, c.Engine.RemoteIdentity, nil + } + return "", "", errors.New("no service destination configured") +} diff --git a/vendor/github.com/containers/common/pkg/config/config_local.go b/vendor/github.com/containers/common/pkg/config/config_local.go index a6ab33c50..282eb80b7 100644 --- a/vendor/github.com/containers/common/pkg/config/config_local.go +++ b/vendor/github.com/containers/common/pkg/config/config_local.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "regexp" "syscall" units "github.com/docker/go-units" @@ -88,6 +89,14 @@ func (c *ContainersConfig) validateTZ() error { return nil } +func (c *ContainersConfig) validateUmask() error { + validUmask := regexp.MustCompile(`^[0-7]{1,4}$`) + if !validUmask.MatchString(c.Umask) { + return fmt.Errorf("Not a valid Umask %s", c.Umask) + } + return nil +} + func isRemote() bool { return false } diff --git a/vendor/github.com/containers/common/pkg/config/config_remote.go b/vendor/github.com/containers/common/pkg/config/config_remote.go index 61dd159ad..7fd9202bb 100644 --- a/vendor/github.com/containers/common/pkg/config/config_remote.go +++ b/vendor/github.com/containers/common/pkg/config/config_remote.go @@ -27,3 +27,7 @@ func (c *ContainersConfig) validateUlimits() error { func (c *ContainersConfig) validateTZ() error { return nil } + +func (c *ContainersConfig) validateUmask() error { + return nil +} diff --git a/vendor/github.com/containers/common/pkg/config/containers.conf b/vendor/github.com/containers/common/pkg/config/containers.conf index 80afbb9bc..780df2a22 100644 --- a/vendor/github.com/containers/common/pkg/config/containers.conf +++ b/vendor/github.com/containers/common/pkg/config/containers.conf @@ -210,6 +210,10 @@ # # tz = "" +# Set umask inside the container +# +# umask="0022" + # Default way to to create a UTS namespace for the container # Options are: # `private` Create private UTS Namespace for the container. @@ -340,14 +344,6 @@ # Whether to pull new image before running a container # pull_policy = "missing" -# Default Remote URI to access the Podman service. -# Examples: -# rootless "unix://run/user/$UID/podman/podman.sock" (Default) -# rootfull "unix://run/podman/podman.sock.(Default) -# remote rootless ssh://engineering.lab.company.com/run/user/1000/podman/podman.sock -# remote rootfull ssh://root@10.10.1.136:22/run/podman/podman.sock -# remote_uri= "" - # Directory for persistent engine files (database, etc) # By default, this will be configured relative to where the containers/storage # stores containers @@ -386,6 +382,22 @@ # Number of seconds to wait for container to exit before sending kill signal. # stop_timeout = 10 +# Index to the active service +# active_service = production + +# map of service destinations +# [service_destinations] +# [service_destinations.production] +# URI to access the Podman service +# Examples: +# rootless "unix://run/user/$UID/podman/podman.sock" (Default) +# rootfull "unix://run/podman/podman.sock (Default) +# remote rootless ssh://engineering.lab.company.com/run/user/1000/podman/podman.sock +# remote rootfull ssh://root@10.10.1.136:22/run/podman/podman.sock +# uri="ssh://user@production.example.com/run/user/1001/podman/podman.sock" +# Path to file containing ssh identity key +# identity = "~/.ssh/id_rsa" + # Paths to look for a valid OCI runtime (runc, runv, kata, etc) [engine.runtimes] # runc = [ diff --git a/vendor/github.com/containers/common/pkg/config/default.go b/vendor/github.com/containers/common/pkg/config/default.go index e7a1836bd..57b703f53 100644 --- a/vendor/github.com/containers/common/pkg/config/default.go +++ b/vendor/github.com/containers/common/pkg/config/default.go @@ -92,8 +92,10 @@ const ( // InstallPrefix is the prefix where podman will be installed. // It can be overridden at build time. _installPrefix = "/usr" - // _cniConfigDir is the directory where cni plugins are found + // _cniConfigDir is the directory where cni configuration is found _cniConfigDir = "/etc/cni/net.d/" + // _cniConfigDirRootless is the directory where cni plugins are found + _cniConfigDirRootless = ".config/cni/net.d/" // CgroupfsCgroupsManager represents cgroupfs native cgroup manager CgroupfsCgroupsManager = "cgroupfs" // DefaultApparmorProfile specifies the default apparmor profile for the container. @@ -138,6 +140,8 @@ func DefaultConfig() (*Config, error) { netns := "bridge" + cniConfig := _cniConfigDir + defaultEngineConfig.SignaturePolicyPath = DefaultSignaturePolicyPath if unshare.IsRootless() { home, err := unshare.HomeDir() @@ -152,6 +156,7 @@ func DefaultConfig() (*Config, error) { } } netns = "slirp4netns" + cniConfig = filepath.Join(home, _cniConfigDirRootless) } cgroupNS := "host" @@ -191,13 +196,14 @@ func DefaultConfig() (*Config, error) { SeccompProfile: SeccompDefaultPath, ShmSize: DefaultShmSize, TZ: "", + Umask: "0022", UTSNS: "private", UserNS: "host", UserNSSize: DefaultUserNSSize, }, Network: NetworkConfig{ DefaultNetwork: "podman", - NetworkConfigDir: _cniConfigDir, + NetworkConfigDir: cniConfig, CNIPluginDirs: cniBinDir, }, Engine: *defaultEngineConfig, @@ -504,3 +510,7 @@ func (c *Config) DetachKeys() string { func (c *Config) TZ() string { return c.Containers.TZ } + +func (c *Config) Umask() string { + return c.Containers.Umask +} diff --git a/vendor/github.com/containers/common/pkg/config/systemd.go b/vendor/github.com/containers/common/pkg/config/systemd.go index e02f52192..02e5c4ac2 100644 --- a/vendor/github.com/containers/common/pkg/config/systemd.go +++ b/vendor/github.com/containers/common/pkg/config/systemd.go @@ -2,7 +2,17 @@ package config +import ( + "github.com/containers/common/pkg/cgroupv2" + "github.com/containers/storage/pkg/unshare" +) + func defaultCgroupManager() string { + enabled, err := cgroupv2.Enabled() + if err == nil && !enabled && unshare.IsRootless() { + return CgroupfsCgroupsManager + } + return SystemdCgroupsManager } func defaultEventsLogger() string { diff --git a/vendor/github.com/containers/common/version/version.go b/vendor/github.com/containers/common/version/version.go index 741e9bd03..6b226eabe 100644 --- a/vendor/github.com/containers/common/version/version.go +++ b/vendor/github.com/containers/common/version/version.go @@ -1,4 +1,4 @@ package version // Version is the version of the build. -const Version = "0.15.2" +const Version = "0.18.0" diff --git a/vendor/github.com/containers/storage/VERSION b/vendor/github.com/containers/storage/VERSION index 284497740..0369d0b1e 100644 --- a/vendor/github.com/containers/storage/VERSION +++ b/vendor/github.com/containers/storage/VERSION @@ -1 +1 @@ -1.21.1 +1.21.2 diff --git a/vendor/github.com/containers/storage/drivers/devmapper/deviceset.go b/vendor/github.com/containers/storage/drivers/devmapper/deviceset.go index b7a95bfc2..cba3d05ea 100644 --- a/vendor/github.com/containers/storage/drivers/devmapper/deviceset.go +++ b/vendor/github.com/containers/storage/drivers/devmapper/deviceset.go @@ -1749,7 +1749,7 @@ func (devices *DeviceSet) initDevmapper(doInit bool) (retErr error) { // - Managed by container storage // - The target of this device is at major <maj> and minor <min> // - If <inode> is defined, use that file inside the device as a loopback image. Otherwise use the device itself. - devices.devicePrefix = fmt.Sprintf("container-%d:%d-%d", major(st.Dev), minor(st.Dev), st.Ino) + devices.devicePrefix = fmt.Sprintf("container-%d:%d-%d", major(uint64(st.Dev)), minor(uint64(st.Dev)), st.Ino) logrus.Debugf("devmapper: Generated prefix: %s", devices.devicePrefix) // Check for the existence of the thin-pool device diff --git a/vendor/github.com/containers/storage/pkg/archive/archive.go b/vendor/github.com/containers/storage/pkg/archive/archive.go index 863465456..78744e0f3 100644 --- a/vendor/github.com/containers/storage/pkg/archive/archive.go +++ b/vendor/github.com/containers/storage/pkg/archive/archive.go @@ -393,13 +393,15 @@ func fillGo18FileTypeBits(mode int64, fi os.FileInfo) int64 { // ReadSecurityXattrToTarHeader reads security.capability, security,image // xattrs from filesystem to a tar header func ReadSecurityXattrToTarHeader(path string, hdr *tar.Header) error { + if hdr.Xattrs == nil { + hdr.Xattrs = make(map[string]string) + } for _, xattr := range []string{"security.capability", "security.ima"} { capability, err := system.Lgetxattr(path, xattr) if err != nil && err != system.EOPNOTSUPP && err != system.ErrNotSupportedPlatform { return errors.Wrapf(err, "failed to read %q attribute from %q", xattr, path) } if capability != nil { - hdr.Xattrs = make(map[string]string) hdr.Xattrs[xattr] = string(capability) } } diff --git a/vendor/github.com/containers/storage/pkg/unshare/unshare.go b/vendor/github.com/containers/storage/pkg/unshare/unshare.go index 1eff82e8e..a08fb674d 100644 --- a/vendor/github.com/containers/storage/pkg/unshare/unshare.go +++ b/vendor/github.com/containers/storage/pkg/unshare/unshare.go @@ -4,19 +4,30 @@ import ( "fmt" "os" "os/user" + "sync" "github.com/pkg/errors" ) +var ( + homeDirOnce sync.Once + homeDirErr error + homeDir string +) + // HomeDir returns the home directory for the current user. func HomeDir() (string, error) { - home := os.Getenv("HOME") - if home == "" { - usr, err := user.LookupId(fmt.Sprintf("%d", GetRootlessUID())) - if err != nil { - return "", errors.Wrapf(err, "unable to resolve HOME directory") + homeDirOnce.Do(func() { + home := os.Getenv("HOME") + if home == "" { + usr, err := user.LookupId(fmt.Sprintf("%d", GetRootlessUID())) + if err != nil { + homeDir, homeDirErr = "", errors.Wrapf(err, "unable to resolve HOME directory") + return + } + homeDir, homeDirErr = usr.HomeDir, nil } - home = usr.HomeDir - } - return home, nil + homeDir, homeDirErr = home, nil + }) + return homeDir, homeDirErr } diff --git a/vendor/github.com/opencontainers/runtime-tools/generate/generate.go b/vendor/github.com/opencontainers/runtime-tools/generate/generate.go index 6d3268902..c757c20e0 100644 --- a/vendor/github.com/opencontainers/runtime-tools/generate/generate.go +++ b/vendor/github.com/opencontainers/runtime-tools/generate/generate.go @@ -29,6 +29,9 @@ var ( type Generator struct { Config *rspec.Spec HostSpecific bool + // This is used to keep a cache of the ENVs added to improve + // performance when adding a huge number of ENV variables + envMap map[string]int } // ExportOptions have toggles for exporting only certain parts of the specification @@ -236,7 +239,12 @@ func New(os string) (generator Generator, err error) { } } - return Generator{Config: &config}, nil + envCache := map[string]int{} + if config.Process != nil { + envCache = createEnvCacheMap(config.Process.Env) + } + + return Generator{Config: &config, envMap: envCache}, nil } // NewFromSpec creates a configuration Generator from a given @@ -246,8 +254,14 @@ func New(os string) (generator Generator, err error) { // // generator := Generator{Config: config} func NewFromSpec(config *rspec.Spec) Generator { + envCache := map[string]int{} + if config != nil && config.Process != nil { + envCache = createEnvCacheMap(config.Process.Env) + } + return Generator{ Config: config, + envMap: envCache, } } @@ -273,11 +287,27 @@ func NewFromTemplate(r io.Reader) (Generator, error) { if err := json.NewDecoder(r).Decode(&config); err != nil { return Generator{}, err } + + envCache := map[string]int{} + if config.Process != nil { + envCache = createEnvCacheMap(config.Process.Env) + } + return Generator{ Config: &config, + envMap: envCache, }, nil } +// createEnvCacheMap creates a hash map with the ENV variables given by the config +func createEnvCacheMap(env []string) map[string]int { + envMap := make(map[string]int, len(env)) + for i, val := range env { + envMap[val] = i + } + return envMap +} + // SetSpec sets the configuration in the Generator g. // // Deprecated: Replace with: @@ -414,6 +444,12 @@ func (g *Generator) SetProcessUsername(username string) { g.Config.Process.User.Username = username } +// SetProcessUmask sets g.Config.Process.User.Umask. +func (g *Generator) SetProcessUmask(umask uint32) { + g.initConfigProcess() + g.Config.Process.User.Umask = umask +} + // SetProcessGID sets g.Config.Process.User.GID. func (g *Generator) SetProcessGID(gid uint32) { g.initConfigProcess() @@ -456,21 +492,44 @@ func (g *Generator) ClearProcessEnv() { return } g.Config.Process.Env = []string{} + // Clear out the env cache map as well + g.envMap = map[string]int{} } // AddProcessEnv adds name=value into g.Config.Process.Env, or replaces an // existing entry with the given name. func (g *Generator) AddProcessEnv(name, value string) { + if name == "" { + return + } + g.initConfigProcess() + g.addEnv(fmt.Sprintf("%s=%s", name, value), name) +} - env := fmt.Sprintf("%s=%s", name, value) - for idx := range g.Config.Process.Env { - if strings.HasPrefix(g.Config.Process.Env[idx], name+"=") { - g.Config.Process.Env[idx] = env - return - } +// AddMultipleProcessEnv adds multiple name=value into g.Config.Process.Env, or replaces +// existing entries with the given name. +func (g *Generator) AddMultipleProcessEnv(envs []string) { + g.initConfigProcess() + + for _, val := range envs { + split := strings.SplitN(val, "=", 2) + g.addEnv(val, split[0]) + } +} + +// addEnv looks through adds ENV to the Process and checks envMap for +// any duplicates +// This is called by both AddMultipleProcessEnv and AddProcessEnv +func (g *Generator) addEnv(env, key string) { + if idx, ok := g.envMap[key]; ok { + // The ENV exists in the cache, so change its value in g.Config.Process.Env + g.Config.Process.Env[idx] = env + } else { + // else the env doesn't exist, so add it and add it's index to g.envMap + g.Config.Process.Env = append(g.Config.Process.Env, env) + g.envMap[key] = len(g.Config.Process.Env) - 1 } - g.Config.Process.Env = append(g.Config.Process.Env, env) } // AddProcessRlimits adds rlimit into g.Config.Process.Rlimits. @@ -1443,7 +1502,7 @@ func (g *Generator) AddDevice(device rspec.LinuxDevice) { return } if dev.Type == device.Type && dev.Major == device.Major && dev.Minor == device.Minor { - fmt.Fprintln(os.Stderr, "WARNING: The same type, major and minor should not be used for multiple devices.") + fmt.Fprintf(os.Stderr, "WARNING: Creating device %q with same type, major and minor as existing %q.\n", device.Path, dev.Path) } } diff --git a/vendor/github.com/opencontainers/runtime-tools/generate/seccomp/seccomp_default.go b/vendor/github.com/opencontainers/runtime-tools/generate/seccomp/seccomp_default.go index 5fee5a3b2..8a8dc3970 100644 --- a/vendor/github.com/opencontainers/runtime-tools/generate/seccomp/seccomp_default.go +++ b/vendor/github.com/opencontainers/runtime-tools/generate/seccomp/seccomp_default.go @@ -566,6 +566,20 @@ func DefaultProfile(rs *specs.Spec) *rspec.LinuxSeccomp { }, }...) /* Flags parameter of the clone syscall is the 2nd on s390 */ + syscalls = append(syscalls, []rspec.LinuxSyscall{ + { + Names: []string{"clone"}, + Action: rspec.ActAllow, + Args: []rspec.LinuxSeccompArg{ + { + Index: 1, + Value: 2080505856, + ValueTwo: 0, + Op: rspec.OpMaskedEqual, + }, + }, + }, + }...) } return &rspec.LinuxSeccomp{ diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go index 7cce235a6..112a926c3 100644 --- a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go @@ -119,11 +119,13 @@ func (d *childDriver) handleConnectRequest(c *net.UnixConn, req *msg.Request) er if err != nil { return err } + defer targetConnFile.Close() oob := unix.UnixRights(int(targetConnFile.Fd())) f, err := c.File() if err != nil { return err } + defer f.Close() for { err = unix.Sendmsg(int(f.Fd()), []byte("dummy"), oob, nil, 0) if err != unix.EINTR { diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md index 6a7e3c5ca..cab87e9d6 100644 --- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md +++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md @@ -1,6 +1,19 @@ Changes by Version ================== +2.25.0 (2020-07-13) +------------------- +## Breaking changes +- [feat] Periodically re-resolve UDP server address, with opt-out (#520) -- Trevor Foster + + The re-resolving of UDP address is now enabled by default, to make the client more robust in Kubernetes deployments. + The old resolve-once behavior can be restored by setting DisableAttemptReconnecting=true in the Configuration struct, + or via JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED=true environment variable. + +## Bug fixes +- Do not add invalid context to references (#521) -- Yuri Shkuro + + 2.24.0 (2020-06-14) ------------------- - Mention FromEnv() in the README, docs, and examples (#518) -- Martin Lercher diff --git a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock index 2a5215a50..387958b12 100644 --- a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock +++ b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock @@ -142,10 +142,19 @@ version = "v0.0.5" [[projects]] - digest = "1:0496f0e99014b7fd0a560c539f51d0882731137b85494142f47e550e4657176a" + digest = "1:ac83cf90d08b63ad5f7e020ef480d319ae890c208f8524622a2f3136e2686b02" + name = "github.com/stretchr/objx" + packages = ["."] + pruneopts = "UT" + revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" + version = "v0.1.1" + +[[projects]] + digest = "1:d88ba57c4e8f5db6ce9ab6605a89f4542ee751b576884ba5271c2ba3d4b6f2d2" name = "github.com/stretchr/testify" packages = [ "assert", + "mock", "require", "suite", ] @@ -154,6 +163,42 @@ version = "v1.4.0" [[projects]] + digest = "1:5b98956718573850caf7e0fd00b571a6657c4ef1f345ddf0c96b43ce355fe862" + name = "github.com/uber/jaeger-client-go" + packages = [ + ".", + "config", + "crossdock/client", + "crossdock/common", + "crossdock/endtoend", + "crossdock/log", + "crossdock/server", + "crossdock/thrift/tracetest", + "internal/baggage", + "internal/baggage/remote", + "internal/reporterstats", + "internal/spanlog", + "internal/throttler", + "internal/throttler/remote", + "log", + "log/zap/mock_opentracing", + "rpcmetrics", + "testutils", + "thrift", + "thrift-gen/agent", + "thrift-gen/baggage", + "thrift-gen/jaeger", + "thrift-gen/sampling", + "thrift-gen/zipkincore", + "transport", + "transport/zipkin", + "utils", + ] + pruneopts = "UT" + revision = "66c008c3d6ad856cac92a0af53186efbffa8e6a5" + version = "v2.24.0" + +[[projects]] digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3" name = "github.com/uber/jaeger-lib" packages = [ @@ -314,8 +359,36 @@ "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/stretchr/testify/require", "github.com/stretchr/testify/suite", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", + "github.com/uber/jaeger-client-go/crossdock/client", + "github.com/uber/jaeger-client-go/crossdock/common", + "github.com/uber/jaeger-client-go/crossdock/endtoend", + "github.com/uber/jaeger-client-go/crossdock/log", + "github.com/uber/jaeger-client-go/crossdock/server", + "github.com/uber/jaeger-client-go/crossdock/thrift/tracetest", + "github.com/uber/jaeger-client-go/internal/baggage", + "github.com/uber/jaeger-client-go/internal/baggage/remote", + "github.com/uber/jaeger-client-go/internal/reporterstats", + "github.com/uber/jaeger-client-go/internal/spanlog", + "github.com/uber/jaeger-client-go/internal/throttler", + "github.com/uber/jaeger-client-go/internal/throttler/remote", + "github.com/uber/jaeger-client-go/log", + "github.com/uber/jaeger-client-go/log/zap/mock_opentracing", + "github.com/uber/jaeger-client-go/rpcmetrics", + "github.com/uber/jaeger-client-go/testutils", + "github.com/uber/jaeger-client-go/thrift", + "github.com/uber/jaeger-client-go/thrift-gen/agent", + "github.com/uber/jaeger-client-go/thrift-gen/baggage", + "github.com/uber/jaeger-client-go/thrift-gen/jaeger", + "github.com/uber/jaeger-client-go/thrift-gen/sampling", + "github.com/uber/jaeger-client-go/thrift-gen/zipkincore", + "github.com/uber/jaeger-client-go/transport", + "github.com/uber/jaeger-client-go/transport/zipkin", + "github.com/uber/jaeger-client-go/utils", "github.com/uber/jaeger-lib/metrics", "github.com/uber/jaeger-lib/metrics/metricstest", "github.com/uber/jaeger-lib/metrics/prometheus", diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md index e7b13b1c2..687f5780c 100644 --- a/vendor/github.com/uber/jaeger-client-go/README.md +++ b/vendor/github.com/uber/jaeger-client-go/README.md @@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`). JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`). JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`). +JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`). +JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`). JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/. JAEGER_SAMPLER_PARAM | The sampler parameter (number). JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler. diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go index e6ffb987d..bb1228294 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/utils" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/internal/baggage/remote" @@ -124,6 +125,17 @@ type ReporterConfig struct { // Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT LocalAgentHostPort string `yaml:"localAgentHostPort"` + // DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves + // the agent's hostname and reconnects if there was a change. This option only + // applies if LocalAgentHostPort is specified. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED + DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"` + + // AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname + // in order to detect address changes. This option only applies if DisableAttemptReconnecting is false. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL + AttemptReconnectInterval time.Duration + // CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL. // Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT CollectorEndpoint string `yaml:"collectorEndpoint"` @@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter( metrics *jaeger.Metrics, logger jaeger.Logger, ) (jaeger.Reporter, error) { - sender, err := rc.newTransport() + sender, err := rc.newTransport(logger) if err != nil { return nil, err } @@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter( return reporter, err } -func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { +func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) { switch { case rc.CollectorEndpoint != "": httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)} @@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { } return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil default: - return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) + return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: rc.LocalAgentHostPort, + Logger: logger, + DisableAttemptReconnecting: rc.DisableAttemptReconnecting, + AttemptReconnectInterval: rc.AttemptReconnectInterval, + }, + }) } } diff --git a/vendor/github.com/uber/jaeger-client-go/config/config_env.go b/vendor/github.com/uber/jaeger-client-go/config/config_env.go index f38eb9d93..92d60cd59 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config_env.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config_env.go @@ -24,30 +24,31 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/uber/jaeger-client-go" ) const ( // environment variable names - envServiceName = "JAEGER_SERVICE_NAME" - envDisabled = "JAEGER_DISABLED" - envRPCMetrics = "JAEGER_RPC_METRICS" - envTags = "JAEGER_TAGS" - envSamplerType = "JAEGER_SAMPLER_TYPE" - envSamplerParam = "JAEGER_SAMPLER_PARAM" - envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint - envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" - envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" - envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" - envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" - envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" - envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" - envEndpoint = "JAEGER_ENDPOINT" - envUser = "JAEGER_USER" - envPassword = "JAEGER_PASSWORD" - envAgentHost = "JAEGER_AGENT_HOST" - envAgentPort = "JAEGER_AGENT_PORT" + envServiceName = "JAEGER_SERVICE_NAME" + envDisabled = "JAEGER_DISABLED" + envRPCMetrics = "JAEGER_RPC_METRICS" + envTags = "JAEGER_TAGS" + envSamplerType = "JAEGER_SAMPLER_TYPE" + envSamplerParam = "JAEGER_SAMPLER_PARAM" + envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint + envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" + envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" + envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" + envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" + envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" + envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" + envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED" + envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL" + envEndpoint = "JAEGER_ENDPOINT" + envUser = "JAEGER_USER" + envPassword = "JAEGER_PASSWORD" + envAgentHost = "JAEGER_AGENT_HOST" + envAgentPort = "JAEGER_AGENT_PORT" ) // FromEnv uses environment variables to set the tracer's Configuration @@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) { if useEnv || rc.LocalAgentHostPort == "" { rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port) } + + if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" { + if value, err := strconv.ParseBool(e); err == nil { + rc.DisableAttemptReconnecting = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e) + } + } + + if !rc.DisableAttemptReconnecting { + if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" { + if value, err := time.ParseDuration(e); err == nil { + rc.AttemptReconnectInterval = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e) + } + } + } } return rc, nil diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go index feaf344ad..2f63d5909 100644 --- a/vendor/github.com/uber/jaeger-client-go/constants.go +++ b/vendor/github.com/uber/jaeger-client-go/constants.go @@ -22,7 +22,7 @@ import ( const ( // JaegerClientVersion is the version of the client library reported as Span tag. - JaegerClientVersion = "Go-2.24.0" + JaegerClientVersion = "Go-2.25.0" // JaegerClientVersionTagKey is the name of the tag used to report client version. JaegerClientVersionTagKey = "jaeger.version" diff --git a/vendor/github.com/uber/jaeger-client-go/span_context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go index 1b44f3f8c..ae9d94a9a 100644 --- a/vendor/github.com/uber/jaeger-client-go/span_context.go +++ b/vendor/github.com/uber/jaeger-client-go/span_context.go @@ -212,10 +212,14 @@ func (c SpanContext) SetFirehose() { } func (c SpanContext) String() string { + var flags int32 + if c.samplingState != nil { + flags = c.samplingState.stateFlags.Load() + } if c.traceID.High == 0 { - return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } - return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } // ContextFromString reconstructs the Context encoded in a string diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go index 8a3fc97ab..477c6eae3 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer.go @@ -216,10 +216,10 @@ func (t *Tracer) startSpanWithOptions( options.StartTime = t.timeNow() } - // Predicate whether the given span context is a valid reference - // which may be used as parent / debug ID / baggage items source - isValidReference := func(ctx SpanContext) bool { - return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0 + // Predicate whether the given span context is an empty reference + // or may be used as parent / debug ID / baggage items source + isEmptyReference := func(ctx SpanContext) bool { + return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0 } var references []Reference @@ -235,7 +235,7 @@ func (t *Tracer) startSpanWithOptions( reflect.ValueOf(ref.ReferencedContext))) continue } - if !isValidReference(ctxRef) { + if isEmptyReference(ctxRef) { continue } @@ -245,14 +245,17 @@ func (t *Tracer) startSpanWithOptions( continue } - references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + if ctxRef.IsValid() { + // we don't want empty context that contains only debug-id or baggage + references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + } if !hasParent { parent = ctxRef hasParent = ref.Type == opentracing.ChildOfRef } } - if !hasParent && isValidReference(parent) { + if !hasParent && !isEmptyReference(parent) { // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from // the FollowFromRef as the parent hasParent = true diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go index 7370d8007..5734819ab 100644 --- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go +++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/uber/jaeger-client-go/internal/reporterstats" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" @@ -57,35 +58,57 @@ type udpSender struct { failedToEmitSpans int64 } -// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should +// be passed to NewUDPTransportWithParams. +type UDPTransportParams struct { + utils.AgentClientUDPParams +} + +// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent. // TODO: (breaking change) move to transport/ package. -func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { - if len(hostPort) == 0 { - hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) +func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) { + if len(params.HostPort) == 0 { + params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) } - if maxPacketSize == 0 { - maxPacketSize = utils.UDPPacketMaxLength + + if params.Logger == nil { + params.Logger = log.StdLogger + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = utils.UDPPacketMaxLength } protocolFactory := thrift.NewTCompactProtocolFactory() // Each span is first written to thriftBuffer to determine its size in bytes. - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) - client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize) + client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams) if err != nil { return nil, err } return &udpSender{ client: client, - maxSpanBytes: maxPacketSize - emitBatchOverhead, + maxSpanBytes: params.MaxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, thriftProtocol: thriftProtocol, }, nil } +// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// TODO: (breaking change) move to transport/ package. +func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { + return NewUDPTransportWithParams(UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }, + }) +} + // SetReporterStats implements reporterstats.Receiver. func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) { s.reporterStats = rs diff --git a/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go new file mode 100644 index 000000000..0dffc7fa2 --- /dev/null +++ b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go @@ -0,0 +1,189 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/uber/jaeger-client-go/log" +) + +// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +type reconnectingUDPConn struct { + hostPort string + resolveFunc resolveFunc + dialFunc dialFunc + logger log.Logger + bufferBytes int64 + + connMtx sync.RWMutex + conn *net.UDPConn + destAddr *net.UDPAddr + closeChan chan struct{} +} + +type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) +type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) + +// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) { + conn := &reconnectingUDPConn{ + hostPort: hostPort, + resolveFunc: resolveFunc, + dialFunc: dialFunc, + logger: logger, + closeChan: make(chan struct{}), + } + + if err := conn.attemptResolveAndDial(); err != nil { + logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)) + } + + go conn.reconnectLoop(resolveTimeout) + + return conn, nil +} + +func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { + ticker := time.NewTicker(resolveTimeout) + defer ticker.Stop() + + for { + select { + case <-c.closeChan: + return + case <-ticker.C: + if err := c.attemptResolveAndDial(); err != nil { + c.logger.Error(err.Error()) + } + } + } +} + +func (c *reconnectingUDPConn) attemptResolveAndDial() error { + newAddr, err := c.resolveFunc("udp", c.hostPort) + if err != nil { + return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) + } + + c.connMtx.RLock() + curAddr := c.destAddr + c.connMtx.RUnlock() + + // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn + if curAddr != nil && newAddr.String() == curAddr.String() { + return nil + } + + if err := c.attemptDialNewAddr(newAddr); err != nil { + return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) + } + + return nil +} + +func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { + connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) + if err != nil { + return err + } + + if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { + if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { + return err + } + } + + c.connMtx.Lock() + c.destAddr = newAddr + // store prev to close later + prevConn := c.conn + c.conn = connUDP + c.connMtx.Unlock() + + if prevConn != nil { + return prevConn.Close() + } + + return nil +} + +// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning +func (c *reconnectingUDPConn) Write(b []byte) (int, error) { + var bytesWritten int + var err error + + c.connMtx.RLock() + if c.conn == nil { + // if connection is not initialized indicate this with err in order to hook into retry logic + err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") + } else { + bytesWritten, err = c.conn.Write(b) + } + c.connMtx.RUnlock() + + if err == nil { + return bytesWritten, nil + } + + // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again + if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { + c.connMtx.RLock() + defer c.connMtx.RUnlock() + return c.conn.Write(b) + } + + // return original error if reconn fails + return bytesWritten, err +} + +// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation +func (c *reconnectingUDPConn) Close() error { + close(c.closeChan) + + // acquire rw lock before closing conn to ensure calls to Write drain + c.connMtx.Lock() + defer c.connMtx.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + + return nil +} + +// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held +// and SetWriteBuffer is called store bufferBytes to be set for new conns +func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { + var err error + + c.connMtx.RLock() + if c.conn != nil { + err = c.conn.SetWriteBuffer(bytes) + } + c.connMtx.RUnlock() + + if err == nil { + atomic.StoreInt64(&c.bufferBytes, int64(bytes)) + } + + return err +} diff --git a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go index fadd73e49..2352643ce 100644 --- a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go +++ b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go @@ -19,7 +19,9 @@ import ( "fmt" "io" "net" + "time" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" "github.com/uber/jaeger-client-go/thrift-gen/agent" @@ -35,41 +37,90 @@ type AgentClientUDP struct { agent.Agent io.Closer - connUDP *net.UDPConn + connUDP udpConn client *agent.AgentClient maxPacketSize int // max size of datagram in bytes thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span } -// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. -func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { - if maxPacketSize == 0 { - maxPacketSize = UDPPacketMaxLength +type udpConn interface { + Write([]byte) (int, error) + SetWriteBuffer(int) error + Close() error +} + +// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should +// be passed to NewAgentClientUDPWithParams. +type AgentClientUDPParams struct { + HostPort string + MaxPacketSize int + Logger log.Logger + DisableAttemptReconnecting bool + AttemptReconnectInterval time.Duration +} + +// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) { + // validate hostport + if _, _, err := net.SplitHostPort(params.HostPort); err != nil { + return nil, err + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = UDPPacketMaxLength + } + + if params.Logger == nil { + params.Logger = log.StdLogger } - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 { + params.AttemptReconnectInterval = time.Second * 30 + } + + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactory() client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) - destAddr, err := net.ResolveUDPAddr("udp", hostPort) - if err != nil { - return nil, err - } + var connUDP udpConn + var err error - connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) - if err != nil { - return nil, err + if params.DisableAttemptReconnecting { + destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) + if err != nil { + return nil, err + } + + connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) + if err != nil { + return nil, err + } + } else { + // host is hostname, setup resolver loop in case host record changes during operation + connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) + if err != nil { + return nil, err + } } - if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { + + if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { return nil, err } - clientUDP := &AgentClientUDP{ + return &AgentClientUDP{ connUDP: connUDP, client: client, - maxPacketSize: maxPacketSize, - thriftBuffer: thriftBuffer} - return clientUDP, nil + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + }, nil +} + +// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { + return NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }) } // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface |