From da7595a69fc15d131c9d8123d0a165bdde4232b6 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Thu, 28 Nov 2019 23:33:42 +0900 Subject: rootless: use RootlessKit port forwarder RootlessKit port forwarder has a lot of advantages over the slirp4netns port forwarder: * Very high throughput. Benchmark result on Travis: socat: 5.2 Gbps, slirp4netns: 8.3 Gbps, RootlessKit: 27.3 Gbps (https://travis-ci.org/rootless-containers/rootlesskit/builds/597056377) * Connections from the host are treated as 127.0.0.1 rather than 10.0.2.2 in the namespace. No UDP issue (#4586) * No tcp_rmem issue (#4537) * Probably works with IPv6. Even if not, it is trivial to support IPv6. (#4311) * Easily extensible for future support of SCTP * Easily extensible for future support of `lxc-user-nic` SUID network RootlessKit port forwarder has been already adopted as the default port forwarder by Rootless Docker/Moby, and no issue has been reported AFAIK. As the port forwarder is imported as a Go package, no `rootlesskit` binary is required for Podman. Fix #4586 May-fix #4559 Fix #4537 May-fix #4311 See https://github.com/rootless-containers/rootlesskit/blob/v0.7.0/pkg/port/builtin/builtin.go Signed-off-by: Akihiro Suda --- .../rootlesskit/pkg/msgutil/msgutil.go | 66 +++++++++ .../rootlesskit/pkg/port/builtin/builtin.go | 14 ++ .../rootlesskit/pkg/port/builtin/child/child.go | 134 ++++++++++++++++++ .../rootlesskit/pkg/port/builtin/msg/msg.go | 129 ++++++++++++++++++ .../rootlesskit/pkg/port/builtin/opaque/opaque.go | 6 + .../rootlesskit/pkg/port/builtin/parent/parent.go | 145 ++++++++++++++++++++ .../rootlesskit/pkg/port/builtin/parent/tcp/tcp.go | 104 ++++++++++++++ .../rootlesskit/pkg/port/builtin/parent/udp/udp.go | 60 +++++++++ .../port/builtin/parent/udp/udpproxy/udp_proxy.go | 150 +++++++++++++++++++++ .../rootlesskit/pkg/port/port.go | 51 +++++++ .../rootlesskit/pkg/port/portutil/portutil.go | 67 +++++++++ 11 files changed, 926 insertions(+) create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/msgutil/msgutil.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg/msg.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque/opaque.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/parent.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/tcp/tcp.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udp.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy/udp_proxy.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/port.go create mode 100644 vendor/github.com/rootless-containers/rootlesskit/pkg/port/portutil/portutil.go (limited to 'vendor/github.com/rootless-containers/rootlesskit/pkg') diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/msgutil/msgutil.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/msgutil/msgutil.go new file mode 100644 index 000000000..a0a0c94c6 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/msgutil/msgutil.go @@ -0,0 +1,66 @@ +// Package msgutil provides utility for JSON message with uint32le header +package msgutil + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "io" + + "github.com/pkg/errors" +) + +const ( + maxLength = 1 << 16 +) + +func MarshalToWriter(w io.Writer, x interface{}) (int, error) { + b, err := json.Marshal(x) + if err != nil { + return 0, err + } + if len(b) > maxLength { + return 0, errors.Errorf("bad message length: %d (max: %d)", len(b), maxLength) + } + h := make([]byte, 4) + binary.LittleEndian.PutUint32(h, uint32(len(b))) + return w.Write(append(h, b...)) +} + +func UnmarshalFromReader(r io.Reader, x interface{}) (int, error) { + hdr := make([]byte, 4) + n, err := r.Read(hdr) + if err != nil { + return n, err + } + if n != 4 { + return n, errors.Errorf("read %d bytes, expected 4 bytes", n) + } + bLen := binary.LittleEndian.Uint32(hdr) + if bLen > maxLength || bLen < 1 { + return n, errors.Errorf("bad message length: %d (max: %d)", bLen, maxLength) + } + b := make([]byte, bLen) + n, err = r.Read(b) + if err != nil { + return 4 + n, err + } + if n != int(bLen) { + return 4 + n, errors.Errorf("read %d bytes, expected %d bytes", n, bLen) + } + return 4 + n, json.Unmarshal(b, x) +} + +func Marshal(x interface{}) ([]byte, error) { + var b bytes.Buffer + _, err := MarshalToWriter(&b, x) + return b.Bytes(), err +} + +func Unmarshal(b []byte, x interface{}) error { + n, err := UnmarshalFromReader(bytes.NewReader(b), x) + if n != len(b) { + return errors.Errorf("read %d bytes, expected %d bytes", n, len(b)) + } + return err +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go new file mode 100644 index 000000000..ca3f10b26 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/builtin.go @@ -0,0 +1,14 @@ +package builtin + +import ( + "io" + + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/child" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent" +) + +var ( + NewParentDriver func(logWriter io.Writer, stateDir string) (port.ParentDriver, error) = parent.NewDriver + NewChildDriver func(logWriter io.Writer) port.ChildDriver = child.NewDriver +) diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go new file mode 100644 index 000000000..5477dda51 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/child/child.go @@ -0,0 +1,134 @@ +package child + +import ( + "fmt" + "io" + "net" + "os" + + "github.com/pkg/errors" + "golang.org/x/sys/unix" + + "github.com/rootless-containers/rootlesskit/pkg/msgutil" + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg" + opaquepkg "github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque" +) + +func NewDriver(logWriter io.Writer) port.ChildDriver { + return &childDriver{ + logWriter: logWriter, + } +} + +type childDriver struct { + logWriter io.Writer +} + +func (d *childDriver) RunChildDriver(opaque map[string]string, quit <-chan struct{}) error { + socketPath := opaque[opaquepkg.SocketPath] + if socketPath == "" { + return errors.New("socket path not set") + } + childReadyPipePath := opaque[opaquepkg.ChildReadyPipePath] + if childReadyPipePath == "" { + return errors.New("child ready pipe path not set") + } + childReadyPipeW, err := os.OpenFile(childReadyPipePath, os.O_WRONLY, os.ModeNamedPipe) + if err != nil { + return err + } + ln, err := net.ListenUnix("unix", &net.UnixAddr{ + Name: socketPath, + Net: "unix", + }) + if err != nil { + return err + } + // write nothing, just close + if err = childReadyPipeW.Close(); err != nil { + return err + } + stopAccept := make(chan struct{}, 1) + go func() { + <-quit + stopAccept <- struct{}{} + ln.Close() + }() + for { + c, err := ln.AcceptUnix() + if err != nil { + select { + case <-stopAccept: + return nil + default: + } + return err + } + go func() { + if rerr := d.routine(c); rerr != nil { + rep := msg.Reply{ + Error: rerr.Error(), + } + msgutil.MarshalToWriter(c, &rep) + } + c.Close() + }() + } + return nil +} + +func (d *childDriver) routine(c *net.UnixConn) error { + var req msg.Request + if _, err := msgutil.UnmarshalFromReader(c, &req); err != nil { + return err + } + switch req.Type { + case msg.RequestTypeInit: + return d.handleConnectInit(c, &req) + case msg.RequestTypeConnect: + return d.handleConnectRequest(c, &req) + default: + return errors.Errorf("unknown request type %q", req.Type) + } +} + +func (d *childDriver) handleConnectInit(c *net.UnixConn, req *msg.Request) error { + _, err := msgutil.MarshalToWriter(c, nil) + return err +} + +func (d *childDriver) handleConnectRequest(c *net.UnixConn, req *msg.Request) error { + switch req.Proto { + case "tcp": + case "udp": + default: + return errors.Errorf("unknown proto: %q", req.Proto) + } + var dialer net.Dialer + targetConn, err := dialer.Dial(req.Proto, fmt.Sprintf("127.0.0.1:%d", req.Port)) + if err != nil { + return err + } + defer targetConn.Close() // no effect on duplicated FD + targetConnFiler, ok := targetConn.(filer) + if !ok { + return errors.Errorf("unknown target connection: %+v", targetConn) + } + targetConnFile, err := targetConnFiler.File() + if err != nil { + return err + } + oob := unix.UnixRights(int(targetConnFile.Fd())) + f, err := c.File() + if err != nil { + return err + } + err = unix.Sendmsg(int(f.Fd()), []byte("dummy"), oob, nil, 0) + return err +} + +// filer is implemented by *net.TCPConn and *net.UDPConn +type filer interface { + File() (f *os.File, err error) +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg/msg.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg/msg.go new file mode 100644 index 000000000..c603f473a --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg/msg.go @@ -0,0 +1,129 @@ +package msg + +import ( + "net" + "time" + + "github.com/pkg/errors" + "golang.org/x/sys/unix" + + "github.com/rootless-containers/rootlesskit/pkg/msgutil" + "github.com/rootless-containers/rootlesskit/pkg/port" +) + +const ( + RequestTypeInit = "init" + RequestTypeConnect = "connect" +) + +// Request and Response are encoded as JSON with uint32le length header. +type Request struct { + Type string // "init" or "connect" + Proto string // "tcp" or "udp" + Port int +} + +// Reply may contain FD as OOB +type Reply struct { + Error string +} + +// Initiate sends "init" request to the child UNIX socket. +func Initiate(c *net.UnixConn) error { + req := Request{ + Type: RequestTypeInit, + } + if _, err := msgutil.MarshalToWriter(c, &req); err != nil { + return err + } + if err := c.CloseWrite(); err != nil { + return err + } + var rep Reply + if _, err := msgutil.UnmarshalFromReader(c, &rep); err != nil { + return err + } + return c.CloseRead() +} + +// ConnectToChild connects to the child UNIX socket, and obtains TCP or UDP socket FD +// that corresponds to the port spec. +func ConnectToChild(c *net.UnixConn, spec port.Spec) (int, error) { + req := Request{ + Type: RequestTypeConnect, + Proto: spec.Proto, + Port: spec.ChildPort, + } + if _, err := msgutil.MarshalToWriter(c, &req); err != nil { + return 0, err + } + if err := c.CloseWrite(); err != nil { + return 0, err + } + oobSpace := unix.CmsgSpace(4) + oob := make([]byte, oobSpace) + _, oobN, _, _, err := c.ReadMsgUnix(nil, oob) + if err != nil { + return 0, err + } + if oobN != oobSpace { + return 0, errors.Errorf("expected OOB space %d, got %d", oobSpace, oobN) + } + oob = oob[:oobN] + fd, err := parseFDFromOOB(oob) + if err != nil { + return 0, err + } + if err := c.CloseRead(); err != nil { + return 0, err + } + return fd, nil +} + +// ConnectToChildWithSocketPath wraps ConnectToChild +func ConnectToChildWithSocketPath(socketPath string, spec port.Spec) (int, error) { + var dialer net.Dialer + conn, err := dialer.Dial("unix", socketPath) + if err != nil { + return 0, err + } + defer conn.Close() + c := conn.(*net.UnixConn) + return ConnectToChild(c, spec) +} + +// ConnectToChildWithRetry retries ConnectToChild every (i*5) milliseconds. +func ConnectToChildWithRetry(socketPath string, spec port.Spec, retries int) (int, error) { + for i := 0; i < retries; i++ { + fd, err := ConnectToChildWithSocketPath(socketPath, spec) + if i == retries-1 && err != nil { + return 0, err + } + if err == nil { + return fd, err + } + // TODO: backoff + time.Sleep(time.Duration(i*5) * time.Millisecond) + } + // NOT REACHED + return 0, errors.New("reached max retry") +} + +func parseFDFromOOB(oob []byte) (int, error) { + scms, err := unix.ParseSocketControlMessage(oob) + if err != nil { + return 0, err + } + if len(scms) != 1 { + return 0, errors.Errorf("unexpected scms: %v", scms) + } + scm := scms[0] + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + return 0, err + } + if len(fds) != 1 { + return 0, errors.Errorf("unexpected fds: %v", fds) + } + return fds[0], nil +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque/opaque.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque/opaque.go new file mode 100644 index 000000000..391b3d340 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque/opaque.go @@ -0,0 +1,6 @@ +package opaque + +const ( + SocketPath = "builtin.socketpath" + ChildReadyPipePath = "builtin.readypipepath" +) diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/parent.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/parent.go new file mode 100644 index 000000000..893bf1da9 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/parent.go @@ -0,0 +1,145 @@ +package parent + +import ( + "context" + "io" + "io/ioutil" + "net" + "os" + "path/filepath" + "sync" + "syscall" + + "github.com/pkg/errors" + + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/opaque" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/tcp" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp" + "github.com/rootless-containers/rootlesskit/pkg/port/portutil" +) + +// NewDriver for builtin driver. +func NewDriver(logWriter io.Writer, stateDir string) (port.ParentDriver, error) { + // TODO: consider using socketpair FD instead of socket file + socketPath := filepath.Join(stateDir, ".bp.sock") + childReadyPipePath := filepath.Join(stateDir, ".bp-ready.pipe") + // remove the path just in case the previous rootlesskit instance crashed + if err := os.RemoveAll(childReadyPipePath); err != nil { + return nil, errors.Wrapf(err, "cannot remove %s", childReadyPipePath) + } + if err := syscall.Mkfifo(childReadyPipePath, 0600); err != nil { + return nil, errors.Wrapf(err, "cannot mkfifo %s", childReadyPipePath) + } + d := driver{ + logWriter: logWriter, + socketPath: socketPath, + childReadyPipePath: childReadyPipePath, + ports: make(map[int]*port.Status, 0), + stoppers: make(map[int]func() error, 0), + nextID: 1, + } + return &d, nil +} + +type driver struct { + logWriter io.Writer + socketPath string + childReadyPipePath string + mu sync.Mutex + ports map[int]*port.Status + stoppers map[int]func() error + nextID int +} + +func (d *driver) OpaqueForChild() map[string]string { + return map[string]string{ + opaque.SocketPath: d.socketPath, + opaque.ChildReadyPipePath: d.childReadyPipePath, + } +} + +func (d *driver) RunParentDriver(initComplete chan struct{}, quit <-chan struct{}, _ *port.ChildContext) error { + childReadyPipeR, err := os.OpenFile(d.childReadyPipePath, os.O_RDONLY, os.ModeNamedPipe) + if err != nil { + return err + } + if _, err = ioutil.ReadAll(childReadyPipeR); err != nil { + return err + } + childReadyPipeR.Close() + var dialer net.Dialer + conn, err := dialer.Dial("unix", d.socketPath) + if err != nil { + return err + } + err = msg.Initiate(conn.(*net.UnixConn)) + conn.Close() + if err != nil { + return err + } + initComplete <- struct{}{} + <-quit + return nil +} + +func (d *driver) AddPort(ctx context.Context, spec port.Spec) (*port.Status, error) { + d.mu.Lock() + err := portutil.ValidatePortSpec(spec, d.ports) + d.mu.Unlock() + if err != nil { + return nil, err + } + routineStopCh := make(chan struct{}) + routineStop := func() error { + close(routineStopCh) + return nil // FIXME + } + switch spec.Proto { + case "tcp": + err = tcp.Run(d.socketPath, spec, routineStopCh, d.logWriter) + case "udp": + err = udp.Run(d.socketPath, spec, routineStopCh, d.logWriter) + default: + // NOTREACHED + return nil, errors.New("spec was not validated?") + } + if err != nil { + return nil, err + } + d.mu.Lock() + id := d.nextID + st := port.Status{ + ID: id, + Spec: spec, + } + d.ports[id] = &st + d.stoppers[id] = routineStop + d.nextID++ + d.mu.Unlock() + return &st, nil +} + +func (d *driver) ListPorts(ctx context.Context) ([]port.Status, error) { + var ports []port.Status + d.mu.Lock() + for _, p := range d.ports { + ports = append(ports, *p) + } + d.mu.Unlock() + return ports, nil +} + +func (d *driver) RemovePort(ctx context.Context, id int) error { + d.mu.Lock() + defer d.mu.Unlock() + stop, ok := d.stoppers[id] + if !ok { + return errors.Errorf("unknown id: %d", id) + } + err := stop() + delete(d.stoppers, id) + delete(d.ports, id) + return err +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/tcp/tcp.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/tcp/tcp.go new file mode 100644 index 000000000..b9f2d1802 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/tcp/tcp.go @@ -0,0 +1,104 @@ +package tcp + +import ( + "fmt" + "io" + "net" + "os" + "sync" + + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg" +) + +func Run(socketPath string, spec port.Spec, stopCh <-chan struct{}, logWriter io.Writer) error { + ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", spec.ParentIP, spec.ParentPort)) + if err != nil { + fmt.Fprintf(logWriter, "listen: %v\n", err) + return err + } + newConns := make(chan net.Conn) + go func() { + for { + c, err := ln.Accept() + if err != nil { + fmt.Fprintf(logWriter, "accept: %v\n", err) + close(newConns) + return + } + newConns <- c + } + }() + go func() { + defer ln.Close() + for { + select { + case c, ok := <-newConns: + if !ok { + return + } + go func() { + if err := copyConnToChild(c, socketPath, spec, stopCh); err != nil { + fmt.Fprintf(logWriter, "copyConnToChild: %v\n", err) + return + } + }() + case <-stopCh: + return + } + } + }() + // no wait + return nil +} + +func copyConnToChild(c net.Conn, socketPath string, spec port.Spec, stopCh <-chan struct{}) error { + defer c.Close() + // get fd from the child as an SCM_RIGHTS cmsg + fd, err := msg.ConnectToChildWithRetry(socketPath, spec, 10) + if err != nil { + return err + } + f := os.NewFile(uintptr(fd), "") + defer f.Close() + fc, err := net.FileConn(f) + if err != nil { + return err + } + defer fc.Close() + bicopy(c, fc, stopCh) + return nil +} + +// bicopy is based on libnetwork/cmd/proxy/tcp_proxy.go . +// NOTE: sendfile(2) cannot be used for sockets +func bicopy(x, y net.Conn, quit <-chan struct{}) { + var wg sync.WaitGroup + var broker = func(to, from net.Conn) { + io.Copy(to, from) + if fromTCP, ok := from.(*net.TCPConn); ok { + fromTCP.CloseRead() + } + if toTCP, ok := to.(*net.TCPConn); ok { + toTCP.CloseWrite() + } + wg.Done() + } + + wg.Add(2) + go broker(x, y) + go broker(y, x) + finish := make(chan struct{}) + go func() { + wg.Wait() + close(finish) + }() + + select { + case <-quit: + case <-finish: + } + x.Close() + y.Close() + <-finish +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udp.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udp.go new file mode 100644 index 000000000..d8f646b5d --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udp.go @@ -0,0 +1,60 @@ +package udp + +import ( + "fmt" + "io" + "net" + "os" + + "github.com/pkg/errors" + + "github.com/rootless-containers/rootlesskit/pkg/port" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/msg" + "github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy" +) + +func Run(socketPath string, spec port.Spec, stopCh <-chan struct{}, logWriter io.Writer) error { + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", spec.ParentIP, spec.ParentPort)) + if err != nil { + return err + } + c, err := net.ListenUDP("udp", addr) + if err != nil { + return err + } + udpp := &udpproxy.UDPProxy{ + LogWriter: logWriter, + Listener: c, + BackendDial: func() (*net.UDPConn, error) { + // get fd from the child as an SCM_RIGHTS cmsg + fd, err := msg.ConnectToChildWithRetry(socketPath, spec, 10) + if err != nil { + return nil, err + } + f := os.NewFile(uintptr(fd), "") + defer f.Close() + fc, err := net.FileConn(f) + if err != nil { + return nil, err + } + uc, ok := fc.(*net.UDPConn) + if !ok { + return nil, errors.Errorf("file conn doesn't implement *net.UDPConn: %+v", fc) + } + return uc, nil + }, + } + go udpp.Run() + go func() { + for { + select { + case <-stopCh: + // udpp.Close closes ln as well + udpp.Close() + return + } + } + }() + // no wait + return nil +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy/udp_proxy.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy/udp_proxy.go new file mode 100644 index 000000000..af7b7d5d9 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/builtin/parent/udp/udpproxy/udp_proxy.go @@ -0,0 +1,150 @@ +// Package udpproxy is from https://raw.githubusercontent.com/docker/libnetwork/fec6476dfa21380bf8ee4d74048515d968c1ee63/cmd/proxy/udp_proxy.go +package udpproxy + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "strings" + "sync" + "syscall" + "time" +) + +const ( + // UDPConnTrackTimeout is the timeout used for UDP connection tracking + UDPConnTrackTimeout = 90 * time.Second + // UDPBufSize is the buffer size for the UDP proxy + UDPBufSize = 65507 +) + +// A net.Addr where the IP is split into two fields so you can use it as a key +// in a map: +type connTrackKey struct { + IPHigh uint64 + IPLow uint64 + Port int +} + +func newConnTrackKey(addr *net.UDPAddr) *connTrackKey { + if len(addr.IP) == net.IPv4len { + return &connTrackKey{ + IPHigh: 0, + IPLow: uint64(binary.BigEndian.Uint32(addr.IP)), + Port: addr.Port, + } + } + return &connTrackKey{ + IPHigh: binary.BigEndian.Uint64(addr.IP[:8]), + IPLow: binary.BigEndian.Uint64(addr.IP[8:]), + Port: addr.Port, + } +} + +type connTrackMap map[connTrackKey]*net.UDPConn + +// UDPProxy is proxy for which handles UDP datagrams. +// From libnetwork udp_proxy.go . +type UDPProxy struct { + LogWriter io.Writer + Listener *net.UDPConn + BackendDial func() (*net.UDPConn, error) + connTrackTable connTrackMap + connTrackLock sync.Mutex +} + +func (proxy *UDPProxy) replyLoop(proxyConn *net.UDPConn, clientAddr *net.UDPAddr, clientKey *connTrackKey) { + defer func() { + proxy.connTrackLock.Lock() + delete(proxy.connTrackTable, *clientKey) + proxy.connTrackLock.Unlock() + proxyConn.Close() + }() + + readBuf := make([]byte, UDPBufSize) + for { + proxyConn.SetReadDeadline(time.Now().Add(UDPConnTrackTimeout)) + again: + read, err := proxyConn.Read(readBuf) + if err != nil { + if err, ok := err.(*net.OpError); ok && err.Err == syscall.ECONNREFUSED { + // This will happen if the last write failed + // (e.g: nothing is actually listening on the + // proxied port on the container), ignore it + // and continue until UDPConnTrackTimeout + // expires: + goto again + } + return + } + for i := 0; i != read; { + written, err := proxy.Listener.WriteToUDP(readBuf[i:read], clientAddr) + if err != nil { + return + } + i += written + } + } +} + +// Run starts forwarding the traffic using UDP. +func (proxy *UDPProxy) Run() { + proxy.connTrackTable = make(connTrackMap) + readBuf := make([]byte, UDPBufSize) + for { + read, from, err := proxy.Listener.ReadFromUDP(readBuf) + if err != nil { + // NOTE: Apparently ReadFrom doesn't return + // ECONNREFUSED like Read do (see comment in + // UDPProxy.replyLoop) + if !isClosedError(err) { + fmt.Fprintf(proxy.LogWriter, "Stopping proxy on udp: %v\n", err) + } + break + } + + fromKey := newConnTrackKey(from) + proxy.connTrackLock.Lock() + proxyConn, hit := proxy.connTrackTable[*fromKey] + if !hit { + proxyConn, err = proxy.BackendDial() + if err != nil { + fmt.Fprintf(proxy.LogWriter, "Can't proxy a datagram to udp: %v\n", err) + proxy.connTrackLock.Unlock() + continue + } + proxy.connTrackTable[*fromKey] = proxyConn + go proxy.replyLoop(proxyConn, from, fromKey) + } + proxy.connTrackLock.Unlock() + for i := 0; i != read; { + written, err := proxyConn.Write(readBuf[i:read]) + if err != nil { + fmt.Fprintf(proxy.LogWriter, "Can't proxy a datagram to udp: %v\n", err) + break + } + i += written + } + } +} + +// Close stops forwarding the traffic. +func (proxy *UDPProxy) Close() { + proxy.Listener.Close() + proxy.connTrackLock.Lock() + defer proxy.connTrackLock.Unlock() + for _, conn := range proxy.connTrackTable { + conn.Close() + } +} + +func isClosedError(err error) bool { + /* This comparison is ugly, but unfortunately, net.go doesn't export errClosing. + * See: + * http://golang.org/src/pkg/net/net.go + * https://code.google.com/p/go/issues/detail?id=4337 + * https://groups.google.com/forum/#!msg/golang-nuts/0_aaCvBmOcM/SptmDyX1XJMJ + */ + return strings.HasSuffix(err.Error(), "use of closed network connection") +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/port.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/port.go new file mode 100644 index 000000000..9ef46f549 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/port.go @@ -0,0 +1,51 @@ +package port + +import ( + "context" + "net" +) + +type Spec struct { + Proto string `json:"proto,omitempty"` // either "tcp" or "udp". in future "sctp" will be supported as well. + ParentIP string `json:"parentIP,omitempty"` // IPv4 address. can be empty (0.0.0.0). + ParentPort int `json:"parentPort,omitempty"` + ChildPort int `json:"childPort,omitempty"` +} + +type Status struct { + ID int `json:"id"` + Spec Spec `json:"spec"` +} + +// Manager MUST be thread-safe. +type Manager interface { + AddPort(ctx context.Context, spec Spec) (*Status, error) + ListPorts(ctx context.Context) ([]Status, error) + RemovePort(ctx context.Context, id int) error +} + +// ChildContext is used for RunParentDriver +type ChildContext struct { + // PID of the child, can be used for ns-entering to the child namespaces. + PID int + // IP of the tap device + IP net.IP +} + +// ParentDriver is a driver for the parent process. +type ParentDriver interface { + Manager + // OpaqueForChild typically consists of socket path + // for controlling child from parent + OpaqueForChild() map[string]string + // RunParentDriver signals initComplete when ParentDriver is ready to + // serve as Manager. + // RunParentDriver blocks until quit is signaled. + // + // ChildContext is optional. + RunParentDriver(initComplete chan struct{}, quit <-chan struct{}, cctx *ChildContext) error +} + +type ChildDriver interface { + RunChildDriver(opaque map[string]string, quit <-chan struct{}) error +} diff --git a/vendor/github.com/rootless-containers/rootlesskit/pkg/port/portutil/portutil.go b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/portutil/portutil.go new file mode 100644 index 000000000..f1aa5f859 --- /dev/null +++ b/vendor/github.com/rootless-containers/rootlesskit/pkg/port/portutil/portutil.go @@ -0,0 +1,67 @@ +package portutil + +import ( + "net" + "regexp" + "strconv" + + "github.com/pkg/errors" + + "github.com/rootless-containers/rootlesskit/pkg/port" +) + +// ParsePortSpec parses a Docker-like representation of PortSpec. +// e.g. "127.0.0.1:8080:80/tcp" +func ParsePortSpec(s string) (*port.Spec, error) { + r := regexp.MustCompile("^([0-9a-f\\.]+):([0-9]+):([0-9]+)/([a-z]+)$") + g := r.FindStringSubmatch(s) + if len(g) != 5 { + return nil, errors.Errorf("unexpected PortSpec string: %q", s) + } + parentIP := g[1] + parentPort, err := strconv.Atoi(g[2]) + if err != nil { + return nil, errors.Wrapf(err, "unexpected ParentPort in PortSpec string: %q", s) + } + childPort, err := strconv.Atoi(g[3]) + if err != nil { + return nil, errors.Wrapf(err, "unexpected ChildPort in PortSpec string: %q", s) + } + proto := g[4] + // validation is up to the caller (as json.Unmarshal doesn't validate values) + return &port.Spec{ + Proto: proto, + ParentIP: parentIP, + ParentPort: parentPort, + ChildPort: childPort, + }, nil +} + +// ValidatePortSpec validates *port.Spec. +// existingPorts can be optionally passed for detecting conflicts. +func ValidatePortSpec(spec port.Spec, existingPorts map[int]*port.Status) error { + if spec.Proto != "tcp" && spec.Proto != "udp" { + return errors.Errorf("unknown proto: %q", spec.Proto) + } + if spec.ParentIP != "" { + if net.ParseIP(spec.ParentIP) == nil { + return errors.Errorf("invalid ParentIP: %q", spec.ParentIP) + } + } + if spec.ParentPort <= 0 || spec.ParentPort > 65535 { + return errors.Errorf("invalid ParentPort: %q", spec.ParentPort) + } + if spec.ChildPort <= 0 || spec.ChildPort > 65535 { + return errors.Errorf("invalid ChildPort: %q", spec.ChildPort) + } + for id, p := range existingPorts { + sp := p.Spec + sameProto := sp.Proto == spec.Proto + sameParent := sp.ParentIP == spec.ParentIP && sp.ParentPort == spec.ParentPort + sameChild := sp.ChildPort == spec.ChildPort + if sameProto && (sameParent || sameChild) { + return errors.Errorf("conflict with ID %d", id) + } + } + return nil +} -- cgit v1.2.3-54-g00ecf