diff options
Diffstat (limited to 'vendor/k8s.io/kubernetes/pkg')
26 files changed, 0 insertions, 2864 deletions
diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/leaky/leaky.go b/vendor/k8s.io/kubernetes/pkg/kubelet/leaky/leaky.go deleted file mode 100644 index 4e3e1e1f2..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/leaky/leaky.go +++ /dev/null @@ -1,25 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package leaky holds bits of kubelet that should be internal but have leaked -// out through bad abstractions. TODO: delete all of this. -package leaky - -const ( - // This is used in a few places outside of Kubelet, such as indexing - // into the container info. - PodInfraContainerName = "POD" -) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go deleted file mode 100644 index e7ccd58ae..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go +++ /dev/null @@ -1,23 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// package portforward contains server-side logic for handling port forwarding requests. -package portforward - -// The subprotocol "portforward.k8s.io" is used for port forwarding. -const ProtocolV1Name = "portforward.k8s.io" - -var SupportedProtocols = []string{ProtocolV1Name} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go deleted file mode 100644 index 5f872c820..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/httpstream.go +++ /dev/null @@ -1,309 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package portforward - -import ( - "errors" - "fmt" - "net/http" - "strconv" - "sync" - "time" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/apimachinery/pkg/util/httpstream/spdy" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/kubernetes/pkg/api" - - "github.com/golang/glog" -) - -func handleHttpStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { - _, err := httpstream.Handshake(req, w, supportedPortForwardProtocols) - // negotiated protocol isn't currently used server side, but could be in the future - if err != nil { - // Handshake writes the error to the client - return err - } - streamChan := make(chan httpstream.Stream, 1) - - glog.V(5).Infof("Upgrading port forward response") - upgrader := spdy.NewResponseUpgrader() - conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan)) - if conn == nil { - return errors.New("Unable to upgrade websocket connection") - } - defer conn.Close() - - glog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout) - conn.SetIdleTimeout(idleTimeout) - - h := &httpStreamHandler{ - conn: conn, - streamChan: streamChan, - streamPairs: make(map[string]*httpStreamPair), - streamCreationTimeout: streamCreationTimeout, - pod: podName, - uid: uid, - forwarder: portForwarder, - } - h.run() - - return nil -} - -// httpStreamReceived is the httpstream.NewStreamHandler for port -// forward streams. It checks each stream's port and stream type headers, -// rejecting any streams that with missing or invalid values. Each valid -// stream is sent to the streams channel. -func httpStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error { - return func(stream httpstream.Stream, replySent <-chan struct{}) error { - // make sure it has a valid port header - portString := stream.Headers().Get(api.PortHeader) - if len(portString) == 0 { - return fmt.Errorf("%q header is required", api.PortHeader) - } - port, err := strconv.ParseUint(portString, 10, 16) - if err != nil { - return fmt.Errorf("unable to parse %q as a port: %v", portString, err) - } - if port < 1 { - return fmt.Errorf("port %q must be > 0", portString) - } - - // make sure it has a valid stream type header - streamType := stream.Headers().Get(api.StreamType) - if len(streamType) == 0 { - return fmt.Errorf("%q header is required", api.StreamType) - } - if streamType != api.StreamTypeError && streamType != api.StreamTypeData { - return fmt.Errorf("invalid stream type %q", streamType) - } - - streams <- stream - return nil - } -} - -// httpStreamHandler is capable of processing multiple port forward -// requests over a single httpstream.Connection. -type httpStreamHandler struct { - conn httpstream.Connection - streamChan chan httpstream.Stream - streamPairsLock sync.RWMutex - streamPairs map[string]*httpStreamPair - streamCreationTimeout time.Duration - pod string - uid types.UID - forwarder PortForwarder -} - -// getStreamPair returns a httpStreamPair for requestID. This creates a -// new pair if one does not yet exist for the requestID. The returned bool is -// true if the pair was created. -func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) { - h.streamPairsLock.Lock() - defer h.streamPairsLock.Unlock() - - if p, ok := h.streamPairs[requestID]; ok { - glog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID) - return p, false - } - - glog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID) - - p := newPortForwardPair(requestID) - h.streamPairs[requestID] = p - - return p, true -} - -// monitorStreamPair waits for the pair to receive both its error and data -// streams, or for the timeout to expire (whichever happens first), and then -// removes the pair. -func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) { - select { - case <-timeout: - err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID) - utilruntime.HandleError(err) - p.printError(err.Error()) - case <-p.complete: - glog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID) - } - h.removeStreamPair(p.requestID) -} - -// hasStreamPair returns a bool indicating if a stream pair for requestID -// exists. -func (h *httpStreamHandler) hasStreamPair(requestID string) bool { - h.streamPairsLock.RLock() - defer h.streamPairsLock.RUnlock() - - _, ok := h.streamPairs[requestID] - return ok -} - -// removeStreamPair removes the stream pair identified by requestID from streamPairs. -func (h *httpStreamHandler) removeStreamPair(requestID string) { - h.streamPairsLock.Lock() - defer h.streamPairsLock.Unlock() - - delete(h.streamPairs, requestID) -} - -// requestID returns the request id for stream. -func (h *httpStreamHandler) requestID(stream httpstream.Stream) string { - requestID := stream.Headers().Get(api.PortForwardRequestIDHeader) - if len(requestID) == 0 { - glog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader) - // If we get here, it's because the connection came from an older client - // that isn't generating the request id header - // (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287) - // - // This is a best-effort attempt at supporting older clients. - // - // When there aren't concurrent new forwarded connections, each connection - // will have a pair of streams (data, error), and the stream IDs will be - // consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert - // the stream ID into a pseudo-request id by taking the stream type and - // using id = stream.Identifier() when the stream type is error, - // and id = stream.Identifier() - 2 when it's data. - // - // NOTE: this only works when there are not concurrent new streams from - // multiple forwarded connections; it's a best-effort attempt at supporting - // old clients that don't generate request ids. If there are concurrent - // new connections, it's possible that 1 connection gets streams whose IDs - // are not consecutive (e.g. 5 and 9 instead of 5 and 7). - streamType := stream.Headers().Get(api.StreamType) - switch streamType { - case api.StreamTypeError: - requestID = strconv.Itoa(int(stream.Identifier())) - case api.StreamTypeData: - requestID = strconv.Itoa(int(stream.Identifier()) - 2) - } - - glog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier()) - } - return requestID -} - -// run is the main loop for the httpStreamHandler. It processes new -// streams, invoking portForward for each complete stream pair. The loop exits -// when the httpstream.Connection is closed. -func (h *httpStreamHandler) run() { - glog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn) -Loop: - for { - select { - case <-h.conn.CloseChan(): - glog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn) - break Loop - case stream := <-h.streamChan: - requestID := h.requestID(stream) - streamType := stream.Headers().Get(api.StreamType) - glog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType) - - p, created := h.getStreamPair(requestID) - if created { - go h.monitorStreamPair(p, time.After(h.streamCreationTimeout)) - } - if complete, err := p.add(stream); err != nil { - msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err) - utilruntime.HandleError(errors.New(msg)) - p.printError(msg) - } else if complete { - go h.portForward(p) - } - } - } -} - -// portForward invokes the httpStreamHandler's forwarder.PortForward -// function for the given stream pair. -func (h *httpStreamHandler) portForward(p *httpStreamPair) { - defer p.dataStream.Close() - defer p.errorStream.Close() - - portString := p.dataStream.Headers().Get(api.PortHeader) - port, _ := strconv.ParseInt(portString, 10, 32) - - glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) - err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream) - glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) - - if err != nil { - msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err) - utilruntime.HandleError(msg) - fmt.Fprint(p.errorStream, msg.Error()) - } -} - -// httpStreamPair represents the error and data streams for a port -// forwarding request. -type httpStreamPair struct { - lock sync.RWMutex - requestID string - dataStream httpstream.Stream - errorStream httpstream.Stream - complete chan struct{} -} - -// newPortForwardPair creates a new httpStreamPair. -func newPortForwardPair(requestID string) *httpStreamPair { - return &httpStreamPair{ - requestID: requestID, - complete: make(chan struct{}), - } -} - -// add adds the stream to the httpStreamPair. If the pair already -// contains a stream for the new stream's type, an error is returned. add -// returns true if both the data and error streams for this pair have been -// received. -func (p *httpStreamPair) add(stream httpstream.Stream) (bool, error) { - p.lock.Lock() - defer p.lock.Unlock() - - switch stream.Headers().Get(api.StreamType) { - case api.StreamTypeError: - if p.errorStream != nil { - return false, errors.New("error stream already assigned") - } - p.errorStream = stream - case api.StreamTypeData: - if p.dataStream != nil { - return false, errors.New("data stream already assigned") - } - p.dataStream = stream - } - - complete := p.errorStream != nil && p.dataStream != nil - if complete { - close(p.complete) - } - return complete, nil -} - -// printError writes s to p.errorStream if p.errorStream has been set. -func (p *httpStreamPair) printError(s string) { - p.lock.RLock() - defer p.lock.RUnlock() - if p.errorStream != nil { - fmt.Fprint(p.errorStream, s) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go deleted file mode 100644 index 60a96e51a..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/portforward.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package portforward - -import ( - "io" - "net/http" - "time" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/util/wsstream" -) - -// PortForwarder knows how to forward content from a data stream to/from a port -// in a pod. -type PortForwarder interface { - // PortForwarder copies data between a data stream and a port in a pod. - PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error -} - -// ServePortForward handles a port forwarding request. A single request is -// kept alive as long as the client is still alive and the connection has not -// been timed out due to idleness. This function handles multiple forwarded -// connections; i.e., multiple `curl http://localhost:8888/` requests will be -// handled by a single invocation of ServePortForward. -func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) { - var err error - if wsstream.IsWebSocketRequest(req) { - err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout) - } else { - err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) - } - - if err != nil { - runtime.HandleError(err) - return - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go deleted file mode 100644 index 22d5add06..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/portforward/websocket.go +++ /dev/null @@ -1,198 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package portforward - -import ( - "encoding/binary" - "fmt" - "io" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/golang/glog" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/server/httplog" - "k8s.io/apiserver/pkg/util/wsstream" - "k8s.io/kubernetes/pkg/api" -) - -const ( - dataChannel = iota - errorChannel - - v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol - v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol -) - -// options contains details about which streams are required for -// port forwarding. -// All fields incldued in V4Options need to be expressed explicilty in the -// CRI (pkg/kubelet/apis/cri/{version}/api.proto) PortForwardRequest. -type V4Options struct { - Ports []int32 -} - -// newOptions creates a new options from the Request. -func NewV4Options(req *http.Request) (*V4Options, error) { - if !wsstream.IsWebSocketRequest(req) { - return &V4Options{}, nil - } - - portStrings := req.URL.Query()[api.PortHeader] - if len(portStrings) == 0 { - return nil, fmt.Errorf("query parameter %q is required", api.PortHeader) - } - - ports := make([]int32, 0, len(portStrings)) - for _, portString := range portStrings { - if len(portString) == 0 { - return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader) - } - for _, p := range strings.Split(portString, ",") { - port, err := strconv.ParseUint(p, 10, 16) - if err != nil { - return nil, fmt.Errorf("unable to parse %q as a port: %v", portString, err) - } - if port < 1 { - return nil, fmt.Errorf("port %q must be > 0", portString) - } - ports = append(ports, int32(port)) - } - } - - return &V4Options{ - Ports: ports, - }, nil -} - -// BuildV4Options returns a V4Options based on the given information. -func BuildV4Options(ports []int32) (*V4Options, error) { - return &V4Options{Ports: ports}, nil -} - -// handleWebSocketStreams handles requests to forward ports to a pod via -// a PortForwarder. A pair of streams are created per port (DATA n, -// ERROR n+1). The associated port is written to each stream as a unsigned 16 -// bit integer in little endian format. -func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { - channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2) - for i := 0; i < len(opts.Ports); i++ { - channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel) - } - conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ - "": { - Binary: true, - Channels: channels, - }, - v4BinaryWebsocketProtocol: { - Binary: true, - Channels: channels, - }, - v4Base64WebsocketProtocol: { - Binary: false, - Channels: channels, - }, - }) - conn.SetIdleTimeout(idleTimeout) - _, streams, err := conn.Open(httplog.Unlogged(w), req) - if err != nil { - err = fmt.Errorf("Unable to upgrade websocket connection: %v", err) - return err - } - defer conn.Close() - streamPairs := make([]*websocketStreamPair, len(opts.Ports)) - for i := range streamPairs { - streamPair := websocketStreamPair{ - port: opts.Ports[i], - dataStream: streams[i*2+dataChannel], - errorStream: streams[i*2+errorChannel], - } - streamPairs[i] = &streamPair - - portBytes := make([]byte, 2) - // port is always positive so conversion is allowable - binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port)) - streamPair.dataStream.Write(portBytes) - streamPair.errorStream.Write(portBytes) - } - h := &websocketStreamHandler{ - conn: conn, - streamPairs: streamPairs, - pod: podName, - uid: uid, - forwarder: portForwarder, - } - h.run() - - return nil -} - -// websocketStreamPair represents the error and data streams for a port -// forwarding request. -type websocketStreamPair struct { - port int32 - dataStream io.ReadWriteCloser - errorStream io.WriteCloser -} - -// websocketStreamHandler is capable of processing a single port forward -// request over a websocket connection -type websocketStreamHandler struct { - conn *wsstream.Conn - ports []int32 - streamPairs []*websocketStreamPair - pod string - uid types.UID - forwarder PortForwarder -} - -// run invokes the websocketStreamHandler's forwarder.PortForward -// function for the given stream pair. -func (h *websocketStreamHandler) run() { - wg := sync.WaitGroup{} - wg.Add(len(h.streamPairs)) - - for _, pair := range h.streamPairs { - p := pair - go func() { - defer wg.Done() - h.portForward(p) - }() - } - - wg.Wait() -} - -func (h *websocketStreamHandler) portForward(p *websocketStreamPair) { - defer p.dataStream.Close() - defer p.errorStream.Close() - - glog.V(5).Infof("(conn=%p) invoking forwarder.PortForward for port %d", h.conn, p.port) - err := h.forwarder.PortForward(h.pod, h.uid, p.port, p.dataStream) - glog.V(5).Infof("(conn=%p) done invoking forwarder.PortForward for port %d", h.conn, p.port) - - if err != nil { - msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", p.port, h.pod, h.uid, err) - runtime.HandleError(msg) - fmt.Fprint(p.errorStream, msg.Error()) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go deleted file mode 100644 index e266f34fe..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/attach.go +++ /dev/null @@ -1,59 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package remotecommand - -import ( - "fmt" - "io" - "net/http" - "time" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/remotecommand" -) - -// Attacher knows how to attach to a running container in a pod. -type Attacher interface { - // AttachContainer attaches to the running container in the pod, copying data between in/out/err - // and the container's stdin/stdout/stderr. - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error -} - -// ServeAttach handles requests to attach to a container. After creating/receiving the required -// streams, it delegates the actual attaching to attacher. -func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, podName string, uid types.UID, container string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { - ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) - if !ok { - // error is handled by createStreams - return - } - defer ctx.conn.Close() - - err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan) - if err != nil { - err = fmt.Errorf("error attaching to container: %v", err) - runtime.HandleError(err) - ctx.writeStatus(apierrors.NewInternalError(err)) - } else { - ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ - Status: metav1.StatusSuccess, - }}) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go deleted file mode 100644 index 24f9393ab..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// package remotecommand contains functions related to executing commands in and attaching to pods. -package remotecommand // import "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go deleted file mode 100644 index 8d14a937a..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/exec.go +++ /dev/null @@ -1,79 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package remotecommand - -import ( - "fmt" - "io" - "net/http" - "time" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/remotecommand" - utilexec "k8s.io/kubernetes/pkg/util/exec" -) - -// Executor knows how to execute a command in a container in a pod. -type Executor interface { - // ExecInContainer executes a command in a container in the pod, copying data - // between in/out/err and the container's stdin/stdout/stderr. - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error -} - -// ServeExec handles requests to execute a command in a container. After -// creating/receiving the required streams, it delegates the actual execution -// to the executor. -func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { - ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) - if !ok { - // error is handled by createStreams - return - } - defer ctx.conn.Close() - - err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) - if err != nil { - if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { - rc := exitErr.ExitStatus() - ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ - Status: metav1.StatusFailure, - Reason: remotecommandconsts.NonZeroExitCodeReason, - Details: &metav1.StatusDetails{ - Causes: []metav1.StatusCause{ - { - Type: remotecommandconsts.ExitCodeCauseType, - Message: fmt.Sprintf("%d", rc), - }, - }, - }, - Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr), - }}) - } else { - err = fmt.Errorf("error executing command in container: %v", err) - runtime.HandleError(err) - ctx.writeStatus(apierrors.NewInternalError(err)) - } - } else { - ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ - Status: metav1.StatusSuccess, - }}) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go deleted file mode 100644 index f09b5e400..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/httpstream.go +++ /dev/null @@ -1,447 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package remotecommand - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "net/http" - "time" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/httpstream" - "k8s.io/apimachinery/pkg/util/httpstream/spdy" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/util/wsstream" - "k8s.io/client-go/tools/remotecommand" - "k8s.io/kubernetes/pkg/api" - - "github.com/golang/glog" -) - -// Options contains details about which streams are required for -// remote command execution. -type Options struct { - Stdin bool - Stdout bool - Stderr bool - TTY bool -} - -// NewOptions creates a new Options from the Request. -func NewOptions(req *http.Request) (*Options, error) { - tty := req.FormValue(api.ExecTTYParam) == "1" - stdin := req.FormValue(api.ExecStdinParam) == "1" - stdout := req.FormValue(api.ExecStdoutParam) == "1" - stderr := req.FormValue(api.ExecStderrParam) == "1" - if tty && stderr { - // TODO: make this an error before we reach this method - glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr") - stderr = false - } - - if !stdin && !stdout && !stderr { - return nil, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr") - } - - return &Options{ - Stdin: stdin, - Stdout: stdout, - Stderr: stderr, - TTY: tty, - }, nil -} - -// context contains the connection and streams used when -// forwarding an attach or execute session into a container. -type context struct { - conn io.Closer - stdinStream io.ReadCloser - stdoutStream io.WriteCloser - stderrStream io.WriteCloser - writeStatus func(status *apierrors.StatusError) error - resizeStream io.ReadCloser - resizeChan chan remotecommand.TerminalSize - tty bool -} - -// streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is -// enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the -// replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was -// received and right after, the connection gets closed). -type streamAndReply struct { - httpstream.Stream - replySent <-chan struct{} -} - -// waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends -// an empty struct to the notify channel. -func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) { - select { - case <-replySent: - notify <- struct{}{} - case <-stop: - } -} - -func createStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { - var ctx *context - var ok bool - if wsstream.IsWebSocketRequest(req) { - ctx, ok = createWebSocketStreams(req, w, opts, idleTimeout) - } else { - ctx, ok = createHttpStreamStreams(req, w, opts, supportedStreamProtocols, idleTimeout, streamCreationTimeout) - } - if !ok { - return nil, false - } - - if ctx.resizeStream != nil { - ctx.resizeChan = make(chan remotecommand.TerminalSize) - go handleResizeEvents(ctx.resizeStream, ctx.resizeChan) - } - - return ctx, true -} - -func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *Options, supportedStreamProtocols []string, idleTimeout, streamCreationTimeout time.Duration) (*context, bool) { - protocol, err := httpstream.Handshake(req, w, supportedStreamProtocols) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, err.Error()) - return nil, false - } - - streamCh := make(chan streamAndReply) - - upgrader := spdy.NewResponseUpgrader() - conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error { - streamCh <- streamAndReply{Stream: stream, replySent: replySent} - return nil - }) - // from this point on, we can no longer call methods on response - if conn == nil { - // The upgrader is responsible for notifying the client of any errors that - // occurred during upgrading. All we can do is return here at this point - // if we weren't successful in upgrading. - return nil, false - } - - conn.SetIdleTimeout(idleTimeout) - - var handler protocolHandler - switch protocol { - case remotecommandconsts.StreamProtocolV4Name: - handler = &v4ProtocolHandler{} - case remotecommandconsts.StreamProtocolV3Name: - handler = &v3ProtocolHandler{} - case remotecommandconsts.StreamProtocolV2Name: - handler = &v2ProtocolHandler{} - case "": - glog.V(4).Infof("Client did not request protocol negotiaion. Falling back to %q", remotecommandconsts.StreamProtocolV1Name) - fallthrough - case remotecommandconsts.StreamProtocolV1Name: - handler = &v1ProtocolHandler{} - } - - // count the streams client asked for, starting with 1 - expectedStreams := 1 - if opts.Stdin { - expectedStreams++ - } - if opts.Stdout { - expectedStreams++ - } - if opts.Stderr { - expectedStreams++ - } - if opts.TTY && handler.supportsTerminalResizing() { - expectedStreams++ - } - - expired := time.NewTimer(streamCreationTimeout) - defer expired.Stop() - - ctx, err := handler.waitForStreams(streamCh, expectedStreams, expired.C) - if err != nil { - runtime.HandleError(err) - return nil, false - } - - ctx.conn = conn - ctx.tty = opts.TTY - - return ctx, true -} - -type protocolHandler interface { - // waitForStreams waits for the expected streams or a timeout, returning a - // remoteCommandContext if all the streams were received, or an error if not. - waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) - // supportsTerminalResizing returns true if the protocol handler supports terminal resizing - supportsTerminalResizing() bool -} - -// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs -// in from v3 in the error stream format using an json-marshaled metav1.Status which carries -// the process' exit code. -type v4ProtocolHandler struct{} - -func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { - ctx := &context{} - receivedStreams := 0 - replyChan := make(chan struct{}) - stop := make(chan struct{}) - defer close(stop) -WaitForStreams: - for { - select { - case stream := <-streams: - streamType := stream.Headers().Get(api.StreamType) - switch streamType { - case api.StreamTypeError: - ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdin: - ctx.stdinStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdout: - ctx.stdoutStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStderr: - ctx.stderrStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeResize: - ctx.resizeStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - default: - runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) - } - case <-replyChan: - receivedStreams++ - if receivedStreams == expectedStreams { - break WaitForStreams - } - case <-expired: - // TODO find a way to return the error to the user. Maybe use a separate - // stream to report errors? - return nil, errors.New("timed out waiting for client to create streams") - } - } - - return ctx, nil -} - -// supportsTerminalResizing returns true because v4ProtocolHandler supports it -func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true } - -// v3ProtocolHandler implements the V3 protocol version for streaming command execution. -type v3ProtocolHandler struct{} - -func (*v3ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { - ctx := &context{} - receivedStreams := 0 - replyChan := make(chan struct{}) - stop := make(chan struct{}) - defer close(stop) -WaitForStreams: - for { - select { - case stream := <-streams: - streamType := stream.Headers().Get(api.StreamType) - switch streamType { - case api.StreamTypeError: - ctx.writeStatus = v1WriteStatusFunc(stream) - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdin: - ctx.stdinStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdout: - ctx.stdoutStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStderr: - ctx.stderrStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeResize: - ctx.resizeStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - default: - runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) - } - case <-replyChan: - receivedStreams++ - if receivedStreams == expectedStreams { - break WaitForStreams - } - case <-expired: - // TODO find a way to return the error to the user. Maybe use a separate - // stream to report errors? - return nil, errors.New("timed out waiting for client to create streams") - } - } - - return ctx, nil -} - -// supportsTerminalResizing returns true because v3ProtocolHandler supports it -func (*v3ProtocolHandler) supportsTerminalResizing() bool { return true } - -// v2ProtocolHandler implements the V2 protocol version for streaming command execution. -type v2ProtocolHandler struct{} - -func (*v2ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { - ctx := &context{} - receivedStreams := 0 - replyChan := make(chan struct{}) - stop := make(chan struct{}) - defer close(stop) -WaitForStreams: - for { - select { - case stream := <-streams: - streamType := stream.Headers().Get(api.StreamType) - switch streamType { - case api.StreamTypeError: - ctx.writeStatus = v1WriteStatusFunc(stream) - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdin: - ctx.stdinStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdout: - ctx.stdoutStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStderr: - ctx.stderrStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - default: - runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) - } - case <-replyChan: - receivedStreams++ - if receivedStreams == expectedStreams { - break WaitForStreams - } - case <-expired: - // TODO find a way to return the error to the user. Maybe use a separate - // stream to report errors? - return nil, errors.New("timed out waiting for client to create streams") - } - } - - return ctx, nil -} - -// supportsTerminalResizing returns false because v2ProtocolHandler doesn't support it. -func (*v2ProtocolHandler) supportsTerminalResizing() bool { return false } - -// v1ProtocolHandler implements the V1 protocol version for streaming command execution. -type v1ProtocolHandler struct{} - -func (*v1ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) { - ctx := &context{} - receivedStreams := 0 - replyChan := make(chan struct{}) - stop := make(chan struct{}) - defer close(stop) -WaitForStreams: - for { - select { - case stream := <-streams: - streamType := stream.Headers().Get(api.StreamType) - switch streamType { - case api.StreamTypeError: - ctx.writeStatus = v1WriteStatusFunc(stream) - - // This defer statement shouldn't be here, but due to previous refactoring, it ended up in - // here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in - // the v2ProtocolHandler. - defer stream.Reset() - - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdin: - ctx.stdinStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStdout: - ctx.stdoutStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - case api.StreamTypeStderr: - ctx.stderrStream = stream - go waitStreamReply(stream.replySent, replyChan, stop) - default: - runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType)) - } - case <-replyChan: - receivedStreams++ - if receivedStreams == expectedStreams { - break WaitForStreams - } - case <-expired: - // TODO find a way to return the error to the user. Maybe use a separate - // stream to report errors? - return nil, errors.New("timed out waiting for client to create streams") - } - } - - if ctx.stdinStream != nil { - ctx.stdinStream.Close() - } - - return ctx, nil -} - -// supportsTerminalResizing returns false because v1ProtocolHandler doesn't support it. -func (*v1ProtocolHandler) supportsTerminalResizing() bool { return false } - -func handleResizeEvents(stream io.Reader, channel chan<- remotecommand.TerminalSize) { - defer runtime.HandleCrash() - - decoder := json.NewDecoder(stream) - for { - size := remotecommand.TerminalSize{} - if err := decoder.Decode(&size); err != nil { - break - } - channel <- size - } -} - -func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error { - return func(status *apierrors.StatusError) error { - if status.Status().Status == metav1.StatusSuccess { - return nil // send error messages - } - _, err := stream.Write([]byte(status.Error())) - return err - } -} - -// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status -// as json in the error channel. -func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error { - return func(status *apierrors.StatusError) error { - bs, err := json.Marshal(status.Status()) - if err != nil { - return err - } - _, err = stream.Write(bs) - return err - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go deleted file mode 100644 index c60012b21..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/remotecommand/websocket.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package remotecommand - -import ( - "fmt" - "net/http" - "time" - - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apiserver/pkg/server/httplog" - "k8s.io/apiserver/pkg/util/wsstream" -) - -const ( - stdinChannel = iota - stdoutChannel - stderrChannel - errorChannel - resizeChannel - - preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol - preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol - v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol - v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol -) - -// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2) -// along with the approximate duplex value. It also creates the error (3) and resize (4) channels. -func createChannels(opts *Options) []wsstream.ChannelType { - // open the requested channels, and always open the error channel - channels := make([]wsstream.ChannelType, 5) - channels[stdinChannel] = readChannel(opts.Stdin) - channels[stdoutChannel] = writeChannel(opts.Stdout) - channels[stderrChannel] = writeChannel(opts.Stderr) - channels[errorChannel] = wsstream.WriteChannel - channels[resizeChannel] = wsstream.ReadChannel - return channels -} - -// readChannel returns wsstream.ReadChannel if real is true, or wsstream.IgnoreChannel. -func readChannel(real bool) wsstream.ChannelType { - if real { - return wsstream.ReadChannel - } - return wsstream.IgnoreChannel -} - -// writeChannel returns wsstream.WriteChannel if real is true, or wsstream.IgnoreChannel. -func writeChannel(real bool) wsstream.ChannelType { - if real { - return wsstream.WriteChannel - } - return wsstream.IgnoreChannel -} - -// createWebSocketStreams returns a context containing the websocket connection and -// streams needed to perform an exec or an attach. -func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *Options, idleTimeout time.Duration) (*context, bool) { - channels := createChannels(opts) - conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ - "": { - Binary: true, - Channels: channels, - }, - preV4BinaryWebsocketProtocol: { - Binary: true, - Channels: channels, - }, - preV4Base64WebsocketProtocol: { - Binary: false, - Channels: channels, - }, - v4BinaryWebsocketProtocol: { - Binary: true, - Channels: channels, - }, - v4Base64WebsocketProtocol: { - Binary: false, - Channels: channels, - }, - }) - conn.SetIdleTimeout(idleTimeout) - negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(w), req) - if err != nil { - runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err)) - return nil, false - } - - // Send an empty message to the lowest writable channel to notify the client the connection is established - // TODO: make generic to SPDY and WebSockets and do it outside of this method? - switch { - case opts.Stdout: - streams[stdoutChannel].Write([]byte{}) - case opts.Stderr: - streams[stderrChannel].Write([]byte{}) - default: - streams[errorChannel].Write([]byte{}) - } - - ctx := &context{ - conn: conn, - stdinStream: streams[stdinChannel], - stdoutStream: streams[stdoutChannel], - stderrStream: streams[stderrChannel], - tty: opts.TTY, - resizeStream: streams[resizeChannel], - } - - switch negotiatedProtocol { - case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol: - ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel]) - default: - ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel]) - } - - return ctx, true -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go deleted file mode 100644 index 9f16b4eb2..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/errors.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package streaming - -import ( - "fmt" - "net/http" - "strconv" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" -) - -func ErrorStreamingDisabled(method string) error { - return grpc.Errorf(codes.NotFound, fmt.Sprintf("streaming method %s disabled", method)) -} - -// The error returned when the maximum number of in-flight requests is exceeded. -func ErrorTooManyInFlight() error { - return grpc.Errorf(codes.ResourceExhausted, "maximum number of in-flight requests exceeded") -} - -// Translates a CRI streaming error into an appropriate HTTP response. -func WriteError(err error, w http.ResponseWriter) error { - var status int - switch grpc.Code(err) { - case codes.NotFound: - status = http.StatusNotFound - case codes.ResourceExhausted: - // We only expect to hit this if there is a DoS, so we just wait the full TTL. - // If this is ever hit in steady-state operations, consider increasing the MaxInFlight requests, - // or plumbing through the time to next expiration. - w.Header().Set("Retry-After", strconv.Itoa(int(CacheTTL.Seconds()))) - status = http.StatusTooManyRequests - default: - status = http.StatusInternalServerError - } - w.WriteHeader(status) - _, writeErr := w.Write([]byte(err.Error())) - return writeErr -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go deleted file mode 100644 index f68f640be..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/request_cache.go +++ /dev/null @@ -1,146 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package streaming - -import ( - "container/list" - "crypto/rand" - "encoding/base64" - "fmt" - "math" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/clock" -) - -var ( - // Timeout after which tokens become invalid. - CacheTTL = 1 * time.Minute - // The maximum number of in-flight requests to allow. - MaxInFlight = 1000 - // Length of the random base64 encoded token identifying the request. - TokenLen = 8 -) - -// requestCache caches streaming (exec/attach/port-forward) requests and generates a single-use -// random token for their retrieval. The requestCache is used for building streaming URLs without -// the need to encode every request parameter in the URL. -type requestCache struct { - // clock is used to obtain the current time - clock clock.Clock - - // tokens maps the generate token to the request for fast retrieval. - tokens map[string]*list.Element - // ll maintains an age-ordered request list for faster garbage collection of expired requests. - ll *list.List - - lock sync.Mutex -} - -// Type representing an *ExecRequest, *AttachRequest, or *PortForwardRequest. -type request interface{} - -type cacheEntry struct { - token string - req request - expireTime time.Time -} - -func newRequestCache() *requestCache { - return &requestCache{ - clock: clock.RealClock{}, - ll: list.New(), - tokens: make(map[string]*list.Element), - } -} - -// Insert the given request into the cache and returns the token used for fetching it out. -func (c *requestCache) Insert(req request) (token string, err error) { - c.lock.Lock() - defer c.lock.Unlock() - - // Remove expired entries. - c.gc() - // If the cache is full, reject the request. - if c.ll.Len() == MaxInFlight { - return "", ErrorTooManyInFlight() - } - token, err = c.uniqueToken() - if err != nil { - return "", err - } - ele := c.ll.PushFront(&cacheEntry{token, req, c.clock.Now().Add(CacheTTL)}) - - c.tokens[token] = ele - return token, nil -} - -// Consume the token (remove it from the cache) and return the cached request, if found. -func (c *requestCache) Consume(token string) (req request, found bool) { - c.lock.Lock() - defer c.lock.Unlock() - ele, ok := c.tokens[token] - if !ok { - return nil, false - } - c.ll.Remove(ele) - delete(c.tokens, token) - - entry := ele.Value.(*cacheEntry) - if c.clock.Now().After(entry.expireTime) { - // Entry already expired. - return nil, false - } - return entry.req, true -} - -// uniqueToken generates a random URL-safe token and ensures uniqueness. -func (c *requestCache) uniqueToken() (string, error) { - const maxTries = 10 - // Number of bytes to be TokenLen when base64 encoded. - tokenSize := math.Ceil(float64(TokenLen) * 6 / 8) - rawToken := make([]byte, int(tokenSize)) - for i := 0; i < maxTries; i++ { - if _, err := rand.Read(rawToken); err != nil { - return "", err - } - encoded := base64.RawURLEncoding.EncodeToString(rawToken) - token := encoded[:TokenLen] - // If it's unique, return it. Otherwise retry. - if _, exists := c.tokens[encoded]; !exists { - return token, nil - } - } - return "", fmt.Errorf("failed to generate unique token") -} - -// Must be write-locked prior to calling. -func (c *requestCache) gc() { - now := c.clock.Now() - for c.ll.Len() > 0 { - oldest := c.ll.Back() - entry := oldest.Value.(*cacheEntry) - if !now.After(entry.expireTime) { - return - } - - // Oldest value is expired; remove it. - c.ll.Remove(oldest) - delete(c.tokens, entry.token) - } -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go b/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go deleted file mode 100644 index 875e44462..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/server/streaming/server.go +++ /dev/null @@ -1,344 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package streaming - -import ( - "crypto/tls" - "errors" - "io" - "net/http" - "net/url" - "path" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - - restful "github.com/emicklei/go-restful" - - "k8s.io/apimachinery/pkg/types" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" - "k8s.io/client-go/tools/remotecommand" - runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - "k8s.io/kubernetes/pkg/kubelet/server/portforward" - remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" -) - -// The library interface to serve the stream requests. -type Server interface { - http.Handler - - // Get the serving URL for the requests. - // Requests must not be nil. Responses may be nil iff an error is returned. - GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) - GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) - GetPortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) - - // Start the server. - // addr is the address to serve on (address:port) stayUp indicates whether the server should - // listen until Stop() is called, or automatically stop after all expected connections are - // closed. Calling Get{Exec,Attach,PortForward} increments the expected connection count. - // Function does not return until the server is stopped. - Start(stayUp bool) error - // Stop the server, and terminate any open connections. - Stop() error -} - -// The interface to execute the commands and provide the streams. -type Runtime interface { - Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error - Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error - PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error -} - -// Config defines the options used for running the stream server. -type Config struct { - // The host:port address the server will listen on. - Addr string - // The optional base URL for constructing streaming URLs. If empty, the baseURL will be - // constructed from the serve address. - BaseURL *url.URL - - // How long to leave idle connections open for. - StreamIdleTimeout time.Duration - // How long to wait for clients to create streams. Only used for SPDY streaming. - StreamCreationTimeout time.Duration - - // The streaming protocols the server supports (understands and permits). See - // k8s.io/kubernetes/pkg/kubelet/server/remotecommand/constants.go for available protocols. - // Only used for SPDY streaming. - SupportedRemoteCommandProtocols []string - - // The streaming protocols the server supports (understands and permits). See - // k8s.io/kubernetes/pkg/kubelet/server/portforward/constants.go for available protocols. - // Only used for SPDY streaming. - SupportedPortForwardProtocols []string - - // The config for serving over TLS. If nil, TLS will not be used. - TLSConfig *tls.Config -} - -// DefaultConfig provides default values for server Config. The DefaultConfig is partial, so -// some fields like Addr must still be provided. -var DefaultConfig = Config{ - StreamIdleTimeout: 4 * time.Hour, - StreamCreationTimeout: remotecommandconsts.DefaultStreamCreationTimeout, - SupportedRemoteCommandProtocols: remotecommandconsts.SupportedStreamingProtocols, - SupportedPortForwardProtocols: portforward.SupportedProtocols, -} - -// TODO(timstclair): Add auth(n/z) interface & handling. -func NewServer(config Config, runtime Runtime) (Server, error) { - s := &server{ - config: config, - runtime: &criAdapter{runtime}, - cache: newRequestCache(), - } - - if s.config.BaseURL == nil { - s.config.BaseURL = &url.URL{ - Scheme: "http", - Host: s.config.Addr, - } - if s.config.TLSConfig != nil { - s.config.BaseURL.Scheme = "https" - } - } - - ws := &restful.WebService{} - endpoints := []struct { - path string - handler restful.RouteFunction - }{ - {"/exec/{token}", s.serveExec}, - {"/attach/{token}", s.serveAttach}, - {"/portforward/{token}", s.servePortForward}, - } - // If serving relative to a base path, set that here. - pathPrefix := path.Dir(s.config.BaseURL.Path) - for _, e := range endpoints { - for _, method := range []string{"GET", "POST"} { - ws.Route(ws. - Method(method). - Path(path.Join(pathPrefix, e.path)). - To(e.handler)) - } - } - handler := restful.NewContainer() - handler.Add(ws) - s.handler = handler - - return s, nil -} - -type server struct { - config Config - runtime *criAdapter - handler http.Handler - cache *requestCache -} - -func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { - if req.ContainerId == "" { - return nil, grpc.Errorf(codes.InvalidArgument, "missing required container_id") - } - token, err := s.cache.Insert(req) - if err != nil { - return nil, err - } - return &runtimeapi.ExecResponse{ - Url: s.buildURL("exec", token), - }, nil -} - -func (s *server) GetAttach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { - if req.ContainerId == "" { - return nil, grpc.Errorf(codes.InvalidArgument, "missing required container_id") - } - token, err := s.cache.Insert(req) - if err != nil { - return nil, err - } - return &runtimeapi.AttachResponse{ - Url: s.buildURL("attach", token), - }, nil -} - -func (s *server) GetPortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { - if req.PodSandboxId == "" { - return nil, grpc.Errorf(codes.InvalidArgument, "missing required pod_sandbox_id") - } - token, err := s.cache.Insert(req) - if err != nil { - return nil, err - } - return &runtimeapi.PortForwardResponse{ - Url: s.buildURL("portforward", token), - }, nil -} - -func (s *server) Start(stayUp bool) error { - if !stayUp { - // TODO(timstclair): Implement this. - return errors.New("stayUp=false is not yet implemented") - } - - server := &http.Server{ - Addr: s.config.Addr, - Handler: s.handler, - TLSConfig: s.config.TLSConfig, - } - if s.config.TLSConfig != nil { - return server.ListenAndServeTLS("", "") // Use certs from TLSConfig. - } else { - return server.ListenAndServe() - } -} - -func (s *server) Stop() error { - // TODO(timstclair): Implement this. - return errors.New("not yet implemented") -} - -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.handler.ServeHTTP(w, r) -} - -func (s *server) buildURL(method, token string) string { - return s.config.BaseURL.ResolveReference(&url.URL{ - Path: path.Join(method, token), - }).String() -} - -func (s *server) serveExec(req *restful.Request, resp *restful.Response) { - token := req.PathParameter("token") - cachedRequest, ok := s.cache.Consume(token) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - exec, ok := cachedRequest.(*runtimeapi.ExecRequest) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - - streamOpts := &remotecommandserver.Options{ - Stdin: exec.Stdin, - Stdout: true, - Stderr: !exec.Tty, - TTY: exec.Tty, - } - - remotecommandserver.ServeExec( - resp.ResponseWriter, - req.Request, - s.runtime, - "", // unused: podName - "", // unusued: podUID - exec.ContainerId, - exec.Cmd, - streamOpts, - s.config.StreamIdleTimeout, - s.config.StreamCreationTimeout, - s.config.SupportedRemoteCommandProtocols) -} - -func (s *server) serveAttach(req *restful.Request, resp *restful.Response) { - token := req.PathParameter("token") - cachedRequest, ok := s.cache.Consume(token) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - attach, ok := cachedRequest.(*runtimeapi.AttachRequest) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - - streamOpts := &remotecommandserver.Options{ - Stdin: attach.Stdin, - Stdout: true, - Stderr: !attach.Tty, - TTY: attach.Tty, - } - remotecommandserver.ServeAttach( - resp.ResponseWriter, - req.Request, - s.runtime, - "", // unused: podName - "", // unusued: podUID - attach.ContainerId, - streamOpts, - s.config.StreamIdleTimeout, - s.config.StreamCreationTimeout, - s.config.SupportedRemoteCommandProtocols) -} - -func (s *server) servePortForward(req *restful.Request, resp *restful.Response) { - token := req.PathParameter("token") - cachedRequest, ok := s.cache.Consume(token) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - pf, ok := cachedRequest.(*runtimeapi.PortForwardRequest) - if !ok { - http.NotFound(resp.ResponseWriter, req.Request) - return - } - - portForwardOptions, err := portforward.BuildV4Options(pf.Port) - if err != nil { - resp.WriteError(http.StatusBadRequest, err) - return - } - - portforward.ServePortForward( - resp.ResponseWriter, - req.Request, - s.runtime, - pf.PodSandboxId, - "", // unused: podUID - portForwardOptions, - s.config.StreamIdleTimeout, - s.config.StreamCreationTimeout, - s.config.SupportedPortForwardProtocols) -} - -// criAdapter wraps the Runtime functions to conform to the remotecommand interfaces. -// The adapter binds the container ID to the container name argument, and the pod sandbox ID to the pod name. -type criAdapter struct { - Runtime -} - -var _ remotecommandserver.Executor = &criAdapter{} -var _ remotecommandserver.Attacher = &criAdapter{} -var _ portforward.PortForwarder = &criAdapter{} - -func (a *criAdapter) ExecInContainer(podName string, podUID types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - return a.Exec(container, cmd, in, out, err, tty, resize) -} - -func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - return a.Attach(container, in, out, err, tty, resize) -} - -func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error { - return a.Runtime.PortForward(podName, port, stream) -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/types/constants.go b/vendor/k8s.io/kubernetes/pkg/kubelet/types/constants.go deleted file mode 100644 index eeabba017..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/types/constants.go +++ /dev/null @@ -1,22 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -const ( - // system default DNS resolver configuration - ResolvConfDefault = "/etc/resolv.conf" -) diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/types/doc.go b/vendor/k8s.io/kubernetes/pkg/kubelet/types/doc.go deleted file mode 100644 index 88e345636..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/types/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Common types in the Kubelet. -package types // import "k8s.io/kubernetes/pkg/kubelet/types" diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/types/labels.go b/vendor/k8s.io/kubernetes/pkg/kubelet/types/labels.go deleted file mode 100644 index c4dad6302..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/types/labels.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -const ( - KubernetesPodNameLabel = "io.kubernetes.pod.name" - KubernetesPodNamespaceLabel = "io.kubernetes.pod.namespace" - KubernetesPodUIDLabel = "io.kubernetes.pod.uid" - KubernetesContainerNameLabel = "io.kubernetes.container.name" -) - -func GetContainerName(labels map[string]string) string { - return labels[KubernetesContainerNameLabel] -} - -func GetPodName(labels map[string]string) string { - return labels[KubernetesPodNameLabel] -} - -func GetPodUID(labels map[string]string) string { - return labels[KubernetesPodUIDLabel] -} - -func GetPodNamespace(labels map[string]string) string { - return labels[KubernetesPodNamespaceLabel] -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/types/pod_update.go b/vendor/k8s.io/kubernetes/pkg/kubelet/types/pod_update.go deleted file mode 100644 index 2c2dbb8a0..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/types/pod_update.go +++ /dev/null @@ -1,153 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -import ( - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubeapi "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/v1" -) - -const ( - ConfigSourceAnnotationKey = "kubernetes.io/config.source" - ConfigMirrorAnnotationKey = v1.MirrorPodAnnotationKey - ConfigFirstSeenAnnotationKey = "kubernetes.io/config.seen" - ConfigHashAnnotationKey = "kubernetes.io/config.hash" - CriticalPodAnnotationKey = "scheduler.alpha.kubernetes.io/critical-pod" -) - -// PodOperation defines what changes will be made on a pod configuration. -type PodOperation int - -const ( - // This is the current pod configuration - SET PodOperation = iota - // Pods with the given ids are new to this source - ADD - // Pods with the given ids are gracefully deleted from this source - DELETE - // Pods with the given ids have been removed from this source - REMOVE - // Pods with the given ids have been updated in this source - UPDATE - // Pods with the given ids have unexpected status in this source, - // kubelet should reconcile status with this source - RECONCILE - - // These constants identify the sources of pods - // Updates from a file - FileSource = "file" - // Updates from querying a web page - HTTPSource = "http" - // Updates from Kubernetes API Server - ApiserverSource = "api" - // Updates from all sources - AllSource = "*" - - NamespaceDefault = metav1.NamespaceDefault -) - -// PodUpdate defines an operation sent on the channel. You can add or remove single services by -// sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required). -// For setting the state of the system to a given state for this source configuration, set -// Pods as desired and Op to SET, which will reset the system state to that specified in this -// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET. -// -// Additionally, Pods should never be nil - it should always point to an empty slice. While -// functionally similar, this helps our unit tests properly check that the correct PodUpdates -// are generated. -type PodUpdate struct { - Pods []*v1.Pod - Op PodOperation - Source string -} - -// Gets all validated sources from the specified sources. -func GetValidatedSources(sources []string) ([]string, error) { - validated := make([]string, 0, len(sources)) - for _, source := range sources { - switch source { - case AllSource: - return []string{FileSource, HTTPSource, ApiserverSource}, nil - case FileSource, HTTPSource, ApiserverSource: - validated = append(validated, source) - break - case "": - break - default: - return []string{}, fmt.Errorf("unknown pod source %q", source) - } - } - return validated, nil -} - -// GetPodSource returns the source of the pod based on the annotation. -func GetPodSource(pod *v1.Pod) (string, error) { - if pod.Annotations != nil { - if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok { - return source, nil - } - } - return "", fmt.Errorf("cannot get source of pod %q", pod.UID) -} - -// SyncPodType classifies pod updates, eg: create, update. -type SyncPodType int - -const ( - // SyncPodSync is when the pod is synced to ensure desired state - SyncPodSync SyncPodType = iota - // SyncPodUpdate is when the pod is updated from source - SyncPodUpdate - // SyncPodCreate is when the pod is created from source - SyncPodCreate - // SyncPodKill is when the pod is killed based on a trigger internal to the kubelet for eviction. - // If a SyncPodKill request is made to pod workers, the request is never dropped, and will always be processed. - SyncPodKill -) - -func (sp SyncPodType) String() string { - switch sp { - case SyncPodCreate: - return "create" - case SyncPodUpdate: - return "update" - case SyncPodSync: - return "sync" - case SyncPodKill: - return "kill" - default: - return "unknown" - } -} - -// IsCriticalPod returns true if the pod bears the critical pod annotation -// key. Both the rescheduler and the kubelet use this key to make admission -// and scheduling decisions. -func IsCriticalPod(pod *v1.Pod) bool { - // Critical pods are restricted to "kube-system" namespace as of now. - if pod.Namespace != kubeapi.NamespaceSystem { - return false - } - val, ok := pod.Annotations[CriticalPodAnnotationKey] - if ok && val == "" { - return true - } - return false -} diff --git a/vendor/k8s.io/kubernetes/pkg/kubelet/types/types.go b/vendor/k8s.io/kubernetes/pkg/kubelet/types/types.go deleted file mode 100644 index 35359c7aa..000000000 --- a/vendor/k8s.io/kubernetes/pkg/kubelet/types/types.go +++ /dev/null @@ -1,93 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -import ( - "net/http" - "time" - - "k8s.io/kubernetes/pkg/api/v1" -) - -// TODO: Reconcile custom types in kubelet/types and this subpackage - -type HttpGetter interface { - Get(url string) (*http.Response, error) -} - -// Timestamp wraps around time.Time and offers utilities to format and parse -// the time using RFC3339Nano -type Timestamp struct { - time time.Time -} - -// NewTimestamp returns a Timestamp object using the current time. -func NewTimestamp() *Timestamp { - return &Timestamp{time.Now()} -} - -// ConvertToTimestamp takes a string, parses it using the RFC3339Nano layout, -// and converts it to a Timestamp object. -func ConvertToTimestamp(timeString string) *Timestamp { - parsed, _ := time.Parse(time.RFC3339Nano, timeString) - return &Timestamp{parsed} -} - -// Get returns the time as time.Time. -func (t *Timestamp) Get() time.Time { - return t.time -} - -// GetString returns the time in the string format using the RFC3339Nano -// layout. -func (t *Timestamp) GetString() string { - return t.time.Format(time.RFC3339Nano) -} - -// A type to help sort container statuses based on container names. -type SortedContainerStatuses []v1.ContainerStatus - -func (s SortedContainerStatuses) Len() int { return len(s) } -func (s SortedContainerStatuses) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s SortedContainerStatuses) Less(i, j int) bool { - return s[i].Name < s[j].Name -} - -// SortInitContainerStatuses ensures that statuses are in the order that their -// init container appears in the pod spec -func SortInitContainerStatuses(p *v1.Pod, statuses []v1.ContainerStatus) { - containers := p.Spec.InitContainers - current := 0 - for _, container := range containers { - for j := current; j < len(statuses); j++ { - if container.Name == statuses[j].Name { - statuses[current], statuses[j] = statuses[j], statuses[current] - current++ - break - } - } - } -} - -// Reservation represents reserved resources for non-pod components. -type Reservation struct { - // System represents resources reserved for non-kubernetes components. - System v1.ResourceList - // Kubernetes represents resources reserved for kubernetes system components. - Kubernetes v1.ResourceList -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/interrupt/interrupt.go b/vendor/k8s.io/kubernetes/pkg/util/interrupt/interrupt.go deleted file mode 100644 index 0265b9fb1..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/interrupt/interrupt.go +++ /dev/null @@ -1,104 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package interrupt - -import ( - "os" - "os/signal" - "sync" - "syscall" -) - -// terminationSignals are signals that cause the program to exit in the -// supported platforms (linux, darwin, windows). -var terminationSignals = []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT} - -// Handler guarantees execution of notifications after a critical section (the function passed -// to a Run method), even in the presence of process termination. It guarantees exactly once -// invocation of the provided notify functions. -type Handler struct { - notify []func() - final func(os.Signal) - once sync.Once -} - -// Chain creates a new handler that invokes all notify functions when the critical section exits -// and then invokes the optional handler's notifications. This allows critical sections to be -// nested without losing exactly once invocations. Notify functions can invoke any cleanup needed -// but should not exit (which is the responsibility of the parent handler). -func Chain(handler *Handler, notify ...func()) *Handler { - if handler == nil { - return New(nil, notify...) - } - return New(handler.Signal, append(notify, handler.Close)...) -} - -// New creates a new handler that guarantees all notify functions are run after the critical -// section exits (or is interrupted by the OS), then invokes the final handler. If no final -// handler is specified, the default final is `os.Exit(1)`. A handler can only be used for -// one critical section. -func New(final func(os.Signal), notify ...func()) *Handler { - return &Handler{ - final: final, - notify: notify, - } -} - -// Close executes all the notification handlers if they have not yet been executed. -func (h *Handler) Close() { - h.once.Do(func() { - for _, fn := range h.notify { - fn() - } - }) -} - -// Signal is called when an os.Signal is received, and guarantees that all notifications -// are executed, then the final handler is executed. This function should only be called once -// per Handler instance. -func (h *Handler) Signal(s os.Signal) { - h.once.Do(func() { - for _, fn := range h.notify { - fn() - } - if h.final == nil { - os.Exit(1) - } - h.final(s) - }) -} - -// Run ensures that any notifications are invoked after the provided fn exits (even if the -// process is interrupted by an OS termination signal). Notifications are only invoked once -// per Handler instance, so calling Run more than once will not behave as the user expects. -func (h *Handler) Run(fn func() error) error { - ch := make(chan os.Signal, 1) - signal.Notify(ch, terminationSignals...) - defer func() { - signal.Stop(ch) - close(ch) - }() - go func() { - sig, ok := <-ch - if !ok { - return - } - h.Signal(sig) - }() - defer h.Close() - return fn() -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/resize.go b/vendor/k8s.io/kubernetes/pkg/util/term/resize.go deleted file mode 100644 index 7ca09a858..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/resize.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "fmt" - - "github.com/docker/docker/pkg/term" - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/remotecommand" -) - -// GetSize returns the current size of the user's terminal. If it isn't a terminal, -// nil is returned. -func (t TTY) GetSize() *remotecommand.TerminalSize { - outFd, isTerminal := term.GetFdInfo(t.Out) - if !isTerminal { - return nil - } - return GetSize(outFd) -} - -// GetSize returns the current size of the terminal associated with fd. -func GetSize(fd uintptr) *remotecommand.TerminalSize { - winsize, err := term.GetWinsize(fd) - if err != nil { - runtime.HandleError(fmt.Errorf("unable to get terminal size: %v", err)) - return nil - } - - return &remotecommand.TerminalSize{Width: winsize.Width, Height: winsize.Height} -} - -// MonitorSize monitors the terminal's size. It returns a TerminalSizeQueue primed with -// initialSizes, or nil if there's no TTY present. -func (t *TTY) MonitorSize(initialSizes ...*remotecommand.TerminalSize) remotecommand.TerminalSizeQueue { - outFd, isTerminal := term.GetFdInfo(t.Out) - if !isTerminal { - return nil - } - - t.sizeQueue = &sizeQueue{ - t: *t, - // make it buffered so we can send the initial terminal sizes without blocking, prior to starting - // the streaming below - resizeChan: make(chan remotecommand.TerminalSize, len(initialSizes)), - stopResizing: make(chan struct{}), - } - - t.sizeQueue.monitorSize(outFd, initialSizes...) - - return t.sizeQueue -} - -// sizeQueue implements remotecommand.TerminalSizeQueue -type sizeQueue struct { - t TTY - // resizeChan receives a Size each time the user's terminal is resized. - resizeChan chan remotecommand.TerminalSize - stopResizing chan struct{} -} - -// make sure sizeQueue implements the resize.TerminalSizeQueue interface -var _ remotecommand.TerminalSizeQueue = &sizeQueue{} - -// monitorSize primes resizeChan with initialSizes and then monitors for resize events. With each -// new event, it sends the current terminal size to resizeChan. -func (s *sizeQueue) monitorSize(outFd uintptr, initialSizes ...*remotecommand.TerminalSize) { - // send the initial sizes - for i := range initialSizes { - if initialSizes[i] != nil { - s.resizeChan <- *initialSizes[i] - } - } - - resizeEvents := make(chan remotecommand.TerminalSize, 1) - - monitorResizeEvents(outFd, resizeEvents, s.stopResizing) - - // listen for resize events in the background - go func() { - defer runtime.HandleCrash() - - for { - select { - case size, ok := <-resizeEvents: - if !ok { - return - } - - select { - // try to send the size to resizeChan, but don't block - case s.resizeChan <- size: - // send successful - default: - // unable to send / no-op - } - case <-s.stopResizing: - return - } - } - }() -} - -// Next returns the new terminal size after the terminal has been resized. It returns nil when -// monitoring has been stopped. -func (s *sizeQueue) Next() *remotecommand.TerminalSize { - size, ok := <-s.resizeChan - if !ok { - return nil - } - return &size -} - -// stop stops the background goroutine that is monitoring for terminal resizes. -func (s *sizeQueue) stop() { - close(s.stopResizing) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents.go b/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents.go deleted file mode 100644 index 75e9690df..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents.go +++ /dev/null @@ -1,61 +0,0 @@ -// +build !windows - -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "os" - "os/signal" - "syscall" - - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/remotecommand" -) - -// monitorResizeEvents spawns a goroutine that waits for SIGWINCH signals (these indicate the -// terminal has resized). After receiving a SIGWINCH, this gets the terminal size and tries to send -// it to the resizeEvents channel. The goroutine stops when the stop channel is closed. -func monitorResizeEvents(fd uintptr, resizeEvents chan<- remotecommand.TerminalSize, stop chan struct{}) { - go func() { - defer runtime.HandleCrash() - - winch := make(chan os.Signal, 1) - signal.Notify(winch, syscall.SIGWINCH) - defer signal.Stop(winch) - - for { - select { - case <-winch: - size := GetSize(fd) - if size == nil { - return - } - - // try to send size - select { - case resizeEvents <- *size: - // success - default: - // not sent - } - case <-stop: - return - } - } - }() -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents_windows.go b/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents_windows.go deleted file mode 100644 index adccf8734..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/resizeevents_windows.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "time" - - "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/tools/remotecommand" -) - -// monitorResizeEvents spawns a goroutine that periodically gets the terminal size and tries to send -// it to the resizeEvents channel if the size has changed. The goroutine stops when the stop channel -// is closed. -func monitorResizeEvents(fd uintptr, resizeEvents chan<- remotecommand.TerminalSize, stop chan struct{}) { - go func() { - defer runtime.HandleCrash() - - size := GetSize(fd) - if size == nil { - return - } - lastSize := *size - - for { - // see if we need to stop running - select { - case <-stop: - return - default: - } - - size := GetSize(fd) - if size == nil { - return - } - - if size.Height != lastSize.Height || size.Width != lastSize.Width { - lastSize.Height = size.Height - lastSize.Width = size.Width - resizeEvents <- *size - } - - // sleep to avoid hot looping - time.Sleep(250 * time.Millisecond) - } - }() -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/setsize.go b/vendor/k8s.io/kubernetes/pkg/util/term/setsize.go deleted file mode 100644 index 8cccd431a..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/setsize.go +++ /dev/null @@ -1,29 +0,0 @@ -// +build !windows - -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "github.com/docker/docker/pkg/term" - "k8s.io/client-go/tools/remotecommand" -) - -// SetSize sets the terminal size associated with fd. -func SetSize(fd uintptr, size remotecommand.TerminalSize) error { - return term.SetWinsize(fd, &term.Winsize{Height: size.Height, Width: size.Width}) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/setsize_unsupported.go b/vendor/k8s.io/kubernetes/pkg/util/term/setsize_unsupported.go deleted file mode 100644 index 82220217a..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/setsize_unsupported.go +++ /dev/null @@ -1,28 +0,0 @@ -// +build windows - -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "k8s.io/client-go/tools/remotecommand" -) - -func SetSize(fd uintptr, size remotecommand.TerminalSize) error { - // NOP - return nil -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/term.go b/vendor/k8s.io/kubernetes/pkg/util/term/term.go deleted file mode 100644 index 58baee831..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/term.go +++ /dev/null @@ -1,110 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "io" - "os" - - "github.com/docker/docker/pkg/term" - - "k8s.io/kubernetes/pkg/util/interrupt" -) - -// SafeFunc is a function to be invoked by TTY. -type SafeFunc func() error - -// TTY helps invoke a function and preserve the state of the terminal, even if the process is -// terminated during execution. It also provides support for terminal resizing for remote command -// execution/attachment. -type TTY struct { - // In is a reader representing stdin. It is a required field. - In io.Reader - // Out is a writer representing stdout. It must be set to support terminal resizing. It is an - // optional field. - Out io.Writer - // Raw is true if the terminal should be set raw. - Raw bool - // TryDev indicates the TTY should try to open /dev/tty if the provided input - // is not a file descriptor. - TryDev bool - // Parent is an optional interrupt handler provided to this function - if provided - // it will be invoked after the terminal state is restored. If it is not provided, - // a signal received during the TTY will result in os.Exit(0) being invoked. - Parent *interrupt.Handler - - // sizeQueue is set after a call to MonitorSize() and is used to monitor SIGWINCH signals when the - // user's terminal resizes. - sizeQueue *sizeQueue -} - -// IsTerminalIn returns true if t.In is a terminal. Does not check /dev/tty -// even if TryDev is set. -func (t TTY) IsTerminalIn() bool { - return IsTerminal(t.In) -} - -// IsTerminalOut returns true if t.Out is a terminal. Does not check /dev/tty -// even if TryDev is set. -func (t TTY) IsTerminalOut() bool { - return IsTerminal(t.Out) -} - -// IsTerminal returns whether the passed object is a terminal or not -func IsTerminal(i interface{}) bool { - _, terminal := term.GetFdInfo(i) - return terminal -} - -// Safe invokes the provided function and will attempt to ensure that when the -// function returns (or a termination signal is sent) that the terminal state -// is reset to the condition it was in prior to the function being invoked. If -// t.Raw is true the terminal will be put into raw mode prior to calling the function. -// If the input file descriptor is not a TTY and TryDev is true, the /dev/tty file -// will be opened (if available). -func (t TTY) Safe(fn SafeFunc) error { - inFd, isTerminal := term.GetFdInfo(t.In) - - if !isTerminal && t.TryDev { - if f, err := os.Open("/dev/tty"); err == nil { - defer f.Close() - inFd = f.Fd() - isTerminal = term.IsTerminal(inFd) - } - } - if !isTerminal { - return fn() - } - - var state *term.State - var err error - if t.Raw { - state, err = term.MakeRaw(inFd) - } else { - state, err = term.SaveState(inFd) - } - if err != nil { - return err - } - return interrupt.Chain(t.Parent, func() { - if t.sizeQueue != nil { - t.sizeQueue.stop() - } - - term.RestoreTerminal(inFd, state) - }).Run(fn) -} diff --git a/vendor/k8s.io/kubernetes/pkg/util/term/term_writer.go b/vendor/k8s.io/kubernetes/pkg/util/term/term_writer.go deleted file mode 100644 index 2d72d1e45..000000000 --- a/vendor/k8s.io/kubernetes/pkg/util/term/term_writer.go +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package term - -import ( - "io" - "os" - - "github.com/docker/docker/pkg/term" - wordwrap "github.com/mitchellh/go-wordwrap" -) - -type wordWrapWriter struct { - limit uint - writer io.Writer -} - -// NewResponsiveWriter creates a Writer that detects the column width of the -// terminal we are in, and adjusts every line width to fit and use recommended -// terminal sizes for better readability. Does proper word wrapping automatically. -// if terminal width >= 120 columns use 120 columns -// if terminal width >= 100 columns use 100 columns -// if terminal width >= 80 columns use 80 columns -// In case we're not in a terminal or if it's smaller than 80 columns width, -// doesn't do any wrapping. -func NewResponsiveWriter(w io.Writer) io.Writer { - file, ok := w.(*os.File) - if !ok { - return w - } - fd := file.Fd() - if !term.IsTerminal(fd) { - return w - } - - terminalSize := GetSize(fd) - if terminalSize == nil { - return w - } - - var limit uint - switch { - case terminalSize.Width >= 120: - limit = 120 - case terminalSize.Width >= 100: - limit = 100 - case terminalSize.Width >= 80: - limit = 80 - } - - return NewWordWrapWriter(w, limit) -} - -// NewWordWrapWriter is a Writer that supports a limit of characters on every line -// and does auto word wrapping that respects that limit. -func NewWordWrapWriter(w io.Writer, limit uint) io.Writer { - return &wordWrapWriter{ - limit: limit, - writer: w, - } -} - -func (w wordWrapWriter) Write(p []byte) (nn int, err error) { - if w.limit == 0 { - return w.writer.Write(p) - } - original := string(p) - wrapped := wordwrap.WrapString(original, w.limit) - return w.writer.Write([]byte(wrapped)) -} - -// NewPunchCardWriter is a NewWordWrapWriter that limits the line width to 80 columns. -func NewPunchCardWriter(w io.Writer) io.Writer { - return NewWordWrapWriter(w, 80) -} - -type maxWidthWriter struct { - maxWidth uint - currentWidth uint - written uint - writer io.Writer -} - -// NewMaxWidthWriter is a Writer that supports a limit of characters on every -// line, but doesn't do any word wrapping automatically. -func NewMaxWidthWriter(w io.Writer, maxWidth uint) io.Writer { - return &maxWidthWriter{ - maxWidth: maxWidth, - writer: w, - } -} - -func (m maxWidthWriter) Write(p []byte) (nn int, err error) { - for _, b := range p { - if m.currentWidth == m.maxWidth { - m.writer.Write([]byte{'\n'}) - m.currentWidth = 0 - } - if b == '\n' { - m.currentWidth = 0 - } - _, err := m.writer.Write([]byte{b}) - if err != nil { - return int(m.written), err - } - m.written++ - m.currentWidth++ - } - return len(p), nil -} |