aboutsummaryrefslogtreecommitdiff
path: root/vendor/k8s.io/kubernetes/pkg/kubelet/server
diff options
context:
space:
mode:
authorMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
committerMatthew Heon <matthew.heon@gmail.com>2017-11-01 11:24:59 -0400
commita031b83a09a8628435317a03f199cdc18b78262f (patch)
treebc017a96769ce6de33745b8b0b1304ccf38e9df0 /vendor/k8s.io/kubernetes/pkg/kubelet/server
parent2b74391cd5281f6fdf391ff8ad50fd1490f6bf89 (diff)
downloadpodman-a031b83a09a8628435317a03f199cdc18b78262f.tar.gz
podman-a031b83a09a8628435317a03f199cdc18b78262f.tar.bz2
podman-a031b83a09a8628435317a03f199cdc18b78262f.zip
Initial checkin from CRI-O repo
Signed-off-by: Matthew Heon <matthew.heon@gmail.com>
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg/kubelet/server')
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go23
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go309
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go53
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go198
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go59
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go18
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go79
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go447
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go132
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go55
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go146
-rw-r--r--vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go344
12 files changed, 1863 insertions, 0 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go
new file mode 100644
index 000000000..e7ccd58ae
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go
@@ -0,0 +1,23 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// package portforward contains server-side logic for handling port forwarding requests.
+package portforward
+
+// The subprotocol "portforward.k8s.io" is used for port forwarding.
+const ProtocolV1Name = "portforward.k8s.io"
+
+var SupportedProtocols = []string{ProtocolV1Name}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go
new file mode 100644
index 000000000..5f872c820
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go
@@ -0,0 +1,309 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+ "strconv"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/httpstream"
+ "k8s.io/apimachinery/pkg/util/httpstream/spdy"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/kubernetes/pkg/api"
+
+ "github.com/golang/glog"
+)
+
+func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
+ _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
+ // negotiated protocol isn't currently used server side, but could be in the future
+ if err != nil {
+ // Handshake writes the error to the client
+ return err
+ }
+ streamChan := make(chan httpstream.Stream, 1)
+
+ glog.V(5).Infof("Upgrading port forward response")
+ upgrader := spdy.NewResponseUpgrader()
+ conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
+ if conn == nil {
+ return errors.New("Unable to upgrade websocket connection")
+ }
+ defer conn.Close()
+
+ glog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout)
+ conn.SetIdleTimeout(idleTimeout)
+
+ h := &httpStreamHandler{
+ conn: conn,
+ streamChan: streamChan,
+ streamPairs: make(map[string]*httpStreamPair),
+ streamCreationTimeout: streamCreationTimeout,
+ pod: podName,
+ uid: uid,
+ forwarder: portForwarder,
+ }
+ h.run()
+
+ return nil
+}
+
+// httpStreamReceived is the httpstream.NewStreamHandler for port
+// forward streams. It checks each stream's port and stream type headers,
+// rejecting any streams that with missing or invalid values. Each valid
+// stream is sent to the streams channel.
+func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
+ return func(stream httpstream.Stream, replySent <-chan struct{}) error {
+ // make sure it has a valid port header
+ portString := stream.Headers().Get(api.PortHeader)
+ if len(portString) == 0 {
+ return fmt.Errorf("%q header is required", api.PortHeader)
+ }
+ port, err := strconv.ParseUint(portString, 10, 16)
+ if err != nil {
+ return fmt.Errorf("unable to parse %q as a port: %v", portString, err)
+ }
+ if port < 1 {
+ return fmt.Errorf("port %q must be > 0", portString)
+ }
+
+ // make sure it has a valid stream type header
+ streamType := stream.Headers().Get(api.StreamType)
+ if len(streamType) == 0 {
+ return fmt.Errorf("%q header is required", api.StreamType)
+ }
+ if streamType != api.StreamTypeError && streamType != api.StreamTypeData {
+ return fmt.Errorf("invalid stream type %q", streamType)
+ }
+
+ streams <- stream
+ return nil
+ }
+}
+
+// httpStreamHandler is capable of processing multiple port forward
+// requests over a single httpstream.Connection.
+type httpStreamHandler struct {
+ conn httpstream.Connection
+ streamChan chan httpstream.Stream
+ streamPairsLock sync.RWMutex
+ streamPairs map[string]*httpStreamPair
+ streamCreationTimeout time.Duration
+ pod string
+ uid types.UID
+ forwarder PortForwarder
+}
+
+// getStreamPair returns a httpStreamPair for requestID. This creates a
+// new pair if one does not yet exist for the requestID. The returned bool is
+// true if the pair was created.
+func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) {
+ h.streamPairsLock.Lock()
+ defer h.streamPairsLock.Unlock()
+
+ if p, ok := h.streamPairs[requestID]; ok {
+ glog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
+ return p, false
+ }
+
+ glog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
+
+ p := newPortForwardPair(requestID)
+ h.streamPairs[requestID] = p
+
+ return p, true
+}
+
+// monitorStreamPair waits for the pair to receive both its error and data
+// streams, or for the timeout to expire (whichever happens first), and then
+// removes the pair.
+func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) {
+ select {
+ case <-timeout:
+ err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
+ utilruntime.HandleError(err)
+ p.printError(err.Error())
+ case <-p.complete:
+ glog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
+ }
+ h.removeStreamPair(p.requestID)
+}
+
+// hasStreamPair returns a bool indicating if a stream pair for requestID
+// exists.
+func (h *httpStreamHandler) hasStreamPair(requestID string) bool {
+ h.streamPairsLock.RLock()
+ defer h.streamPairsLock.RUnlock()
+
+ _, ok := h.streamPairs[requestID]
+ return ok
+}
+
+// removeStreamPair removes the stream pair identified by requestID from streamPairs.
+func (h *httpStreamHandler) removeStreamPair(requestID string) {
+ h.streamPairsLock.Lock()
+ defer h.streamPairsLock.Unlock()
+
+ delete(h.streamPairs, requestID)
+}
+
+// requestID returns the request id for stream.
+func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
+ requestID := stream.Headers().Get(api.PortForwardRequestIDHeader)
+ if len(requestID) == 0 {
+ glog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader)
+ // If we get here, it's because the connection came from an older client
+ // that isn't generating the request id header
+ // (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287)
+ //
+ // This is a best-effort attempt at supporting older clients.
+ //
+ // When there aren't concurrent new forwarded connections, each connection
+ // will have a pair of streams (data, error), and the stream IDs will be
+ // consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert
+ // the stream ID into a pseudo-request id by taking the stream type and
+ // using id = stream.Identifier() when the stream type is error,
+ // and id = stream.Identifier() - 2 when it's data.
+ //
+ // NOTE: this only works when there are not concurrent new streams from
+ // multiple forwarded connections; it's a best-effort attempt at supporting
+ // old clients that don't generate request ids. If there are concurrent
+ // new connections, it's possible that 1 connection gets streams whose IDs
+ // are not consecutive (e.g. 5 and 9 instead of 5 and 7).
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ requestID = strconv.Itoa(int(stream.Identifier()))
+ case api.StreamTypeData:
+ requestID = strconv.Itoa(int(stream.Identifier()) - 2)
+ }
+
+ glog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier())
+ }
+ return requestID
+}
+
+// run is the main loop for the httpStreamHandler. It processes new
+// streams, invoking portForward for each complete stream pair. The loop exits
+// when the httpstream.Connection is closed.
+func (h *httpStreamHandler) run() {
+ glog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn)
+Loop:
+ for {
+ select {
+ case <-h.conn.CloseChan():
+ glog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn)
+ break Loop
+ case stream := <-h.streamChan:
+ requestID := h.requestID(stream)
+ streamType := stream.Headers().Get(api.StreamType)
+ glog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
+
+ p, created := h.getStreamPair(requestID)
+ if created {
+ go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
+ }
+ if complete, err := p.add(stream); err != nil {
+ msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
+ utilruntime.HandleError(errors.New(msg))
+ p.printError(msg)
+ } else if complete {
+ go h.portForward(p)
+ }
+ }
+ }
+}
+
+// portForward invokes the httpStreamHandler's forwarder.PortForward
+// function for the given stream pair.
+func (h *httpStreamHandler) portForward(p *httpStreamPair) {
+ defer p.dataStream.Close()
+ defer p.errorStream.Close()
+
+ portString := p.dataStream.Headers().Get(api.PortHeader)
+ port, _ := strconv.ParseInt(portString, 10, 32)
+
+ glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
+ err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
+ glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
+
+ if err != nil {
+ msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
+ utilruntime.HandleError(msg)
+ fmt.Fprint(p.errorStream, msg.Error())
+ }
+}
+
+// httpStreamPair represents the error and data streams for a port
+// forwarding request.
+type httpStreamPair struct {
+ lock sync.RWMutex
+ requestID string
+ dataStream httpstream.Stream
+ errorStream httpstream.Stream
+ complete chan struct{}
+}
+
+// newPortForwardPair creates a new httpStreamPair.
+func newPortForwardPair(requestID string) *httpStreamPair {
+ return &httpStreamPair{
+ requestID: requestID,
+ complete: make(chan struct{}),
+ }
+}
+
+// add adds the stream to the httpStreamPair. If the pair already
+// contains a stream for the new stream's type, an error is returned. add
+// returns true if both the data and error streams for this pair have been
+// received.
+func (p *httpStreamPair) add(stream httpstream.Stream) (bool, error) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ switch stream.Headers().Get(api.StreamType) {
+ case api.StreamTypeError:
+ if p.errorStream != nil {
+ return false, errors.New("error stream already assigned")
+ }
+ p.errorStream = stream
+ case api.StreamTypeData:
+ if p.dataStream != nil {
+ return false, errors.New("data stream already assigned")
+ }
+ p.dataStream = stream
+ }
+
+ complete := p.errorStream != nil && p.dataStream != nil
+ if complete {
+ close(p.complete)
+ }
+ return complete, nil
+}
+
+// printError writes s to p.errorStream if p.errorStream has been set.
+func (p *httpStreamPair) printError(s string) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+ if p.errorStream != nil {
+ fmt.Fprint(p.errorStream, s)
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go
new file mode 100644
index 000000000..60a96e51a
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "io"
+ "net/http"
+ "time"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/util/wsstream"
+)
+
+// PortForwarder knows how to forward content from a data stream to/from a port
+// in a pod.
+type PortForwarder interface {
+ // PortForwarder copies data between a data stream and a port in a pod.
+ PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
+}
+
+// ServePortForward handles a port forwarding request. A single request is
+// kept alive as long as the client is still alive and the connection has not
+// been timed out due to idleness. This function handles multiple forwarded
+// connections; i.e., multiple `curl http://localhost:8888/` requests will be
+// handled by a single invocation of ServePortForward.
+func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
+ var err error
+ if wsstream.IsWebSocketRequest(req) {
+ err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
+ } else {
+ err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
+ }
+
+ if err != nil {
+ runtime.HandleError(err)
+ return
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go
new file mode 100644
index 000000000..22d5add06
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go
@@ -0,0 +1,198 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package portforward
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/glog"
+
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/server/httplog"
+ "k8s.io/apiserver/pkg/util/wsstream"
+ "k8s.io/kubernetes/pkg/api"
+)
+
+const (
+ dataChannel = iota
+ errorChannel
+
+ v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
+ v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
+)
+
+// options contains details about which streams are required for
+// port forwarding.
+// All fields incldued in V4Options need to be expressed explicilty in the
+// CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest.
+type V4Options struct {
+ Ports []int32
+}
+
+// newOptions creates a new options from the Request.
+func NewV4Options(req *http.Request) (*V4Options, error) {
+ if !wsstream.IsWebSocketRequest(req) {
+ return &V4Options{}, nil
+ }
+
+ portStrings := req.URL.Query()[api.PortHeader]
+ if len(portStrings) == 0 {
+ return nil, fmt.Errorf("query parameter %q is required", api.PortHeader)
+ }
+
+ ports := make([]int32, 0, len(portStrings))
+ for _, portString := range portStrings {
+ if len(portString) == 0 {
+ return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader)
+ }
+ for _, p := range strings.Split(portString, ",") {
+ port, err := strconv.ParseUint(p, 10, 16)
+ if err != nil {
+ return nil, fmt.Errorf("unable to parse %q as a port: %v", portString, err)
+ }
+ if port < 1 {
+ return nil, fmt.Errorf("port %q must be > 0", portString)
+ }
+ ports = append(ports, int32(port))
+ }
+ }
+
+ return &V4Options{
+ Ports: ports,
+ }, nil
+}
+
+// BuildV4Options returns a V4Options based on the given information.
+func BuildV4Options(ports []int32) (*V4Options, error) {
+ return &V4Options{Ports: ports}, nil
+}
+
+// handleWebSocketStreams handles requests to forward ports to a pod via
+// a PortForwarder. A pair of streams are created per port (DATA n,
+// ERROR n+1). The associated port is written to each stream as a unsigned 16
+// bit integer in little endian format.
+func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
+ channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2)
+ for i := 0; i < len(opts.Ports); i++ {
+ channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel)
+ }
+ conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
+ "": {
+ Binary: true,
+ Channels: channels,
+ },
+ v4BinaryWebsocketProtocol: {
+ Binary: true,
+ Channels: channels,
+ },
+ v4Base64WebsocketProtocol: {
+ Binary: false,
+ Channels: channels,
+ },
+ })
+ conn.SetIdleTimeout(idleTimeout)
+ _, streams, err := conn.Open(httplog.Unlogged(w), req)
+ if err != nil {
+ err = fmt.Errorf("Unable to upgrade websocket connection: %v", err)
+ return err
+ }
+ defer conn.Close()
+ streamPairs := make([]*websocketStreamPair, len(opts.Ports))
+ for i := range streamPairs {
+ streamPair := websocketStreamPair{
+ port: opts.Ports[i],
+ dataStream: streams[i*2+dataChannel],
+ errorStream: streams[i*2+errorChannel],
+ }
+ streamPairs[i] = &streamPair
+
+ portBytes := make([]byte, 2)
+ // port is always positive so conversion is allowable
+ binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port))
+ streamPair.dataStream.Write(portBytes)
+ streamPair.errorStream.Write(portBytes)
+ }
+ h := &websocketStreamHandler{
+ conn: conn,
+ streamPairs: streamPairs,
+ pod: podName,
+ uid: uid,
+ forwarder: portForwarder,
+ }
+ h.run()
+
+ return nil
+}
+
+// websocketStreamPair represents the error and data streams for a port
+// forwarding request.
+type websocketStreamPair struct {
+ port int32
+ dataStream io.ReadWriteCloser
+ errorStream io.WriteCloser
+}
+
+// websocketStreamHandler is capable of processing a single port forward
+// request over a websocket connection
+type websocketStreamHandler struct {
+ conn *wsstream.Conn
+ ports []int32
+ streamPairs []*websocketStreamPair
+ pod string
+ uid types.UID
+ forwarder PortForwarder
+}
+
+// run invokes the websocketStreamHandler's forwarder.PortForward
+// function for the given stream pair.
+func (h *websocketStreamHandler) run() {
+ wg := sync.WaitGroup{}
+ wg.Add(len(h.streamPairs))
+
+ for _, pair := range h.streamPairs {
+ p := pair
+ go func() {
+ defer wg.Done()
+ h.portForward(p)
+ }()
+ }
+
+ wg.Wait()
+}
+
+func (h *websocketStreamHandler) portForward(p *websocketStreamPair) {
+ defer p.dataStream.Close()
+ defer p.errorStream.Close()
+
+ glog.V(5).Infof("(conn=%p) invoking forwarder.PortForward for port %d", h.conn, p.port)
+ err := h.forwarder.PortForward(h.pod, h.uid, p.port, p.dataStream)
+ glog.V(5).Infof("(conn=%p) done invoking forwarder.PortForward for port %d", h.conn, p.port)
+
+ if err != nil {
+ msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err)
+ runtime.HandleError(msg)
+ fmt.Fprint(p.errorStream, msg.Error())
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go
new file mode 100644
index 000000000..e266f34fe
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go
@@ -0,0 +1,59 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/tools/remotecommand"
+)
+
+// Attacher knows how to attach to a running container in a pod.
+type Attacher interface {
+ // AttachContainer attaches to the running container in the pod, copying data between in/out/err
+ // and the container's stdin/stdout/stderr.
+ AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
+}
+
+// ServeAttach handles requests to attach to a container. After creating/receiving the required
+// streams, it delegates the actual attaching to attacher.
+func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
+ ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
+ if !ok {
+ // error is handled by createStreams
+ return
+ }
+ defer ctx.conn.Close()
+
+ err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
+ if err != nil {
+ err = fmt.Errorf("error attaching to container: %v", err)
+ runtime.HandleError(err)
+ ctx.writeStatus(apierrors.NewInternalError(err))
+ } else {
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
+ Status: metav1.StatusSuccess,
+ }})
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go
new file mode 100644
index 000000000..24f9393ab
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// package remotecommand contains functions related to executing commands in and attaching to pods.
+package remotecommand // import "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go
new file mode 100644
index 000000000..8d14a937a
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go
@@ -0,0 +1,79 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/client-go/tools/remotecommand"
+ utilexec "k8s.io/kubernetes/pkg/util/exec"
+)
+
+// Executor knows how to execute a command in a container in a pod.
+type Executor interface {
+ // ExecInContainer executes a command in a container in the pod, copying data
+ // between in/out/err and the container's stdin/stdout/stderr.
+ ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
+}
+
+// ServeExec handles requests to execute a command in a container. After
+// creating/receiving the required streams, it delegates the actual execution
+// to the executor.
+func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
+ ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
+ if !ok {
+ // error is handled by createStreams
+ return
+ }
+ defer ctx.conn.Close()
+
+ err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
+ if err != nil {
+ if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
+ rc := exitErr.ExitStatus()
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
+ Status: metav1.StatusFailure,
+ Reason: remotecommandconsts.NonZeroExitCodeReason,
+ Details: &metav1.StatusDetails{
+ Causes: []metav1.StatusCause{
+ {
+ Type: remotecommandconsts.ExitCodeCauseType,
+ Message: fmt.Sprintf("%d", rc),
+ },
+ },
+ },
+ Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
+ }})
+ } else {
+ err = fmt.Errorf("error executing command in container: %v", err)
+ runtime.HandleError(err)
+ ctx.writeStatus(apierrors.NewInternalError(err))
+ }
+ } else {
+ ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
+ Status: metav1.StatusSuccess,
+ }})
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go
new file mode 100644
index 000000000..f09b5e400
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go
@@ -0,0 +1,447 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/httpstream"
+ "k8s.io/apimachinery/pkg/util/httpstream/spdy"
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/util/wsstream"
+ "k8s.io/client-go/tools/remotecommand"
+ "k8s.io/kubernetes/pkg/api"
+
+ "github.com/golang/glog"
+)
+
+// Options contains details about which streams are required for
+// remote command execution.
+type Options struct {
+ Stdin bool
+ Stdout bool
+ Stderr bool
+ TTY bool
+}
+
+// NewOptions creates a new Options from the Request.
+func NewOptions(req *http.Request) (*Options, error) {
+ tty := req.FormValue(api.ExecTTYParam) == "1"
+ stdin := req.FormValue(api.ExecStdinParam) == "1"
+ stdout := req.FormValue(api.ExecStdoutParam) == "1"
+ stderr := req.FormValue(api.ExecStderrParam) == "1"
+ if tty && stderr {
+ // TODO: make this an error before we reach this method
+ glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
+ stderr = false
+ }
+
+ if !stdin && !stdout && !stderr {
+ return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr")
+ }
+
+ return &Options{
+ Stdin: stdin,
+ Stdout: stdout,
+ Stderr: stderr,
+ TTY: tty,
+ }, nil
+}
+
+// context contains the connection and streams used when
+// forwarding an attach or execute session into a container.
+type context struct {
+ conn io.Closer
+ stdinStream io.ReadCloser
+ stdoutStream io.WriteCloser
+ stderrStream io.WriteCloser
+ writeStatus func(status *apierrors.StatusError) error
+ resizeStream io.ReadCloser
+ resizeChan chan remotecommand.TerminalSize
+ tty bool
+}
+
+// streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
+// enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
+// replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
+// received and right after, the connection gets closed).
+type streamAndReply struct {
+ httpstream.Stream
+ replySent <-chan struct{}
+}
+
+// waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
+// an empty struct to the notify channel.
+func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
+ select {
+ case <-replySent:
+ notify <- struct{}{}
+ case <-stop:
+ }
+}
+
+func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
+ var ctx *context
+ var ok bool
+ if wsstream.IsWebSocketRequest(req) {
+ ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout)
+ } else {
+ ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout)
+ }
+ if !ok {
+ return nil, false
+ }
+
+ if ctx.resizeStream != nil {
+ ctx.resizeChan = make(chan remotecommand.TerminalSize)
+ go handleResizeEvents(ctx.resizeStream, ctx.resizeChan)
+ }
+
+ return ctx, true
+}
+
+func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) {
+ protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols)
+ if err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprint(w, err.Error())
+ return nil, false
+ }
+
+ streamCh := make(chan streamAndReply)
+
+ upgrader := spdy.NewResponseUpgrader()
+ conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
+ streamCh <- streamAndReply{Stream: stream, replySent: replySent}
+ return nil
+ })
+ // from this point on, we can no longer call methods on response
+ if conn == nil {
+ // The upgrader is responsible for notifying the client of any errors that
+ // occurred during upgrading. All we can do is return here at this point
+ // if we weren't successful in upgrading.
+ return nil, false
+ }
+
+ conn.SetIdleTimeout(idleTimeout)
+
+ var handler protocolHandler
+ switch protocol {
+ case remotecommandconsts.StreamProtocolV4Name:
+ handler = &v4ProtocolHandler{}
+ case remotecommandconsts.StreamProtocolV3Name:
+ handler = &v3ProtocolHandler{}
+ case remotecommandconsts.StreamProtocolV2Name:
+ handler = &v2ProtocolHandler{}
+ case "":
+ glog.V(4).Infof("Client did not request protocol negotiaion. Falling back to %q", remotecommandconsts.StreamProtocolV1Name)
+ fallthrough
+ case remotecommandconsts.StreamProtocolV1Name:
+ handler = &v1ProtocolHandler{}
+ }
+
+ // count the streams client asked for, starting with 1
+ expectedStreams := 1
+ if opts.Stdin {
+ expectedStreams++
+ }
+ if opts.Stdout {
+ expectedStreams++
+ }
+ if opts.Stderr {
+ expectedStreams++
+ }
+ if opts.TTY && handler.supportsTerminalResizing() {
+ expectedStreams++
+ }
+
+ expired := time.NewTimer(streamCreationTimeout)
+ defer expired.Stop()
+
+ ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C)
+ if err != nil {
+ runtime.HandleError(err)
+ return nil, false
+ }
+
+ ctx.conn = conn
+ ctx.tty = opts.TTY
+
+ return ctx, true
+}
+
+type protocolHandler interface {
+ // waitForStreams waits for the expected streams or a timeout, returning a
+ // remoteCommandContext if all the streams were received, or an error if not.
+ waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error)
+ // supportsTerminalResizing returns true if the protocol handler supports terminal resizing
+ supportsTerminalResizing() bool
+}
+
+// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
+// in from v3 in the error stream format using an json-marshaled metav1.Status which carries
+// the process' exit code.
+type v4ProtocolHandler struct{}
+
+func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
+ ctx := &context{}
+ receivedStreams := 0
+ replyChan := make(chan struct{})
+ stop := make(chan struct{})
+ defer close(stop)
+WaitForStreams:
+ for {
+ select {
+ case stream := <-streams:
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdin:
+ ctx.stdinStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdout:
+ ctx.stdoutStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStderr:
+ ctx.stderrStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeResize:
+ ctx.resizeStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ default:
+ runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
+ }
+ case <-replyChan:
+ receivedStreams++
+ if receivedStreams == expectedStreams {
+ break WaitForStreams
+ }
+ case <-expired:
+ // TODO find a way to return the error to the user. Maybe use a separate
+ // stream to report errors?
+ return nil, errors.New("timed out waiting for client to create streams")
+ }
+ }
+
+ return ctx, nil
+}
+
+// supportsTerminalResizing returns true because v4ProtocolHandler supports it
+func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
+
+// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
+type v3ProtocolHandler struct{}
+
+func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
+ ctx := &context{}
+ receivedStreams := 0
+ replyChan := make(chan struct{})
+ stop := make(chan struct{})
+ defer close(stop)
+WaitForStreams:
+ for {
+ select {
+ case stream := <-streams:
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ ctx.writeStatus = v1WriteStatusFunc(stream)
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdin:
+ ctx.stdinStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdout:
+ ctx.stdoutStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStderr:
+ ctx.stderrStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeResize:
+ ctx.resizeStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ default:
+ runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
+ }
+ case <-replyChan:
+ receivedStreams++
+ if receivedStreams == expectedStreams {
+ break WaitForStreams
+ }
+ case <-expired:
+ // TODO find a way to return the error to the user. Maybe use a separate
+ // stream to report errors?
+ return nil, errors.New("timed out waiting for client to create streams")
+ }
+ }
+
+ return ctx, nil
+}
+
+// supportsTerminalResizing returns true because v3ProtocolHandler supports it
+func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true }
+
+// v2ProtocolHandler implements the V2 protocol version for streaming command execution.
+type v2ProtocolHandler struct{}
+
+func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
+ ctx := &context{}
+ receivedStreams := 0
+ replyChan := make(chan struct{})
+ stop := make(chan struct{})
+ defer close(stop)
+WaitForStreams:
+ for {
+ select {
+ case stream := <-streams:
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ ctx.writeStatus = v1WriteStatusFunc(stream)
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdin:
+ ctx.stdinStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdout:
+ ctx.stdoutStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStderr:
+ ctx.stderrStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ default:
+ runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
+ }
+ case <-replyChan:
+ receivedStreams++
+ if receivedStreams == expectedStreams {
+ break WaitForStreams
+ }
+ case <-expired:
+ // TODO find a way to return the error to the user. Maybe use a separate
+ // stream to report errors?
+ return nil, errors.New("timed out waiting for client to create streams")
+ }
+ }
+
+ return ctx, nil
+}
+
+// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it.
+func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false }
+
+// v1ProtocolHandler implements the V1 protocol version for streaming command execution.
+type v1ProtocolHandler struct{}
+
+func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
+ ctx := &context{}
+ receivedStreams := 0
+ replyChan := make(chan struct{})
+ stop := make(chan struct{})
+ defer close(stop)
+WaitForStreams:
+ for {
+ select {
+ case stream := <-streams:
+ streamType := stream.Headers().Get(api.StreamType)
+ switch streamType {
+ case api.StreamTypeError:
+ ctx.writeStatus = v1WriteStatusFunc(stream)
+
+ // This defer statement shouldn't be here, but due to previous refactoring, it ended up in
+ // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
+ // the v2ProtocolHandler.
+ defer stream.Reset()
+
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdin:
+ ctx.stdinStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStdout:
+ ctx.stdoutStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ case api.StreamTypeStderr:
+ ctx.stderrStream = stream
+ go waitStreamReply(stream.replySent, replyChan, stop)
+ default:
+ runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
+ }
+ case <-replyChan:
+ receivedStreams++
+ if receivedStreams == expectedStreams {
+ break WaitForStreams
+ }
+ case <-expired:
+ // TODO find a way to return the error to the user. Maybe use a separate
+ // stream to report errors?
+ return nil, errors.New("timed out waiting for client to create streams")
+ }
+ }
+
+ if ctx.stdinStream != nil {
+ ctx.stdinStream.Close()
+ }
+
+ return ctx, nil
+}
+
+// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it.
+func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false }
+
+func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) {
+ defer runtime.HandleCrash()
+
+ decoder := json.NewDecoder(stream)
+ for {
+ size := remotecommand.TerminalSize{}
+ if err := decoder.Decode(&size); err != nil {
+ break
+ }
+ channel <- size
+ }
+}
+
+func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
+ return func(status *apierrors.StatusError) error {
+ if status.Status().Status == metav1.StatusSuccess {
+ return nil // send error messages
+ }
+ _, err := stream.Write([]byte(status.Error()))
+ return err
+ }
+}
+
+// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
+// as json in the error channel.
+func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
+ return func(status *apierrors.StatusError) error {
+ bs, err := json.Marshal(status.Status())
+ if err != nil {
+ return err
+ }
+ _, err = stream.Write(bs)
+ return err
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go
new file mode 100644
index 000000000..c60012b21
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go
@@ -0,0 +1,132 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remotecommand
+
+import (
+ "fmt"
+ "net/http"
+ "time"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apiserver/pkg/server/httplog"
+ "k8s.io/apiserver/pkg/util/wsstream"
+)
+
+const (
+ stdinChannel = iota
+ stdoutChannel
+ stderrChannel
+ errorChannel
+ resizeChannel
+
+ preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
+ preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
+ v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
+ v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
+)
+
+// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
+// along with the approximate duplex value. It also creates the error (3) and resize (4) channels.
+func createChannels(opts *Options) []wsstream.ChannelType {
+ // open the requested channels, and always open the error channel
+ channels := make([]wsstream.ChannelType, 5)
+ channels[stdinChannel] = readChannel(opts.Stdin)
+ channels[stdoutChannel] = writeChannel(opts.Stdout)
+ channels[stderrChannel] = writeChannel(opts.Stderr)
+ channels[errorChannel] = wsstream.WriteChannel
+ channels[resizeChannel] = wsstream.ReadChannel
+ return channels
+}
+
+// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel.
+func readChannel(real bool) wsstream.ChannelType {
+ if real {
+ return wsstream.ReadChannel
+ }
+ return wsstream.IgnoreChannel
+}
+
+// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel.
+func writeChannel(real bool) wsstream.ChannelType {
+ if real {
+ return wsstream.WriteChannel
+ }
+ return wsstream.IgnoreChannel
+}
+
+// createWebSocketStreams returns a context containing the websocket connection and
+// streams needed to perform an exec or an attach.
+func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Options, idleTimeout time.Duration) (*context, bool) {
+ channels := createChannels(opts)
+ conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
+ "": {
+ Binary: true,
+ Channels: channels,
+ },
+ preV4BinaryWebsocketProtocol: {
+ Binary: true,
+ Channels: channels,
+ },
+ preV4Base64WebsocketProtocol: {
+ Binary: false,
+ Channels: channels,
+ },
+ v4BinaryWebsocketProtocol: {
+ Binary: true,
+ Channels: channels,
+ },
+ v4Base64WebsocketProtocol: {
+ Binary: false,
+ Channels: channels,
+ },
+ })
+ conn.SetIdleTimeout(idleTimeout)
+ negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(w), req)
+ if err != nil {
+ runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
+ return nil, false
+ }
+
+ // Send an empty message to the lowest writable channel to notify the client the connection is established
+ // TODO: make generic to SPDY and WebSockets and do it outside of this method?
+ switch {
+ case opts.Stdout:
+ streams[stdoutChannel].Write([]byte{})
+ case opts.Stderr:
+ streams[stderrChannel].Write([]byte{})
+ default:
+ streams[errorChannel].Write([]byte{})
+ }
+
+ ctx := &context{
+ conn: conn,
+ stdinStream: streams[stdinChannel],
+ stdoutStream: streams[stdoutChannel],
+ stderrStream: streams[stderrChannel],
+ tty: opts.TTY,
+ resizeStream: streams[resizeChannel],
+ }
+
+ switch negotiatedProtocol {
+ case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
+ ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel])
+ default:
+ ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel])
+ }
+
+ return ctx, true
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go
new file mode 100644
index 000000000..9f16b4eb2
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go
@@ -0,0 +1,55 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package streaming
+
+import (
+ "fmt"
+ "net/http"
+ "strconv"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+)
+
+func ErrorStreamingDisabled(method string) error {
+ return grpc.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method))
+}
+
+// The error returned when the maximum number of in-flight requests is exceeded.
+func ErrorTooManyInFlight() error {
+ return grpc.Errorf(codes.ResourceExhausted, "maximum number of in-flight requests exceeded")
+}
+
+// Translates a CRI streaming error into an appropriate HTTP response.
+func WriteError(err error, w http.ResponseWriter) error {
+ var status int
+ switch grpc.Code(err) {
+ case codes.NotFound:
+ status = http.StatusNotFound
+ case codes.ResourceExhausted:
+ // We only expect to hit this if there is a DoS, so we just wait the full TTL.
+ // If this is ever hit in steady-state operations, consider increasing the MaxInFlight requests,
+ // or plumbing through the time to next expiration.
+ w.Header().Set("Retry-After", strconv.Itoa(int(CacheTTL.Seconds())))
+ status = http.StatusTooManyRequests
+ default:
+ status = http.StatusInternalServerError
+ }
+ w.WriteHeader(status)
+ _, writeErr := w.Write([]byte(err.Error()))
+ return writeErr
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go
new file mode 100644
index 000000000..f68f640be
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go
@@ -0,0 +1,146 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package streaming
+
+import (
+ "container/list"
+ "crypto/rand"
+ "encoding/base64"
+ "fmt"
+ "math"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/util/clock"
+)
+
+var (
+ // Timeout after which tokens become invalid.
+ CacheTTL = 1 * time.Minute
+ // The maximum number of in-flight requests to allow.
+ MaxInFlight = 1000
+ // Length of the random base64 encoded token identifying the request.
+ TokenLen = 8
+)
+
+// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use
+// random token for their retrieval. The requestCache is used for building streaming URLs without
+// the need to encode every request parameter in the URL.
+type requestCache struct {
+ // clock is used to obtain the current time
+ clock clock.Clock
+
+ // tokens maps the generate token to the request for fast retrieval.
+ tokens map[string]*list.Element
+ // ll maintains an age-ordered request list for faster garbage collection of expired requests.
+ ll *list.List
+
+ lock sync.Mutex
+}
+
+// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest.
+type request interface{}
+
+type cacheEntry struct {
+ token string
+ req request
+ expireTime time.Time
+}
+
+func newRequestCache() *requestCache {
+ return &requestCache{
+ clock: clock.RealClock{},
+ ll: list.New(),
+ tokens: make(map[string]*list.Element),
+ }
+}
+
+// Insert the given request into the cache and returns the token used for fetching it out.
+func (c *requestCache) Insert(req request) (token string, err error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ // Remove expired entries.
+ c.gc()
+ // If the cache is full, reject the request.
+ if c.ll.Len() == MaxInFlight {
+ return "", ErrorTooManyInFlight()
+ }
+ token, err = c.uniqueToken()
+ if err != nil {
+ return "", err
+ }
+ ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(CacheTTL)})
+
+ c.tokens[token] = ele
+ return token, nil
+}
+
+// Consume the token (remove it from the cache) and return the cached request, if found.
+func (c *requestCache) Consume(token string) (req request, found bool) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ ele, ok := c.tokens[token]
+ if !ok {
+ return nil, false
+ }
+ c.ll.Remove(ele)
+ delete(c.tokens, token)
+
+ entry := ele.Value.(*cacheEntry)
+ if c.clock.Now().After(entry.expireTime) {
+ // Entry already expired.
+ return nil, false
+ }
+ return entry.req, true
+}
+
+// uniqueToken generates a random URL-safe token and ensures uniqueness.
+func (c *requestCache) uniqueToken() (string, error) {
+ const maxTries = 10
+ // Number of bytes to be TokenLen when base64 encoded.
+ tokenSize := math.Ceil(float64(TokenLen) * 6 / 8)
+ rawToken := make([]byte, int(tokenSize))
+ for i := 0; i < maxTries; i++ {
+ if _, err := rand.Read(rawToken); err != nil {
+ return "", err
+ }
+ encoded := base64.RawURLEncoding.EncodeToString(rawToken)
+ token := encoded[:TokenLen]
+ // If it's unique, return it. Otherwise retry.
+ if _, exists := c.tokens[encoded]; !exists {
+ return token, nil
+ }
+ }
+ return "", fmt.Errorf("failed to generate unique token")
+}
+
+// Must be write-locked prior to calling.
+func (c *requestCache) gc() {
+ now := c.clock.Now()
+ for c.ll.Len() > 0 {
+ oldest := c.ll.Back()
+ entry := oldest.Value.(*cacheEntry)
+ if !now.After(entry.expireTime) {
+ return
+ }
+
+ // Oldest value is expired; remove it.
+ c.ll.Remove(oldest)
+ delete(c.tokens, entry.token)
+ }
+}
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go
new file mode 100644
index 000000000..875e44462
--- /dev/null
+++ b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go
@@ -0,0 +1,344 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package streaming
+
+import (
+ "crypto/tls"
+ "errors"
+ "io"
+ "net/http"
+ "net/url"
+ "path"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+
+ restful "github.com/emicklei/go-restful"
+
+ "k8s.io/apimachinery/pkg/types"
+ remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
+ "k8s.io/client-go/tools/remotecommand"
+ runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
+ "k8s.io/kubernetes/pkg/kubelet/server/portforward"
+ remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
+)
+
+// The library interface to serve the stream requests.
+type Server interface {
+ http.Handler
+
+ // Get the serving URL for the requests.
+ // Requests must not be nil. Responses may be nil iff an error is returned.
+ GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
+ GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
+ GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
+
+ // Start the server.
+ // addr is the address to serve on (address:port) stayUp indicates whether the server should
+ // listen until Stop() is called, or automatically stop after all expected connections are
+ // closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count.
+ // Function does not return until the server is stopped.
+ Start(stayUp bool) error
+ // Stop the server, and terminate any open connections.
+ Stop() error
+}
+
+// The interface to execute the commands and provide the streams.
+type Runtime interface {
+ Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
+ Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
+ PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
+}
+
+// Config defines the options used for running the stream server.
+type Config struct {
+ // The host:port address the server will listen on.
+ Addr string
+ // The optional base URL for constructing streaming URLs. If empty, the baseURL will be
+ // constructed from the serve address.
+ BaseURL *url.URL
+
+ // How long to leave idle connections open for.
+ StreamIdleTimeout time.Duration
+ // How long to wait for clients to create streams. Only used for SPDY streaming.
+ StreamCreationTimeout time.Duration
+
+ // The streaming protocols the server supports (understands and permits). See
+ // k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for available protocols.
+ // Only used for SPDY streaming.
+ SupportedRemoteCommandProtocols []string
+
+ // The streaming protocols the server supports (understands and permits). See
+ // k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for available protocols.
+ // Only used for SPDY streaming.
+ SupportedPortForwardProtocols []string
+
+ // The config for serving over TLS. If nil, TLS will not be used.
+ TLSConfig *tls.Config
+}
+
+// DefaultConfig provides default values for server Config. The DefaultConfig is partial, so
+// some fields like Addr must still be provided.
+var DefaultConfig = Config{
+ StreamIdleTimeout: 4 * time.Hour,
+ StreamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout,
+ SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols,
+ SupportedPortForwardProtocols: portforward.SupportedProtocols,
+}
+
+// TODO(timstclair): Add auth(n/z) interface & handling.
+func NewServer(config Config, runtime Runtime) (Server, error) {
+ s := &server{
+ config: config,
+ runtime: &criAdapter{runtime},
+ cache: newRequestCache(),
+ }
+
+ if s.config.BaseURL == nil {
+ s.config.BaseURL = &url.URL{
+ Scheme: "http",
+ Host: s.config.Addr,
+ }
+ if s.config.TLSConfig != nil {
+ s.config.BaseURL.Scheme = "https"
+ }
+ }
+
+ ws := &restful.WebService{}
+ endpoints := []struct {
+ path string
+ handler restful.RouteFunction
+ }{
+ {"/exec/{token}", s.serveExec},
+ {"/attach/{token}", s.serveAttach},
+ {"/portforward/{token}", s.servePortForward},
+ }
+ // If serving relative to a base path, set that here.
+ pathPrefix := path.Dir(s.config.BaseURL.Path)
+ for _, e := range endpoints {
+ for _, method := range []string{"GET", "POST"} {
+ ws.Route(ws.
+ Method(method).
+ Path(path.Join(pathPrefix, e.path)).
+ To(e.handler))
+ }
+ }
+ handler := restful.NewContainer()
+ handler.Add(ws)
+ s.handler = handler
+
+ return s, nil
+}
+
+type server struct {
+ config Config
+ runtime *criAdapter
+ handler http.Handler
+ cache *requestCache
+}
+
+func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
+ if req.ContainerId == "" {
+ return nil, grpc.Errorf(codes.InvalidArgument, "missing required container_id")
+ }
+ token, err := s.cache.Insert(req)
+ if err != nil {
+ return nil, err
+ }
+ return &runtimeapi.ExecResponse{
+ Url: s.buildURL("exec", token),
+ }, nil
+}
+
+func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
+ if req.ContainerId == "" {
+ return nil, grpc.Errorf(codes.InvalidArgument, "missing required container_id")
+ }
+ token, err := s.cache.Insert(req)
+ if err != nil {
+ return nil, err
+ }
+ return &runtimeapi.AttachResponse{
+ Url: s.buildURL("attach", token),
+ }, nil
+}
+
+func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
+ if req.PodSandboxId == "" {
+ return nil, grpc.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id")
+ }
+ token, err := s.cache.Insert(req)
+ if err != nil {
+ return nil, err
+ }
+ return &runtimeapi.PortForwardResponse{
+ Url: s.buildURL("portforward", token),
+ }, nil
+}
+
+func (s *server) Start(stayUp bool) error {
+ if !stayUp {
+ // TODO(timstclair): Implement this.
+ return errors.New("stayUp=false is not yet implemented")
+ }
+
+ server := &http.Server{
+ Addr: s.config.Addr,
+ Handler: s.handler,
+ TLSConfig: s.config.TLSConfig,
+ }
+ if s.config.TLSConfig != nil {
+ return server.ListenAndServeTLS("", "") // Use certs from TLSConfig.
+ } else {
+ return server.ListenAndServe()
+ }
+}
+
+func (s *server) Stop() error {
+ // TODO(timstclair): Implement this.
+ return errors.New("not yet implemented")
+}
+
+func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ s.handler.ServeHTTP(w, r)
+}
+
+func (s *server) buildURL(method, token string) string {
+ return s.config.BaseURL.ResolveReference(&url.URL{
+ Path: path.Join(method, token),
+ }).String()
+}
+
+func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
+ token := req.PathParameter("token")
+ cachedRequest, ok := s.cache.Consume(token)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+ exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+
+ streamOpts := &remotecommandserver.Options{
+ Stdin: exec.Stdin,
+ Stdout: true,
+ Stderr: !exec.Tty,
+ TTY: exec.Tty,
+ }
+
+ remotecommandserver.ServeExec(
+ resp.ResponseWriter,
+ req.Request,
+ s.runtime,
+ "", // unused: podName
+ "", // unusued: podUID
+ exec.ContainerId,
+ exec.Cmd,
+ streamOpts,
+ s.config.StreamIdleTimeout,
+ s.config.StreamCreationTimeout,
+ s.config.SupportedRemoteCommandProtocols)
+}
+
+func (s *server) serveAttach(req *restful.Request, resp *restful.Response) {
+ token := req.PathParameter("token")
+ cachedRequest, ok := s.cache.Consume(token)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+ attach, ok := cachedRequest.(*runtimeapi.AttachRequest)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+
+ streamOpts := &remotecommandserver.Options{
+ Stdin: attach.Stdin,
+ Stdout: true,
+ Stderr: !attach.Tty,
+ TTY: attach.Tty,
+ }
+ remotecommandserver.ServeAttach(
+ resp.ResponseWriter,
+ req.Request,
+ s.runtime,
+ "", // unused: podName
+ "", // unusued: podUID
+ attach.ContainerId,
+ streamOpts,
+ s.config.StreamIdleTimeout,
+ s.config.StreamCreationTimeout,
+ s.config.SupportedRemoteCommandProtocols)
+}
+
+func (s *server) servePortForward(req *restful.Request, resp *restful.Response) {
+ token := req.PathParameter("token")
+ cachedRequest, ok := s.cache.Consume(token)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+ pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest)
+ if !ok {
+ http.NotFound(resp.ResponseWriter, req.Request)
+ return
+ }
+
+ portForwardOptions, err := portforward.BuildV4Options(pf.Port)
+ if err != nil {
+ resp.WriteError(http.StatusBadRequest, err)
+ return
+ }
+
+ portforward.ServePortForward(
+ resp.ResponseWriter,
+ req.Request,
+ s.runtime,
+ pf.PodSandboxId,
+ "", // unused: podUID
+ portForwardOptions,
+ s.config.StreamIdleTimeout,
+ s.config.StreamCreationTimeout,
+ s.config.SupportedPortForwardProtocols)
+}
+
+// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces.
+// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name.
+type criAdapter struct {
+ Runtime
+}
+
+var _ remotecommandserver.Executor = &criAdapter{}
+var _ remotecommandserver.Attacher = &criAdapter{}
+var _ portforward.PortForwarder = &criAdapter{}
+
+func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
+ return a.Exec(container, cmd, in, out, err, tty, resize)
+}
+
+func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
+ return a.Attach(container, in, out, err, tty, resize)
+}
+
+func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
+ return a.Runtime.PortForward(podName, port, stream)
+}