diff options
28 files changed, 681 insertions, 76 deletions
diff --git a/cmd/podman/common/netflags.go b/cmd/podman/common/netflags.go index 5c83a3c9c..82a82b310 100644 --- a/cmd/podman/common/netflags.go +++ b/cmd/podman/common/netflags.go @@ -2,6 +2,7 @@ package common import ( "net" + "strings" "github.com/containers/libpod/v2/cmd/podman/parse" "github.com/containers/libpod/v2/libpod/define" @@ -164,11 +165,18 @@ func NetFlagsToNetOptions(cmd *cobra.Command) (*entities.NetOptions, error) { return nil, err } + parts := strings.SplitN(network, ":", 2) + ns, cniNets, err := specgen.ParseNetworkNamespace(network) if err != nil { return nil, err } + if len(parts) > 1 { + opts.NetworkOptions = make(map[string][]string) + opts.NetworkOptions[parts[0]] = strings.Split(parts[1], ",") + cniNets = nil + } opts.Network = ns opts.CNINetworks = cniNets } diff --git a/cmd/podman/common/specgen.go b/cmd/podman/common/specgen.go index 8a265cedf..08099da4b 100644 --- a/cmd/podman/common/specgen.go +++ b/cmd/podman/common/specgen.go @@ -417,6 +417,7 @@ func FillOutSpecGen(s *specgen.SpecGenerator, c *ContainerCLIOpts, args []string s.DNSOptions = c.Net.DNSOptions s.StaticIP = c.Net.StaticIP s.StaticMAC = c.Net.StaticMAC + s.NetworkOptions = c.Net.NetworkOptions s.UseImageHosts = c.Net.NoHosts s.ImageVolumeMode = c.ImageVolume diff --git a/docs/source/markdown/podman-create.1.md b/docs/source/markdown/podman-create.1.md index fd5e14535..4466e6616 100644 --- a/docs/source/markdown/podman-create.1.md +++ b/docs/source/markdown/podman-create.1.md @@ -553,7 +553,10 @@ Valid values are: - `<network-name>|<network-id>`: connect to a user-defined network, multiple networks should be comma separated - `ns:<path>`: path to a network namespace to join - `private`: create a new namespace for the container (default) -- `slirp4netns`: use slirp4netns to create a user network stack. This is the default for rootless containers +- `slirp4netns[:OPTIONS,...]`: use slirp4netns to create a user network stack. This is the default for rootless containers. It is possible to specify these additional options: + **port_handler=rootlesskit**: Use rootlesskit for port forwarding. Default. + **port_handler=slirp4netns**: Use the slirp4netns port forwarding. + **allow_host_loopback=true|false**: Allow the slirp4netns to reach the host loopback IP (`10.0.2.2`). Default to false. **--network-alias**=*alias* diff --git a/docs/source/markdown/podman-run.1.md b/docs/source/markdown/podman-run.1.md index 68ffb69e5..de1d8aff6 100644 --- a/docs/source/markdown/podman-run.1.md +++ b/docs/source/markdown/podman-run.1.md @@ -561,7 +561,10 @@ Valid _mode_ values are: - _network-id_: connect to a user-defined network, multiple networks should be comma separated; - **ns:**_path_: path to a network namespace to join; - `private`: create a new namespace for the container (default) -- **slirp4netns**: use **slirp4netns**(1) to create a user network stack. This is the default for rootless containers. +- **slirp4netns[:OPTIONS,...]**: use **slirp4netns**(1) to create a user network stack. This is the default for rootless containers. It is possible to specify these additional options: + **port_handler=rootlesskit**: Use rootlesskit for port forwarding. Default. + **port_handler=slirp4netns**: Use the slirp4netns port forwarding. + **allow_host_loopback=true|false**: Allow the slirp4netns to reach the host loopback IP (`10.0.2.2`). Default to false. **--network-alias**=*alias* @@ -53,7 +53,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.6.1 github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 - github.com/uber/jaeger-client-go v2.24.0+incompatible + github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect github.com/varlink/go v0.0.0-20190502142041-0f1d566d194b github.com/vishvananda/netlink v1.1.0 @@ -424,6 +424,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -439,8 +440,8 @@ github.com/tchap/go-patricia v2.3.0+incompatible h1:GkY4dP3cEfEASBPPkWd+AmjYxhmD github.com/tchap/go-patricia v2.3.0+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/u-root/u-root v6.0.0+incompatible/go.mod h1:RYkpo8pTHrNjW08opNd/U6p/RJE7K0D8fXO0d47+3YY= -github.com/uber/jaeger-client-go v2.24.0+incompatible h1:CGchgJcHsDd2jWnaL4XngByMrXoGHh3n8oCqAKx0uMo= -github.com/uber/jaeger-client-go v2.24.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= +github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= diff --git a/libpod/container.go b/libpod/container.go index f7abfb005..1ca38ae7e 100644 --- a/libpod/container.go +++ b/libpod/container.go @@ -341,6 +341,8 @@ type ContainerConfig struct { Networks []string `json:"networks,omitempty"` // Network mode specified for the default network. NetMode namespaces.NetworkMode `json:"networkMode,omitempty"` + // NetworkOptions are additional options for each network + NetworkOptions map[string][]string `json:"network_options,omitempty"` // Image Config diff --git a/libpod/networking_linux.go b/libpod/networking_linux.go index 1e79e8732..35e58f916 100644 --- a/libpod/networking_linux.go +++ b/libpod/networking_linux.go @@ -173,6 +173,19 @@ type slirpFeatures struct { HasEnableSeccomp bool } +type slirp4netnsCmdArg struct { + Proto string `json:"proto,omitempty"` + HostAddr string `json:"host_addr"` + HostPort int32 `json:"host_port"` + GuestAddr string `json:"guest_addr"` + GuestPort int32 `json:"guest_port"` +} + +type slirp4netnsCmd struct { + Execute string `json:"execute"` + Args slirp4netnsCmdArg `json:"arguments"` +} + func checkSlirpFlags(path string) (*slirpFeatures, error) { cmd := exec.Command(path, "--help") out, err := cmd.CombinedOutput() @@ -210,12 +223,33 @@ func (r *Runtime) setupRootlessNetNS(ctr *Container) error { havePortMapping := len(ctr.Config().PortMappings) > 0 logPath := filepath.Join(ctr.runtime.config.Engine.TmpDir, fmt.Sprintf("slirp4netns-%s.log", ctr.config.ID)) + isSlirpHostForward := false + disableHostLoopback := true + if ctr.config.NetworkOptions != nil { + slirpOptions := ctr.config.NetworkOptions["slirp4netns"] + for _, o := range slirpOptions { + switch o { + case "port_handler=slirp4netns": + isSlirpHostForward = true + case "port_handler=rootlesskit": + isSlirpHostForward = false + case "allow_host_loopback=true": + disableHostLoopback = false + case "allow_host_loopback=false": + disableHostLoopback = true + default: + return errors.Errorf("unknown option for slirp4netns: %q", o) + + } + } + } + cmdArgs := []string{} slirpFeatures, err := checkSlirpFlags(path) if err != nil { return errors.Wrapf(err, "error checking slirp4netns binary %s: %q", path, err) } - if slirpFeatures.HasDisableHostLoopback { + if disableHostLoopback && slirpFeatures.HasDisableHostLoopback { cmdArgs = append(cmdArgs, "--disable-host-loopback") } if slirpFeatures.HasMTU { @@ -228,6 +262,12 @@ func (r *Runtime) setupRootlessNetNS(ctr *Container) error { cmdArgs = append(cmdArgs, "--enable-seccomp") } + var apiSocket string + if havePortMapping && isSlirpHostForward { + apiSocket = filepath.Join(ctr.runtime.config.Engine.TmpDir, fmt.Sprintf("%s.net", ctr.config.ID)) + cmdArgs = append(cmdArgs, "--api-socket", apiSocket) + } + // the slirp4netns arguments being passed are describes as follows: // from the slirp4netns documentation: https://github.com/rootless-containers/slirp4netns // -c, --configure Brings up the tap interface @@ -291,7 +331,11 @@ func (r *Runtime) setupRootlessNetNS(ctr *Container) error { } if havePortMapping { - return r.setupRootlessPortMapping(ctr, netnsPath) + if isSlirpHostForward { + return r.setupRootlessPortMappingViaSlirp(ctr, cmd, apiSocket) + } else { + return r.setupRootlessPortMappingViaRLK(ctr, netnsPath) + } } return nil } @@ -342,7 +386,7 @@ func waitForSync(syncR *os.File, cmd *exec.Cmd, logFile io.ReadSeeker, timeout t return nil } -func (r *Runtime) setupRootlessPortMapping(ctr *Container, netnsPath string) error { +func (r *Runtime) setupRootlessPortMappingViaRLK(ctr *Container, netnsPath string) error { syncR, syncW, err := os.Pipe() if err != nil { return errors.Wrapf(err, "failed to open pipe") @@ -419,6 +463,90 @@ func (r *Runtime) setupRootlessPortMapping(ctr *Container, netnsPath string) err return nil } +func (r *Runtime) setupRootlessPortMappingViaSlirp(ctr *Container, cmd *exec.Cmd, apiSocket string) (err error) { + const pidWaitTimeout = 60 * time.Second + chWait := make(chan error) + go func() { + interval := 25 * time.Millisecond + for i := time.Duration(0); i < pidWaitTimeout; i += interval { + // Check if the process is still running. + var status syscall.WaitStatus + pid, err := syscall.Wait4(cmd.Process.Pid, &status, syscall.WNOHANG, nil) + if err != nil { + break + } + if pid != cmd.Process.Pid { + continue + } + if status.Exited() || status.Signaled() { + chWait <- fmt.Errorf("slirp4netns exited with status %d", status.ExitStatus()) + } + time.Sleep(interval) + } + }() + defer close(chWait) + + // wait that API socket file appears before trying to use it. + if _, err := WaitForFile(apiSocket, chWait, pidWaitTimeout); err != nil { + return errors.Wrapf(err, "waiting for slirp4nets to create the api socket file %s", apiSocket) + } + + // for each port we want to add we need to open a connection to the slirp4netns control socket + // and send the add_hostfwd command. + for _, i := range ctr.config.PortMappings { + conn, err := net.Dial("unix", apiSocket) + if err != nil { + return errors.Wrapf(err, "cannot open connection to %s", apiSocket) + } + defer func() { + if err := conn.Close(); err != nil { + logrus.Errorf("unable to close connection: %q", err) + } + }() + hostIP := i.HostIP + if hostIP == "" { + hostIP = "0.0.0.0" + } + apiCmd := slirp4netnsCmd{ + Execute: "add_hostfwd", + Args: slirp4netnsCmdArg{ + Proto: i.Protocol, + HostAddr: hostIP, + HostPort: i.HostPort, + GuestPort: i.ContainerPort, + }, + } + // create the JSON payload and send it. Mark the end of request shutting down writes + // to the socket, as requested by slirp4netns. + data, err := json.Marshal(&apiCmd) + if err != nil { + return errors.Wrapf(err, "cannot marshal JSON for slirp4netns") + } + if _, err := conn.Write([]byte(fmt.Sprintf("%s\n", data))); err != nil { + return errors.Wrapf(err, "cannot write to control socket %s", apiSocket) + } + if err := conn.(*net.UnixConn).CloseWrite(); err != nil { + return errors.Wrapf(err, "cannot shutdown the socket %s", apiSocket) + } + buf := make([]byte, 2048) + readLength, err := conn.Read(buf) + if err != nil { + return errors.Wrapf(err, "cannot read from control socket %s", apiSocket) + } + // if there is no 'error' key in the received JSON data, then the operation was + // successful. + var y map[string]interface{} + if err := json.Unmarshal(buf[0:readLength], &y); err != nil { + return errors.Wrapf(err, "error parsing error status from slirp4netns") + } + if e, found := y["error"]; found { + return errors.Errorf("error from slirp4netns while setting up port redirection: %v", e) + } + } + logrus.Debug("slirp4netns port-forwarding setup via add_hostfwd is ready") + return nil +} + // Configure the network namespace using the container process func (r *Runtime) setupNetNS(ctr *Container) error { nsProcess := fmt.Sprintf("/proc/%d/ns/net", ctr.state.PID) diff --git a/libpod/options.go b/libpod/options.go index b3c11ebc1..32748a3c1 100644 --- a/libpod/options.go +++ b/libpod/options.go @@ -1003,6 +1003,19 @@ func WithStaticIP(ip net.IP) CtrCreateOption { } } +// WithNetworkOptions sets additional options for the networks. +func WithNetworkOptions(options map[string][]string) CtrCreateOption { + return func(ctr *Container) error { + if ctr.valid { + return define.ErrCtrFinalized + } + + ctr.config.NetworkOptions = options + + return nil + } +} + // WithStaticMAC indicates that the container should request a static MAC from // the CNI plugins. // It cannot be set unless WithNetNS has already been passed. diff --git a/pkg/api/handlers/compat/containers_create.go b/pkg/api/handlers/compat/containers_create.go index cbee8a8b6..4ad6aa862 100644 --- a/pkg/api/handlers/compat/containers_create.go +++ b/pkg/api/handlers/compat/containers_create.go @@ -9,6 +9,7 @@ import ( "github.com/containers/common/pkg/config" "github.com/containers/libpod/v2/libpod" + "github.com/containers/libpod/v2/libpod/define" image2 "github.com/containers/libpod/v2/libpod/image" "github.com/containers/libpod/v2/pkg/api/handlers" "github.com/containers/libpod/v2/pkg/api/handlers/utils" @@ -45,6 +46,11 @@ func CreateContainer(w http.ResponseWriter, r *http.Request) { } newImage, err := runtime.ImageRuntime().NewFromLocal(input.Image) if err != nil { + if errors.Cause(err) == define.ErrNoSuchImage { + utils.Error(w, "No such image", http.StatusNotFound, err) + return + } + utils.Error(w, "Something went wrong.", http.StatusInternalServerError, errors.Wrap(err, "NewFromLocal()")) return } diff --git a/pkg/domain/entities/types.go b/pkg/domain/entities/types.go index 7e910ff61..b313e5f8b 100644 --- a/pkg/domain/entities/types.go +++ b/pkg/domain/entities/types.go @@ -42,6 +42,8 @@ type NetOptions struct { PublishPorts []specgen.PortMapping StaticIP *net.IP StaticMAC *net.HardwareAddr + // NetworkOptions are additional options for each network + NetworkOptions map[string][]string } // All CLI inspect commands and inspect sub-commands use the same options diff --git a/pkg/namespaces/namespaces.go b/pkg/namespaces/namespaces.go index 2ffbde977..7831af8f9 100644 --- a/pkg/namespaces/namespaces.go +++ b/pkg/namespaces/namespaces.go @@ -385,7 +385,7 @@ func (n NetworkMode) IsBridge() bool { // IsSlirp4netns indicates if we are running a rootless network stack func (n NetworkMode) IsSlirp4netns() bool { - return n == slirpType + return n == slirpType || strings.HasPrefix(string(n), slirpType+":") } // IsNS indicates a network namespace passed in by path (ns:<path>) diff --git a/pkg/specgen/generate/namespaces.go b/pkg/specgen/generate/namespaces.go index 09d6ba445..a19009bc2 100644 --- a/pkg/specgen/generate/namespaces.go +++ b/pkg/specgen/generate/namespaces.go @@ -2,6 +2,7 @@ package generate import ( "context" + "fmt" "os" "strings" @@ -226,7 +227,11 @@ func namespaceOptions(ctx context.Context, s *specgen.SpecGenerator, rt *libpod. if err != nil { return nil, err } - toReturn = append(toReturn, libpod.WithNetNS(portMappings, postConfigureNetNS, "slirp4netns", nil)) + val := "slirp4netns" + if s.NetNS.Value != "" { + val = fmt.Sprintf("slirp4netns:%s", s.NetNS.Value) + } + toReturn = append(toReturn, libpod.WithNetNS(portMappings, postConfigureNetNS, val, nil)) case specgen.Bridge: portMappings, err := createPortMappings(ctx, s, img) if err != nil { @@ -261,6 +266,9 @@ func namespaceOptions(ctx context.Context, s *specgen.SpecGenerator, rt *libpod. if s.StaticMAC != nil { toReturn = append(toReturn, libpod.WithStaticMAC(*s.StaticMAC)) } + if s.NetworkOptions != nil { + toReturn = append(toReturn, libpod.WithNetworkOptions(s.NetworkOptions)) + } return toReturn, nil } @@ -465,7 +473,7 @@ func GetNamespaceOptions(ns []string) ([]libpod.PodCreateOption, error) { case "pid": options = append(options, libpod.WithPodPID()) case "user": - return erroredOptions, errors.Errorf("User sharing functionality not supported on pod level") + continue case "ipc": options = append(options, libpod.WithPodIPC()) case "uts": diff --git a/pkg/specgen/namespaces.go b/pkg/specgen/namespaces.go index 5f56b242b..9bf2c5d05 100644 --- a/pkg/specgen/namespaces.go +++ b/pkg/specgen/namespaces.go @@ -108,7 +108,9 @@ func validateNetNS(n *Namespace) error { return nil } switch n.NSMode { - case "", Default, Host, Path, FromContainer, FromPod, Private, NoNetwork, Bridge, Slirp: + case Slirp: + break + case "", Default, Host, Path, FromContainer, FromPod, Private, NoNetwork, Bridge: break default: return errors.Errorf("invalid network %q", n.NSMode) @@ -119,8 +121,8 @@ func validateNetNS(n *Namespace) error { if len(n.Value) < 1 { return errors.Errorf("namespace mode %s requires a value", n.NSMode) } - } else { - // All others must NOT set a string value + } else if n.NSMode != Slirp { + // All others except must NOT set a string value if len(n.Value) > 0 { return errors.Errorf("namespace value %s cannot be provided with namespace mode %s", n.Value, n.NSMode) } @@ -250,7 +252,7 @@ func ParseNetworkNamespace(ns string) (Namespace, []string, error) { var cniNetworks []string // Net defaults to Slirp on rootless switch { - case ns == "slirp4netns": + case ns == "slirp4netns", strings.HasPrefix(ns, "slirp4netns:"): toReturn.NSMode = Slirp case ns == "pod": toReturn.NSMode = FromPod diff --git a/pkg/specgen/specgen.go b/pkg/specgen/specgen.go index 17583d82a..a346a9742 100644 --- a/pkg/specgen/specgen.go +++ b/pkg/specgen/specgen.go @@ -379,6 +379,9 @@ type ContainerNetworkConfig struct { // Conflicts with UseImageHosts. // Optional. HostAdd []string `json:"hostadd,omitempty"` + // NetworkOptions are additional options for each network + // Optional. + NetworkOptions map[string][]string `json:"network_options,omitempty"` } // ContainerResourceConfig contains information on container resource limits. diff --git a/test/e2e/run_networking_test.go b/test/e2e/run_networking_test.go index 467d0c5ef..317f760db 100644 --- a/test/e2e/run_networking_test.go +++ b/test/e2e/run_networking_test.go @@ -235,6 +235,35 @@ var _ = Describe("Podman run networking", func() { Expect(ncBusy).To(ExitWithError()) }) + It("podman run network expose host port 8081 to container port 8000 using rootlesskit port handler", func() { + session := podmanTest.Podman([]string{"run", "--network", "slirp4netns:port_handler=rootlesskit", "-dt", "-p", "8081:8000", ALPINE, "/bin/sh"}) + session.Wait(30) + Expect(session.ExitCode()).To(Equal(0)) + + ncBusy := SystemExec("nc", []string{"-l", "-p", "8081"}) + Expect(ncBusy).To(ExitWithError()) + }) + + It("podman run network expose host port 8082 to container port 8000 using slirp4netns port handler", func() { + session := podmanTest.Podman([]string{"run", "--network", "slirp4netns:port_handler=slirp4netns", "-dt", "-p", "8082:8000", ALPINE, "/bin/sh"}) + session.Wait(30) + Expect(session.ExitCode()).To(Equal(0)) + ncBusy := SystemExec("nc", []string{"-l", "-p", "8082"}) + Expect(ncBusy).To(ExitWithError()) + }) + + It("podman run network expose host port 8080 to container port 8000 using invalid port handler", func() { + session := podmanTest.Podman([]string{"run", "--network", "slirp4netns:port_handler=invalid", "-dt", "-p", "8080:8000", ALPINE, "/bin/sh"}) + session.Wait(30) + Expect(session.ExitCode()).To(Not(Equal(0))) + }) + + It("podman run slirp4netns network with host loopback", func() { + session := podmanTest.Podman([]string{"run", "--network", "slirp4netns:allow_host_loopback=true", ALPINE, "ping", "-c1", "10.0.2.2"}) + session.Wait(30) + Expect(session.ExitCode()).To(Equal(0)) + }) + It("podman run network expose ports in image metadata", func() { session := podmanTest.Podman([]string{"create", "-dt", "-P", nginx}) session.Wait(90) diff --git a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md index 6a7e3c5ca..cab87e9d6 100644 --- a/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md +++ b/vendor/github.com/uber/jaeger-client-go/CHANGELOG.md @@ -1,6 +1,19 @@ Changes by Version ================== +2.25.0 (2020-07-13) +------------------- +## Breaking changes +- [feat] Periodically re-resolve UDP server address, with opt-out (#520) -- Trevor Foster + + The re-resolving of UDP address is now enabled by default, to make the client more robust in Kubernetes deployments. + The old resolve-once behavior can be restored by setting DisableAttemptReconnecting=true in the Configuration struct, + or via JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED=true environment variable. + +## Bug fixes +- Do not add invalid context to references (#521) -- Yuri Shkuro + + 2.24.0 (2020-06-14) ------------------- - Mention FromEnv() in the README, docs, and examples (#518) -- Martin Lercher diff --git a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock index 2a5215a50..387958b12 100644 --- a/vendor/github.com/uber/jaeger-client-go/Gopkg.lock +++ b/vendor/github.com/uber/jaeger-client-go/Gopkg.lock @@ -142,10 +142,19 @@ version = "v0.0.5" [[projects]] - digest = "1:0496f0e99014b7fd0a560c539f51d0882731137b85494142f47e550e4657176a" + digest = "1:ac83cf90d08b63ad5f7e020ef480d319ae890c208f8524622a2f3136e2686b02" + name = "github.com/stretchr/objx" + packages = ["."] + pruneopts = "UT" + revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c" + version = "v0.1.1" + +[[projects]] + digest = "1:d88ba57c4e8f5db6ce9ab6605a89f4542ee751b576884ba5271c2ba3d4b6f2d2" name = "github.com/stretchr/testify" packages = [ "assert", + "mock", "require", "suite", ] @@ -154,6 +163,42 @@ version = "v1.4.0" [[projects]] + digest = "1:5b98956718573850caf7e0fd00b571a6657c4ef1f345ddf0c96b43ce355fe862" + name = "github.com/uber/jaeger-client-go" + packages = [ + ".", + "config", + "crossdock/client", + "crossdock/common", + "crossdock/endtoend", + "crossdock/log", + "crossdock/server", + "crossdock/thrift/tracetest", + "internal/baggage", + "internal/baggage/remote", + "internal/reporterstats", + "internal/spanlog", + "internal/throttler", + "internal/throttler/remote", + "log", + "log/zap/mock_opentracing", + "rpcmetrics", + "testutils", + "thrift", + "thrift-gen/agent", + "thrift-gen/baggage", + "thrift-gen/jaeger", + "thrift-gen/sampling", + "thrift-gen/zipkincore", + "transport", + "transport/zipkin", + "utils", + ] + pruneopts = "UT" + revision = "66c008c3d6ad856cac92a0af53186efbffa8e6a5" + version = "v2.24.0" + +[[projects]] digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3" name = "github.com/uber/jaeger-lib" packages = [ @@ -314,8 +359,36 @@ "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/stretchr/testify/assert", + "github.com/stretchr/testify/mock", "github.com/stretchr/testify/require", "github.com/stretchr/testify/suite", + "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", + "github.com/uber/jaeger-client-go/crossdock/client", + "github.com/uber/jaeger-client-go/crossdock/common", + "github.com/uber/jaeger-client-go/crossdock/endtoend", + "github.com/uber/jaeger-client-go/crossdock/log", + "github.com/uber/jaeger-client-go/crossdock/server", + "github.com/uber/jaeger-client-go/crossdock/thrift/tracetest", + "github.com/uber/jaeger-client-go/internal/baggage", + "github.com/uber/jaeger-client-go/internal/baggage/remote", + "github.com/uber/jaeger-client-go/internal/reporterstats", + "github.com/uber/jaeger-client-go/internal/spanlog", + "github.com/uber/jaeger-client-go/internal/throttler", + "github.com/uber/jaeger-client-go/internal/throttler/remote", + "github.com/uber/jaeger-client-go/log", + "github.com/uber/jaeger-client-go/log/zap/mock_opentracing", + "github.com/uber/jaeger-client-go/rpcmetrics", + "github.com/uber/jaeger-client-go/testutils", + "github.com/uber/jaeger-client-go/thrift", + "github.com/uber/jaeger-client-go/thrift-gen/agent", + "github.com/uber/jaeger-client-go/thrift-gen/baggage", + "github.com/uber/jaeger-client-go/thrift-gen/jaeger", + "github.com/uber/jaeger-client-go/thrift-gen/sampling", + "github.com/uber/jaeger-client-go/thrift-gen/zipkincore", + "github.com/uber/jaeger-client-go/transport", + "github.com/uber/jaeger-client-go/transport/zipkin", + "github.com/uber/jaeger-client-go/utils", "github.com/uber/jaeger-lib/metrics", "github.com/uber/jaeger-lib/metrics/metricstest", "github.com/uber/jaeger-lib/metrics/prometheus", diff --git a/vendor/github.com/uber/jaeger-client-go/README.md b/vendor/github.com/uber/jaeger-client-go/README.md index e7b13b1c2..687f5780c 100644 --- a/vendor/github.com/uber/jaeger-client-go/README.md +++ b/vendor/github.com/uber/jaeger-client-go/README.md @@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`). JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`). JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`). +JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`). +JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`). JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/. JAEGER_SAMPLER_PARAM | The sampler parameter (number). JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler. diff --git a/vendor/github.com/uber/jaeger-client-go/config/config.go b/vendor/github.com/uber/jaeger-client-go/config/config.go index e6ffb987d..bb1228294 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config.go @@ -22,6 +22,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/utils" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/internal/baggage/remote" @@ -124,6 +125,17 @@ type ReporterConfig struct { // Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT LocalAgentHostPort string `yaml:"localAgentHostPort"` + // DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves + // the agent's hostname and reconnects if there was a change. This option only + // applies if LocalAgentHostPort is specified. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED + DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"` + + // AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname + // in order to detect address changes. This option only applies if DisableAttemptReconnecting is false. + // Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL + AttemptReconnectInterval time.Duration + // CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL. // Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT CollectorEndpoint string `yaml:"collectorEndpoint"` @@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter( metrics *jaeger.Metrics, logger jaeger.Logger, ) (jaeger.Reporter, error) { - sender, err := rc.newTransport() + sender, err := rc.newTransport(logger) if err != nil { return nil, err } @@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter( return reporter, err } -func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { +func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) { switch { case rc.CollectorEndpoint != "": httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)} @@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) { } return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil default: - return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0) + return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: rc.LocalAgentHostPort, + Logger: logger, + DisableAttemptReconnecting: rc.DisableAttemptReconnecting, + AttemptReconnectInterval: rc.AttemptReconnectInterval, + }, + }) } } diff --git a/vendor/github.com/uber/jaeger-client-go/config/config_env.go b/vendor/github.com/uber/jaeger-client-go/config/config_env.go index f38eb9d93..92d60cd59 100644 --- a/vendor/github.com/uber/jaeger-client-go/config/config_env.go +++ b/vendor/github.com/uber/jaeger-client-go/config/config_env.go @@ -24,30 +24,31 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" - "github.com/uber/jaeger-client-go" ) const ( // environment variable names - envServiceName = "JAEGER_SERVICE_NAME" - envDisabled = "JAEGER_DISABLED" - envRPCMetrics = "JAEGER_RPC_METRICS" - envTags = "JAEGER_TAGS" - envSamplerType = "JAEGER_SAMPLER_TYPE" - envSamplerParam = "JAEGER_SAMPLER_PARAM" - envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint - envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" - envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" - envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" - envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" - envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" - envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" - envEndpoint = "JAEGER_ENDPOINT" - envUser = "JAEGER_USER" - envPassword = "JAEGER_PASSWORD" - envAgentHost = "JAEGER_AGENT_HOST" - envAgentPort = "JAEGER_AGENT_PORT" + envServiceName = "JAEGER_SERVICE_NAME" + envDisabled = "JAEGER_DISABLED" + envRPCMetrics = "JAEGER_RPC_METRICS" + envTags = "JAEGER_TAGS" + envSamplerType = "JAEGER_SAMPLER_TYPE" + envSamplerParam = "JAEGER_SAMPLER_PARAM" + envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint + envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT" + envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS" + envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL" + envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE" + envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL" + envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS" + envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED" + envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL" + envEndpoint = "JAEGER_ENDPOINT" + envUser = "JAEGER_USER" + envPassword = "JAEGER_PASSWORD" + envAgentHost = "JAEGER_AGENT_HOST" + envAgentPort = "JAEGER_AGENT_PORT" ) // FromEnv uses environment variables to set the tracer's Configuration @@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) { if useEnv || rc.LocalAgentHostPort == "" { rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port) } + + if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" { + if value, err := strconv.ParseBool(e); err == nil { + rc.DisableAttemptReconnecting = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e) + } + } + + if !rc.DisableAttemptReconnecting { + if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" { + if value, err := time.ParseDuration(e); err == nil { + rc.AttemptReconnectInterval = value + } else { + return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e) + } + } + } } return rc, nil diff --git a/vendor/github.com/uber/jaeger-client-go/constants.go b/vendor/github.com/uber/jaeger-client-go/constants.go index feaf344ad..2f63d5909 100644 --- a/vendor/github.com/uber/jaeger-client-go/constants.go +++ b/vendor/github.com/uber/jaeger-client-go/constants.go @@ -22,7 +22,7 @@ import ( const ( // JaegerClientVersion is the version of the client library reported as Span tag. - JaegerClientVersion = "Go-2.24.0" + JaegerClientVersion = "Go-2.25.0" // JaegerClientVersionTagKey is the name of the tag used to report client version. JaegerClientVersionTagKey = "jaeger.version" diff --git a/vendor/github.com/uber/jaeger-client-go/span_context.go b/vendor/github.com/uber/jaeger-client-go/span_context.go index 1b44f3f8c..ae9d94a9a 100644 --- a/vendor/github.com/uber/jaeger-client-go/span_context.go +++ b/vendor/github.com/uber/jaeger-client-go/span_context.go @@ -212,10 +212,14 @@ func (c SpanContext) SetFirehose() { } func (c SpanContext) String() string { + var flags int32 + if c.samplingState != nil { + flags = c.samplingState.stateFlags.Load() + } if c.traceID.High == 0 { - return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } - return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) + return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags) } // ContextFromString reconstructs the Context encoded in a string diff --git a/vendor/github.com/uber/jaeger-client-go/tracer.go b/vendor/github.com/uber/jaeger-client-go/tracer.go index 8a3fc97ab..477c6eae3 100644 --- a/vendor/github.com/uber/jaeger-client-go/tracer.go +++ b/vendor/github.com/uber/jaeger-client-go/tracer.go @@ -216,10 +216,10 @@ func (t *Tracer) startSpanWithOptions( options.StartTime = t.timeNow() } - // Predicate whether the given span context is a valid reference - // which may be used as parent / debug ID / baggage items source - isValidReference := func(ctx SpanContext) bool { - return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0 + // Predicate whether the given span context is an empty reference + // or may be used as parent / debug ID / baggage items source + isEmptyReference := func(ctx SpanContext) bool { + return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0 } var references []Reference @@ -235,7 +235,7 @@ func (t *Tracer) startSpanWithOptions( reflect.ValueOf(ref.ReferencedContext))) continue } - if !isValidReference(ctxRef) { + if isEmptyReference(ctxRef) { continue } @@ -245,14 +245,17 @@ func (t *Tracer) startSpanWithOptions( continue } - references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + if ctxRef.IsValid() { + // we don't want empty context that contains only debug-id or baggage + references = append(references, Reference{Type: ref.Type, Context: ctxRef}) + } if !hasParent { parent = ctxRef hasParent = ref.Type == opentracing.ChildOfRef } } - if !hasParent && isValidReference(parent) { + if !hasParent && !isEmptyReference(parent) { // If ChildOfRef wasn't found but a FollowFromRef exists, use the context from // the FollowFromRef as the parent hasParent = true diff --git a/vendor/github.com/uber/jaeger-client-go/transport_udp.go b/vendor/github.com/uber/jaeger-client-go/transport_udp.go index 7370d8007..5734819ab 100644 --- a/vendor/github.com/uber/jaeger-client-go/transport_udp.go +++ b/vendor/github.com/uber/jaeger-client-go/transport_udp.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/uber/jaeger-client-go/internal/reporterstats" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" "github.com/uber/jaeger-client-go/utils" @@ -57,35 +58,57 @@ type udpSender struct { failedToEmitSpans int64 } -// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should +// be passed to NewUDPTransportWithParams. +type UDPTransportParams struct { + utils.AgentClientUDPParams +} + +// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent. // TODO: (breaking change) move to transport/ package. -func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { - if len(hostPort) == 0 { - hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) +func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) { + if len(params.HostPort) == 0 { + params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort) } - if maxPacketSize == 0 { - maxPacketSize = utils.UDPPacketMaxLength + + if params.Logger == nil { + params.Logger = log.StdLogger + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = utils.UDPPacketMaxLength } protocolFactory := thrift.NewTCompactProtocolFactory() // Each span is first written to thriftBuffer to determine its size in bytes. - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) thriftProtocol := protocolFactory.GetProtocol(thriftBuffer) - client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize) + client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams) if err != nil { return nil, err } return &udpSender{ client: client, - maxSpanBytes: maxPacketSize - emitBatchOverhead, + maxSpanBytes: params.MaxPacketSize - emitBatchOverhead, thriftBuffer: thriftBuffer, thriftProtocol: thriftProtocol, }, nil } +// NewUDPTransport creates a reporter that submits spans to jaeger-agent. +// TODO: (breaking change) move to transport/ package. +func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) { + return NewUDPTransportWithParams(UDPTransportParams{ + AgentClientUDPParams: utils.AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }, + }) +} + // SetReporterStats implements reporterstats.Receiver. func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) { s.reporterStats = rs diff --git a/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go new file mode 100644 index 000000000..0dffc7fa2 --- /dev/null +++ b/vendor/github.com/uber/jaeger-client-go/utils/reconnecting_udp_conn.go @@ -0,0 +1,189 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/uber/jaeger-client-go/log" +) + +// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +type reconnectingUDPConn struct { + hostPort string + resolveFunc resolveFunc + dialFunc dialFunc + logger log.Logger + bufferBytes int64 + + connMtx sync.RWMutex + conn *net.UDPConn + destAddr *net.UDPAddr + closeChan chan struct{} +} + +type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error) +type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) + +// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is +// different than the current conn then the new address is dialed and the conn is swapped. +func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) { + conn := &reconnectingUDPConn{ + hostPort: hostPort, + resolveFunc: resolveFunc, + dialFunc: dialFunc, + logger: logger, + closeChan: make(chan struct{}), + } + + if err := conn.attemptResolveAndDial(); err != nil { + logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)) + } + + go conn.reconnectLoop(resolveTimeout) + + return conn, nil +} + +func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) { + ticker := time.NewTicker(resolveTimeout) + defer ticker.Stop() + + for { + select { + case <-c.closeChan: + return + case <-ticker.C: + if err := c.attemptResolveAndDial(); err != nil { + c.logger.Error(err.Error()) + } + } + } +} + +func (c *reconnectingUDPConn) attemptResolveAndDial() error { + newAddr, err := c.resolveFunc("udp", c.hostPort) + if err != nil { + return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err) + } + + c.connMtx.RLock() + curAddr := c.destAddr + c.connMtx.RUnlock() + + // dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn + if curAddr != nil && newAddr.String() == curAddr.String() { + return nil + } + + if err := c.attemptDialNewAddr(newAddr); err != nil { + return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err) + } + + return nil +} + +func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error { + connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr) + if err != nil { + return err + } + + if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 { + if err = connUDP.SetWriteBuffer(bufferBytes); err != nil { + return err + } + } + + c.connMtx.Lock() + c.destAddr = newAddr + // store prev to close later + prevConn := c.conn + c.conn = connUDP + c.connMtx.Unlock() + + if prevConn != nil { + return prevConn.Close() + } + + return nil +} + +// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning +func (c *reconnectingUDPConn) Write(b []byte) (int, error) { + var bytesWritten int + var err error + + c.connMtx.RLock() + if c.conn == nil { + // if connection is not initialized indicate this with err in order to hook into retry logic + err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved") + } else { + bytesWritten, err = c.conn.Write(b) + } + c.connMtx.RUnlock() + + if err == nil { + return bytesWritten, nil + } + + // attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again + if reconnErr := c.attemptResolveAndDial(); reconnErr == nil { + c.connMtx.RLock() + defer c.connMtx.RUnlock() + return c.conn.Write(b) + } + + // return original error if reconn fails + return bytesWritten, err +} + +// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation +func (c *reconnectingUDPConn) Close() error { + close(c.closeChan) + + // acquire rw lock before closing conn to ensure calls to Write drain + c.connMtx.Lock() + defer c.connMtx.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + + return nil +} + +// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held +// and SetWriteBuffer is called store bufferBytes to be set for new conns +func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error { + var err error + + c.connMtx.RLock() + if c.conn != nil { + err = c.conn.SetWriteBuffer(bytes) + } + c.connMtx.RUnlock() + + if err == nil { + atomic.StoreInt64(&c.bufferBytes, int64(bytes)) + } + + return err +} diff --git a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go index fadd73e49..2352643ce 100644 --- a/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go +++ b/vendor/github.com/uber/jaeger-client-go/utils/udp_client.go @@ -19,7 +19,9 @@ import ( "fmt" "io" "net" + "time" + "github.com/uber/jaeger-client-go/log" "github.com/uber/jaeger-client-go/thrift" "github.com/uber/jaeger-client-go/thrift-gen/agent" @@ -35,41 +37,90 @@ type AgentClientUDP struct { agent.Agent io.Closer - connUDP *net.UDPConn + connUDP udpConn client *agent.AgentClient maxPacketSize int // max size of datagram in bytes thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span } -// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. -func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { - if maxPacketSize == 0 { - maxPacketSize = UDPPacketMaxLength +type udpConn interface { + Write([]byte) (int, error) + SetWriteBuffer(int) error + Close() error +} + +// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should +// be passed to NewAgentClientUDPWithParams. +type AgentClientUDPParams struct { + HostPort string + MaxPacketSize int + Logger log.Logger + DisableAttemptReconnecting bool + AttemptReconnectInterval time.Duration +} + +// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) { + // validate hostport + if _, _, err := net.SplitHostPort(params.HostPort); err != nil { + return nil, err + } + + if params.MaxPacketSize == 0 { + params.MaxPacketSize = UDPPacketMaxLength + } + + if params.Logger == nil { + params.Logger = log.StdLogger } - thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize) + if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 { + params.AttemptReconnectInterval = time.Second * 30 + } + + thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize) protocolFactory := thrift.NewTCompactProtocolFactory() client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory) - destAddr, err := net.ResolveUDPAddr("udp", hostPort) - if err != nil { - return nil, err - } + var connUDP udpConn + var err error - connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr) - if err != nil { - return nil, err + if params.DisableAttemptReconnecting { + destAddr, err := net.ResolveUDPAddr("udp", params.HostPort) + if err != nil { + return nil, err + } + + connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr) + if err != nil { + return nil, err + } + } else { + // host is hostname, setup resolver loop in case host record changes during operation + connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger) + if err != nil { + return nil, err + } } - if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil { + + if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil { return nil, err } - clientUDP := &AgentClientUDP{ + return &AgentClientUDP{ connUDP: connUDP, client: client, - maxPacketSize: maxPacketSize, - thriftBuffer: thriftBuffer} - return clientUDP, nil + maxPacketSize: params.MaxPacketSize, + thriftBuffer: thriftBuffer, + }, nil +} + +// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP. +func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) { + return NewAgentClientUDPWithParams(AgentClientUDPParams{ + HostPort: hostPort, + MaxPacketSize: maxPacketSize, + }) } // EmitZipkinBatch implements EmitZipkinBatch() of Agent interface diff --git a/vendor/modules.txt b/vendor/modules.txt index d7ebbd7cc..913cb71eb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -500,7 +500,7 @@ github.com/stretchr/testify/require github.com/syndtr/gocapability/capability # github.com/tchap/go-patricia v2.3.0+incompatible github.com/tchap/go-patricia/patricia -# github.com/uber/jaeger-client-go v2.24.0+incompatible +# github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-client-go github.com/uber/jaeger-client-go/config github.com/uber/jaeger-client-go/internal/baggage |