summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward
diff options
context:
space:
mode:
authorMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
committerMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
commita031b83a09a8628435317a03f199cdc18b78262f (patch)
treebc017a96769ce6de33745b8b0b1304ccf38e9df0 /vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward
parent2b74391cd5281f6fdf391ff8ad50fd1490f6bf89 (diff)
downloadpodman-a031b83a09a8628435317a03f199cdc18b78262f.tar.gz
podman-a031b83a09a8628435317a03f199cdc18b78262f.tar.bz2
podman-a031b83a09a8628435317a03f199cdc18b78262f.zip
Initial checkin from CRI-O repo
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward')
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go23
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go309
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go53
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go198
4 files changed, 583 insertions, 0 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go
new file mode 100644
index 000000000..e7ccd58ae
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go
@@ -0,0 +1,23 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// package portforward contains server-side logic for handling port forwarding requests.
+package portforward
+
+// The subprotocol "portforward.k8s.io" is used for port forwarding.
+const ProtocolV1Name = "portforward.k8s.io"
+
+var SupportedProtocols = []string{ProtocolV1Name}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go
new file mode 100644
index 000000000..5f872c820
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go
@@ -0,0 +1,309 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+ "strconv"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/httpstream"
+ "k8s.io/apimachinery/pkg/util/httpstream/spdy"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/kubernetes/pkg/api"
+
+ "github.com/golang/glog"
+)
+
+func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
+ _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
+ // negotiated protocol isn't currently used server side, but could be in the future
+ if err != nil {
+ // Handshake writes the error to the client
+ return err
+ }
+ streamChan := make(chan httpstream.Stream, 1)
+
+ glog.V(5).Infof("Upgrading port forward response")
+ upgrader := spdy.NewResponseUpgrader()
+ conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
+ if conn == nil {
+ return errors.New("Unable to upgrade websocket connection")
+ }
+ defer conn.Close()
+
+ glog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout)
+ conn.SetIdleTimeout(idleTimeout)
+
+ h := &httpStreamHandler{
+ conn: conn,
+ streamChan: streamChan,
+ streamPairs: make(map[string]*httpStreamPair),
+ streamCreationTimeout: streamCreationTimeout,
+ pod: podName,
+ uid: uid,
+ forwarder: portForwarder,
+ }
+ h.run()
+
+ return nil
+}
+
+// httpStreamReceived is the httpstream.NewStreamHandler for port
+// forward streams. It checks each stream's port and stream type headers,
+// rejecting any streams that with missing or invalid values. Each valid
+// stream is sent to the streams channel.
+func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
+ return func(stream httpstream.Stream, replySent <-chan struct{}) error {
+ // make sure it has a valid port header
+ portString := stream.Headers().Get(api.PortHeader)
+ if len(portString) == 0 {
+ return fmt.Errorf("%q header is required", api.PortHeader)
+ }
+ port, err := strconv.ParseUint(portString, 10, 16)
+ if err != nil {
+ return fmt.Errorf("unable to parse %q as a port: %v", portString, err)
+ }
+ if port < 1 {
+ return fmt.Errorf("port %q must be > 0", portString)
+ }
+
+ // make sure it has a valid stream type header
+ streamType := stream.Headers().Get(api.StreamType)
+ if len(streamType) == 0 {
+ return fmt.Errorf("%q header is required", api.StreamType)
+ }
+ if streamType != api.StreamTypeError && streamType != api.StreamTypeData {
+ return fmt.Errorf("invalid stream type %q", streamType)
+ }
+
+ streams <- stream
+ return nil
+ }
+}
+
+// httpStreamHandler is capable of processing multiple port forward
+// requests over a single httpstream.Connection.
+type httpStreamHandler struct {
+ conn httpstream.Connection
+ streamChan chan httpstream.Stream
+ streamPairsLock sync.RWMutex
+ streamPairs map[string]*httpStreamPair
+ streamCreationTimeout time.Duration
+ pod string
+ uid types.UID
+ forwarder PortForwarder
+}
+
+// getStreamPair returns a httpStreamPair for requestID. This creates a
+// new pair if one does not yet exist for the requestID. The returned bool is
+// true if the pair was created.
+func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) {
+ h.streamPairsLock.Lock()
+ defer h.streamPairsLock.Unlock()
+
+ if p, ok := h.streamPairs[requestID]; ok {
+ glog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
+ return p, false
+ }
+
+ glog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
+
+ p := newPortForwardPair(requestID)
+ h.streamPairs[requestID] = p
+
+ return p, true
+}
+
+// monitorStreamPair waits for the pair to receive both its error and data
+// streams, or for the timeout to expire (whichever happens first), and then
+// removes the pair.
+func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) {
+ select {
+ case <-timeout:
+ err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
+ utilruntime.HandleError(err)
+ p.printError(err.Error())
+ case <-p.complete:
+ glog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
+ }
+ h.removeStreamPair(p.requestID)
+}
+
+// hasStreamPair returns a bool indicating if a stream pair for requestID
+// exists.
+func (h *httpStreamHandler) hasStreamPair(requestID string) bool {
+ h.streamPairsLock.RLock()
+ defer h.streamPairsLock.RUnlock()
+
+ _, ok := h.streamPairs[requestID]
+ return ok
+}
+
+// removeStreamPair removes the stream pair identified by requestID from streamPairs.
+func (h *httpStreamHandler) removeStreamPair(requestID string) {
+ h.streamPairsLock.Lock()
+ defer h.streamPairsLock.Unlock()
+
+ delete(h.streamPairs, requestID)
+}
+
+// requestID returns the request id for stream.
+func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
+ requestID := stream.Headers().Get(api.PortForwardRequestIDHeader)
+ if len(requestID) == 0 {
+ glog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader)
+ // If we get here, it's because the connection came from an older client
+ // that isn't generating the request id header
+ // (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287)
+ //
+ // This is a best-effort attempt at supporting older clients.
+ //
+ // When there aren't concurrent new forwarded connections, each connection
+ // will have a pair of streams (data, error), and the stream IDs will be
+ // consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert
+ // the stream ID into a pseudo-request id by taking the stream type and
+ // using id = stream.Identifier() when the stream type is error,
+ // and id = stream.Identifier() - 2 when it's data.
+ //
+ // NOTE: this only works when there are not concurrent new streams from
+ // multiple forwarded connections; it's a best-effort attempt at supporting
+ // old clients that don't generate request ids. If there are concurrent
+ // new connections, it's possible that 1 connection gets streams whose IDs
+ // are not consecutive (e.g. 5 and 9 instead of 5 and 7).
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ requestID = strconv.Itoa(int(stream.Identifier()))
+ case api.StreamTypeData:
+ requestID = strconv.Itoa(int(stream.Identifier()) - 2)
+ }
+
+ glog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier())
+ }
+ return requestID
+}
+
+// run is the main loop for the httpStreamHandler. It processes new
+// streams, invoking portForward for each complete stream pair. The loop exits
+// when the httpstream.Connection is closed.
+func (h *httpStreamHandler) run() {
+ glog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn)
+Loop:
+ for {
+ select {
+ case <-h.conn.CloseChan():
+ glog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn)
+ break Loop
+ case stream := <-h.streamChan:
+ requestID := h.requestID(stream)
+ streamType := stream.Headers().Get(api.StreamType)
+ glog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
+
+ p, created := h.getStreamPair(requestID)
+ if created {
+ go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
+ }
+ if complete, err := p.add(stream); err != nil {
+ msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
+ utilruntime.HandleError(errors.New(msg))
+ p.printError(msg)
+ } else if complete {
+ go h.portForward(p)
+ }
+ }
+ }
+}
+
+// portForward invokes the httpStreamHandler's forwarder.PortForward
+// function for the given stream pair.
+func (h *httpStreamHandler) portForward(p *httpStreamPair) {
+ defer p.dataStream.Close()
+ defer p.errorStream.Close()
+
+ portString := p.dataStream.Headers().Get(api.PortHeader)
+ port, _ := strconv.ParseInt(portString, 10, 32)
+
+ glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
+ err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
+ glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
+
+ if err != nil {
+ msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
+ utilruntime.HandleError(msg)
+ fmt.Fprint(p.errorStream, msg.Error())
+ }
+}
+
+// httpStreamPair represents the error and data streams for a port
+// forwarding request.
+type httpStreamPair struct {
+ lock sync.RWMutex
+ requestID string
+ dataStream httpstream.Stream
+ errorStream httpstream.Stream
+ complete chan struct{}
+}
+
+// newPortForwardPair creates a new httpStreamPair.
+func newPortForwardPair(requestID string) *httpStreamPair {
+ return &httpStreamPair{
+ requestID: requestID,
+ complete: make(chan struct{}),
+ }
+}
+
+// add adds the stream to the httpStreamPair. If the pair already
+// contains a stream for the new stream's type, an error is returned. add
+// returns true if both the data and error streams for this pair have been
+// received.
+func (p *httpStreamPair) add(stream httpstream.Stream) (bool, error) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ switch stream.Headers().Get(api.StreamType) {
+ case api.StreamTypeError:
+ if p.errorStream != nil {
+ return false, errors.New("error stream already assigned")
+ }
+ p.errorStream = stream
+ case api.StreamTypeData:
+ if p.dataStream != nil {
+ return false, errors.New("data stream already assigned")
+ }
+ p.dataStream = stream
+ }
+
+ complete := p.errorStream != nil && p.dataStream != nil
+ if complete {
+ close(p.complete)
+ }
+ return complete, nil
+}
+
+// printError writes s to p.errorStream if p.errorStream has been set.
+func (p *httpStreamPair) printError(s string) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ if p.errorStream != nil {
+ fmt.Fprint(p.errorStream, s)
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go
new file mode 100644
index 000000000..60a96e51a
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "io"
+ "net/http"
+ "time"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/util/wsstream"
+)
+
+// PortForwarder knows how to forward content from a data stream to/from a port
+// in a pod.
+type PortForwarder interface {
+ // PortForwarder copies data between a data stream and a port in a pod.
+ PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
+}
+
+// ServePortForward handles a port forwarding request. A single request is
+// kept alive as long as the client is still alive and the connection has not
+// been timed out due to idleness. This function handles multiple forwarded
+// connections; i.e., multiple `curl http://localhost:8888/` requests will be
+// handled by a single invocation of ServePortForward.
+func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
+ var err error
+ if wsstream.IsWebSocketRequest(req) {
+ err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
+ } else {
+ err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
+ }
+
+ if err != nil {
+ runtime.HandleError(err)
+ return
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go
new file mode 100644
index 000000000..22d5add06
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go
@@ -0,0 +1,198 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/server/httplog"
+ "k8s.io/apiserver/pkg/util/wsstream"
+ "k8s.io/kubernetes/pkg/api"
+)
+
+const (
+ dataChannel = iota
+ errorChannel
+
+ v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
+ v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
+)
+
+// options contains details about which streams are required for
+// port forwarding.
+// All fields incldued in V4Options need to be expressed explicilty in the
+// CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest.
+type V4Options struct {
+ Ports []int32
+}
+
+// newOptions creates a new options from the Request.
+func NewV4Options(req *http.Request) (*V4Options, error) {
+ if !wsstream.IsWebSocketRequest(req) {
+ return &V4Options{}, nil
+ }
+
+ portStrings := req.URL.Query()[api.PortHeader]
+ if len(portStrings) == 0 {
+ return nil, fmt.Errorf("query parameter %q is required", api.PortHeader)
+ }
+
+ ports := make([]int32, 0, len(portStrings))
+ for _, portString := range portStrings {
+ if len(portString) == 0 {
+ return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader)
+ }
+ for _, p := range strings.Split(portString, ",") {
+ port, err := strconv.ParseUint(p, 10, 16)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse %q as a port: %v", portString, err)
+ }
+ if port < 1 {
+ return nil, fmt.Errorf("port %q must be > 0", portString)
+ }
+ ports = append(ports, int32(port))
+ }
+ }
+
+ return &V4Options{
+ Ports: ports,
+ }, nil
+}
+
+// BuildV4Options returns a V4Options based on the given information.
+func BuildV4Options(ports []int32) (*V4Options, error) {
+ return &V4Options{Ports: ports}, nil
+}
+
+// handleWebSocketStreams handles requests to forward ports to a pod via
+// a PortForwarder. A pair of streams are created per port (DATA n,
+// ERROR n+1). The associated port is written to each stream as a unsigned 16
+// bit integer in little endian format.
+func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
+ channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2)
+ for i := 0; i < len(opts.Ports); i++ {
+ channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel)
+ }
+ conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
+ "": {
+ Binary: true,
+ Channels: channels,
+ },
+ v4BinaryWebsocketProtocol: {
+ Binary: true,
+ Channels: channels,
+ },
+ v4Base64WebsocketProtocol: {
+ Binary: false,
+ Channels: channels,
+ },
+ })
+ conn.SetIdleTimeout(idleTimeout)
+ _, streams, err := conn.Open(httplog.Unlogged(w), req)
+ if err != nil {
+ err = fmt.Errorf("Unable to upgrade websocket connection: %v", err)
+ return err
+ }
+ defer conn.Close()
+ streamPairs := make([]*websocketStreamPair, len(opts.Ports))
+ for i := range streamPairs {
+ streamPair := websocketStreamPair{
+ port: opts.Ports[i],
+ dataStream: streams[i*2+dataChannel],
+ errorStream: streams[i*2+errorChannel],
+ }
+ streamPairs[i] = &streamPair
+
+ portBytes := make([]byte, 2)
+ // port is always positive so conversion is allowable
+ binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port))
+ streamPair.dataStream.Write(portBytes)
+ streamPair.errorStream.Write(portBytes)
+ }
+ h := &websocketStreamHandler{
+ conn: conn,
+ streamPairs: streamPairs,
+ pod: podName,
+ uid: uid,
+ forwarder: portForwarder,
+ }
+ h.run()
+
+ return nil
+}
+
+// websocketStreamPair represents the error and data streams for a port
+// forwarding request.
+type websocketStreamPair struct {
+ port int32
+ dataStream io.ReadWriteCloser
+ errorStream io.WriteCloser
+}
+
+// websocketStreamHandler is capable of processing a single port forward
+// request over a websocket connection
+type websocketStreamHandler struct {
+ conn *wsstream.Conn
+ ports []int32
+ streamPairs []*websocketStreamPair
+ pod string
+ uid types.UID
+ forwarder PortForwarder
+}
+
+// run invokes the websocketStreamHandler's forwarder.PortForward
+// function for the given stream pair.
+func (h *websocketStreamHandler) run() {
+ wg := sync.WaitGroup{}
+ wg.Add(len(h.streamPairs))
+
+ for _, pair := range h.streamPairs {
+ p := pair
+ go func() {
+ defer wg.Done()
+ h.portForward(p)
+ }()
+ }
+
+ wg.Wait()
+}
+
+func (h *websocketStreamHandler) portForward(p *websocketStreamPair) {
+ defer p.dataStream.Close()
+ defer p.errorStream.Close()
+
+ glog.V(5).Infof("(conn=%p) invoking forwarder.PortForward for port %d", h.conn, p.port)
+ err := h.forwarder.PortForward(h.pod, h.uid, p.port, p.dataStream)
+ glog.V(5).Infof("(conn=%p) done invoking forwarder.PortForward for port %d", h.conn, p.port)
+
+ if err != nil {
+ msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err)
+ runtime.HandleError(msg)
+ fmt.Fprint(p.errorStream, msg.Error())
+ }
+}