summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apiserver
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apiserver')
-rw-r--r--vendor/k8s.io/apiserver/pkg/server/httplog/doc.go19
-rw-r--r--vendor/k8s.io/apiserver/pkg/server/httplog/log.go225
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go349
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go21
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go177
5 files changed, 0 insertions, 791 deletions
diff --git a/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go b/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go
deleted file mode 100644
index caa6572c7..000000000
--- a/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package httplog contains a helper object and functions to maintain a log
-// along with an http response.
-package httplog // import "k8s.io/apiserver/pkg/server/httplog"
diff --git a/vendor/k8s.io/apiserver/pkg/server/httplog/log.go b/vendor/k8s.io/apiserver/pkg/server/httplog/log.go
deleted file mode 100644
index 4a4894cee..000000000
--- a/vendor/k8s.io/apiserver/pkg/server/httplog/log.go
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
-Copyright 2014 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package httplog
-
-import (
- "bufio"
- "fmt"
- "net"
- "net/http"
- "runtime"
- "time"
-
- "github.com/golang/glog"
-)
-
-// Handler wraps all HTTP calls to delegate with nice logging.
-// delegate may use LogOf(w).Addf(...) to write additional info to
-// the per-request log message.
-//
-// Intended to wrap calls to your ServeMux.
-func Handler(delegate http.Handler, pred StacktracePred) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- defer NewLogged(req, &w).StacktraceWhen(pred).Log()
- delegate.ServeHTTP(w, req)
- })
-}
-
-// StacktracePred returns true if a stacktrace should be logged for this status.
-type StacktracePred func(httpStatus int) (logStacktrace bool)
-
-type logger interface {
- Addf(format string, data ...interface{})
-}
-
-// Add a layer on top of ResponseWriter, so we can track latency and error
-// message sources.
-//
-// TODO now that we're using go-restful, we shouldn't need to be wrapping
-// the http.ResponseWriter. We can recover panics from go-restful, and
-// the logging value is questionable.
-type respLogger struct {
- hijacked bool
- statusRecorded bool
- status int
- statusStack string
- addedInfo string
- startTime time.Time
-
- captureErrorOutput bool
-
- req *http.Request
- w http.ResponseWriter
-
- logStacktracePred StacktracePred
-}
-
-// Simple logger that logs immediately when Addf is called
-type passthroughLogger struct{}
-
-// Addf logs info immediately.
-func (passthroughLogger) Addf(format string, data ...interface{}) {
- glog.V(2).Info(fmt.Sprintf(format, data...))
-}
-
-// DefaultStacktracePred is the default implementation of StacktracePred.
-func DefaultStacktracePred(status int) bool {
- return (status < http.StatusOK || status >= http.StatusInternalServerError) && status != http.StatusSwitchingProtocols
-}
-
-// NewLogged turns a normal response writer into a logged response writer.
-//
-// Usage:
-//
-// defer NewLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log()
-//
-// (Only the call to Log() is deferred, so you can set everything up in one line!)
-//
-// Note that this *changes* your writer, to route response writing actions
-// through the logger.
-//
-// Use LogOf(w).Addf(...) to log something along with the response result.
-func NewLogged(req *http.Request, w *http.ResponseWriter) *respLogger {
- if _, ok := (*w).(*respLogger); ok {
- // Don't double-wrap!
- panic("multiple NewLogged calls!")
- }
- rl := &respLogger{
- startTime: time.Now(),
- req: req,
- w: *w,
- logStacktracePred: DefaultStacktracePred,
- }
- *w = rl // hijack caller's writer!
- return rl
-}
-
-// LogOf returns the logger hiding in w. If there is not an existing logger
-// then a passthroughLogger will be created which will log to stdout immediately
-// when Addf is called.
-func LogOf(req *http.Request, w http.ResponseWriter) logger {
- if _, exists := w.(*respLogger); !exists {
- pl := &passthroughLogger{}
- return pl
- }
- if rl, ok := w.(*respLogger); ok {
- return rl
- }
- panic("Unable to find or create the logger!")
-}
-
-// Unlogged returns the original ResponseWriter, or w if it is not our inserted logger.
-func Unlogged(w http.ResponseWriter) http.ResponseWriter {
- if rl, ok := w.(*respLogger); ok {
- return rl.w
- }
- return w
-}
-
-// StacktraceWhen sets the stacktrace logging predicate, which decides when to log a stacktrace.
-// There's a default, so you don't need to call this unless you don't like the default.
-func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {
- rl.logStacktracePred = pred
- return rl
-}
-
-// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged
-// for any status *not* in the given list.
-func StatusIsNot(statuses ...int) StacktracePred {
- return func(status int) bool {
- for _, s := range statuses {
- if status == s {
- return false
- }
- }
- return true
- }
-}
-
-// Addf adds additional data to be logged with this request.
-func (rl *respLogger) Addf(format string, data ...interface{}) {
- rl.addedInfo += "\n" + fmt.Sprintf(format, data...)
-}
-
-// Log is intended to be called once at the end of your request handler, via defer
-func (rl *respLogger) Log() {
- latency := time.Since(rl.startTime)
- if glog.V(2) {
- if !rl.hijacked {
- glog.InfoDepth(1, fmt.Sprintf("%s %s: (%v) %v%v%v [%s %s]", rl.req.Method, rl.req.RequestURI, latency, rl.status, rl.statusStack, rl.addedInfo, rl.req.Header["User-Agent"], rl.req.RemoteAddr))
- } else {
- glog.InfoDepth(1, fmt.Sprintf("%s %s: (%v) hijacked [%s %s]", rl.req.Method, rl.req.RequestURI, latency, rl.req.Header["User-Agent"], rl.req.RemoteAddr))
- }
- }
-}
-
-// Header implements http.ResponseWriter.
-func (rl *respLogger) Header() http.Header {
- return rl.w.Header()
-}
-
-// Write implements http.ResponseWriter.
-func (rl *respLogger) Write(b []byte) (int, error) {
- if !rl.statusRecorded {
- rl.recordStatus(http.StatusOK) // Default if WriteHeader hasn't been called
- }
- if rl.captureErrorOutput {
- rl.Addf("logging error output: %q\n", string(b))
- }
- return rl.w.Write(b)
-}
-
-// Flush implements http.Flusher even if the underlying http.Writer doesn't implement it.
-// Flush is used for streaming purposes and allows to flush buffered data to the client.
-func (rl *respLogger) Flush() {
- if flusher, ok := rl.w.(http.Flusher); ok {
- flusher.Flush()
- } else if glog.V(2) {
- glog.InfoDepth(1, fmt.Sprintf("Unable to convert %+v into http.Flusher", rl.w))
- }
-}
-
-// WriteHeader implements http.ResponseWriter.
-func (rl *respLogger) WriteHeader(status int) {
- rl.recordStatus(status)
- rl.w.WriteHeader(status)
-}
-
-// Hijack implements http.Hijacker.
-func (rl *respLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) {
- rl.hijacked = true
- return rl.w.(http.Hijacker).Hijack()
-}
-
-// CloseNotify implements http.CloseNotifier
-func (rl *respLogger) CloseNotify() <-chan bool {
- return rl.w.(http.CloseNotifier).CloseNotify()
-}
-
-func (rl *respLogger) recordStatus(status int) {
- rl.status = status
- rl.statusRecorded = true
- if rl.logStacktracePred(status) {
- // Only log stacks for errors
- stack := make([]byte, 50*1024)
- stack = stack[:runtime.Stack(stack, false)]
- rl.statusStack = "\n" + string(stack)
- rl.captureErrorOutput = true
- } else {
- rl.statusStack = ""
- }
-}
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
deleted file mode 100644
index f01638ad6..000000000
--- a/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package wsstream
-
-import (
- "encoding/base64"
- "fmt"
- "io"
- "net/http"
- "regexp"
- "strings"
- "time"
-
- "github.com/golang/glog"
- "golang.org/x/net/websocket"
-
- "k8s.io/apimachinery/pkg/util/runtime"
-)
-
-// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
-// the channel number (zero indexed) the message was sent on. Messages in both directions should
-// prefix their messages with this channel byte. When used for remote execution, the channel numbers
-// are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
-// (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they
-// are received by the server.
-//
-// Example client session:
-//
-// CONNECT http://server.com with subprotocol "channel.k8s.io"
-// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
-// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
-// CLOSE
-//
-const ChannelWebSocketProtocol = "channel.k8s.io"
-
-// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
-// indicating the channel number (zero indexed) the message was sent on. Messages in both directions
-// should prefix their messages with this channel char. When used for remote execution, the channel
-// numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT,
-// and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be
-// be valid) and data written by the server to the client is base64 encoded.
-//
-// Example client session:
-//
-// CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
-// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
-// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
-// CLOSE
-//
-const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
-
-type codecType int
-
-const (
- rawCodec codecType = iota
- base64Codec
-)
-
-type ChannelType int
-
-const (
- IgnoreChannel ChannelType = iota
- ReadChannel
- WriteChannel
- ReadWriteChannel
-)
-
-var (
- // connectionUpgradeRegex matches any Connection header value that includes upgrade
- connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
-)
-
-// IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
-// for WebSockets.
-func IsWebSocketRequest(req *http.Request) bool {
- return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
-}
-
-// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
-// read and write deadlines are pushed every time a new message is received.
-func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
- defer runtime.HandleCrash()
- var data []byte
- for {
- resetTimeout(ws, timeout)
- if err := websocket.Message.Receive(ws, &data); err != nil {
- return
- }
- }
-}
-
-// handshake ensures the provided user protocol matches one of the allowed protocols. It returns
-// no error if no protocol is specified.
-func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
- protocols := config.Protocol
- if len(protocols) == 0 {
- protocols = []string{""}
- }
-
- for _, protocol := range protocols {
- for _, allow := range allowed {
- if allow == protocol {
- config.Protocol = []string{protocol}
- return nil
- }
- }
- }
-
- return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
-}
-
-// ChannelProtocolConfig describes a websocket subprotocol with channels.
-type ChannelProtocolConfig struct {
- Binary bool
- Channels []ChannelType
-}
-
-// NewDefaultChannelProtocols returns a channel protocol map with the
-// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
-// channels.
-func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
- return map[string]ChannelProtocolConfig{
- "": {Binary: true, Channels: channels},
- ChannelWebSocketProtocol: {Binary: true, Channels: channels},
- Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
- }
-}
-
-// Conn supports sending multiple binary channels over a websocket connection.
-type Conn struct {
- protocols map[string]ChannelProtocolConfig
- selectedProtocol string
- channels []*websocketChannel
- codec codecType
- ready chan struct{}
- ws *websocket.Conn
- timeout time.Duration
-}
-
-// NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
-// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
-// future use. The channel types for each channel are passed as an array, supporting the different
-// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
-//
-// The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
-// name is used if websocket.Config.Protocol is empty.
-func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
- return &Conn{
- ready: make(chan struct{}),
- protocols: protocols,
- }
-}
-
-// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
-// there is no timeout on the connection.
-func (conn *Conn) SetIdleTimeout(duration time.Duration) {
- conn.timeout = duration
-}
-
-// Open the connection and create channels for reading and writing. It returns
-// the selected subprotocol, a slice of channels and an error.
-func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
- go func() {
- defer runtime.HandleCrash()
- defer conn.Close()
- websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
- }()
- <-conn.ready
- rwc := make([]io.ReadWriteCloser, len(conn.channels))
- for i := range conn.channels {
- rwc[i] = conn.channels[i]
- }
- return conn.selectedProtocol, rwc, nil
-}
-
-func (conn *Conn) initialize(ws *websocket.Conn) {
- negotiated := ws.Config().Protocol
- conn.selectedProtocol = negotiated[0]
- p := conn.protocols[conn.selectedProtocol]
- if p.Binary {
- conn.codec = rawCodec
- } else {
- conn.codec = base64Codec
- }
- conn.ws = ws
- conn.channels = make([]*websocketChannel, len(p.Channels))
- for i, t := range p.Channels {
- switch t {
- case ReadChannel:
- conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
- case WriteChannel:
- conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
- case ReadWriteChannel:
- conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
- case IgnoreChannel:
- conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
- }
- }
-
- close(conn.ready)
-}
-
-func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
- supportedProtocols := make([]string, 0, len(conn.protocols))
- for p := range conn.protocols {
- supportedProtocols = append(supportedProtocols, p)
- }
- return handshake(config, req, supportedProtocols)
-}
-
-func (conn *Conn) resetTimeout() {
- if conn.timeout > 0 {
- conn.ws.SetDeadline(time.Now().Add(conn.timeout))
- }
-}
-
-// Close is only valid after Open has been called
-func (conn *Conn) Close() error {
- <-conn.ready
- for _, s := range conn.channels {
- s.Close()
- }
- conn.ws.Close()
- return nil
-}
-
-// handle implements a websocket handler.
-func (conn *Conn) handle(ws *websocket.Conn) {
- defer conn.Close()
- conn.initialize(ws)
-
- for {
- conn.resetTimeout()
- var data []byte
- if err := websocket.Message.Receive(ws, &data); err != nil {
- if err != io.EOF {
- glog.Errorf("Error on socket receive: %v", err)
- }
- break
- }
- if len(data) == 0 {
- continue
- }
- channel := data[0]
- if conn.codec == base64Codec {
- channel = channel - '0'
- }
- data = data[1:]
- if int(channel) >= len(conn.channels) {
- glog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
- continue
- }
- if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
- glog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data))
- continue
- }
- }
-}
-
-// write multiplexes the specified channel onto the websocket
-func (conn *Conn) write(num byte, data []byte) (int, error) {
- conn.resetTimeout()
- switch conn.codec {
- case rawCodec:
- frame := make([]byte, len(data)+1)
- frame[0] = num
- copy(frame[1:], data)
- if err := websocket.Message.Send(conn.ws, frame); err != nil {
- return 0, err
- }
- case base64Codec:
- frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
- if err := websocket.Message.Send(conn.ws, frame); err != nil {
- return 0, err
- }
- }
- return len(data), nil
-}
-
-// websocketChannel represents a channel in a connection
-type websocketChannel struct {
- conn *Conn
- num byte
- r io.Reader
- w io.WriteCloser
-
- read, write bool
-}
-
-// newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
-// prior to the connection being opened. It may be no, half, or full duplex depending on
-// read and write.
-func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
- r, w := io.Pipe()
- return &websocketChannel{conn, num, r, w, read, write}
-}
-
-func (p *websocketChannel) Write(data []byte) (int, error) {
- if !p.write {
- return len(data), nil
- }
- return p.conn.write(p.num, data)
-}
-
-// DataFromSocket is invoked by the connection receiver to move data from the connection
-// into a specific channel.
-func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
- if !p.read {
- return len(data), nil
- }
-
- switch p.conn.codec {
- case rawCodec:
- return p.w.Write(data)
- case base64Codec:
- dst := make([]byte, len(data))
- n, err := base64.StdEncoding.Decode(dst, data)
- if err != nil {
- return 0, err
- }
- return p.w.Write(dst[:n])
- }
- return 0, nil
-}
-
-func (p *websocketChannel) Read(data []byte) (int, error) {
- if !p.read {
- return 0, io.EOF
- }
- return p.r.Read(data)
-}
-
-func (p *websocketChannel) Close() error {
- return p.w.Close()
-}
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go
deleted file mode 100644
index 694ce81d2..000000000
--- a/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-// Package wsstream contains utilities for streaming content over WebSockets.
-// The Conn type allows callers to multiplex multiple read/write channels over
-// a single websocket. The Reader type allows an io.Reader to be copied over
-// a websocket channel as binary content.
-package wsstream // import "k8s.io/apiserver/pkg/util/wsstream"
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go
deleted file mode 100644
index 9dd165bfa..000000000
--- a/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
-Copyright 2015 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package wsstream
-
-import (
- "encoding/base64"
- "io"
- "net/http"
- "sync"
- "time"
-
- "golang.org/x/net/websocket"
-
- "k8s.io/apimachinery/pkg/util/runtime"
-)
-
-// The WebSocket subprotocol "binary.k8s.io" will only send messages to the
-// client and ignore messages sent to the server. The received messages are
-// the exact bytes written to the stream. Zero byte messages are possible.
-const binaryWebSocketProtocol = "binary.k8s.io"
-
-// The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the
-// client and ignore messages sent to the server. The received messages are
-// a base64 version of the bytes written to the stream. Zero byte messages are
-// possible.
-const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
-
-// ReaderProtocolConfig describes a websocket subprotocol with one stream.
-type ReaderProtocolConfig struct {
- Binary bool
-}
-
-// NewDefaultReaderProtocols returns a stream protocol map with the
-// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io".
-func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
- return map[string]ReaderProtocolConfig{
- "": {Binary: true},
- binaryWebSocketProtocol: {Binary: true},
- base64BinaryWebSocketProtocol: {Binary: false},
- }
-}
-
-// Reader supports returning an arbitrary byte stream over a websocket channel.
-type Reader struct {
- err chan error
- r io.Reader
- ping bool
- timeout time.Duration
- protocols map[string]ReaderProtocolConfig
- selectedProtocol string
-
- handleCrash func() // overridable for testing
-}
-
-// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
-// WebSocket connection. If ping is true, a zero length message will be sent to the client
-// before the stream begins reading.
-//
-// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
-// subprotocol name is used if websocket.Config.Protocol is empty.
-func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
- return &Reader{
- r: r,
- err: make(chan error),
- ping: ping,
- protocols: protocols,
- handleCrash: func() { runtime.HandleCrash() },
- }
-}
-
-// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
-// there is no timeout on the reader.
-func (r *Reader) SetIdleTimeout(duration time.Duration) {
- r.timeout = duration
-}
-
-func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
- supportedProtocols := make([]string, 0, len(r.protocols))
- for p := range r.protocols {
- supportedProtocols = append(supportedProtocols, p)
- }
- return handshake(config, req, supportedProtocols)
-}
-
-// Copy the reader to the response. The created WebSocket is closed after this
-// method completes.
-func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
- go func() {
- defer r.handleCrash()
- websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
- }()
- return <-r.err
-}
-
-// handle implements a WebSocket handler.
-func (r *Reader) handle(ws *websocket.Conn) {
- // Close the connection when the client requests it, or when we finish streaming, whichever happens first
- closeConnOnce := &sync.Once{}
- closeConn := func() {
- closeConnOnce.Do(func() {
- ws.Close()
- })
- }
-
- negotiated := ws.Config().Protocol
- r.selectedProtocol = negotiated[0]
- defer close(r.err)
- defer closeConn()
-
- go func() {
- defer runtime.HandleCrash()
- // This blocks until the connection is closed.
- // Client should not send anything.
- IgnoreReceives(ws, r.timeout)
- // Once the client closes, we should also close
- closeConn()
- }()
-
- r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
-}
-
-func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
- if timeout > 0 {
- ws.SetDeadline(time.Now().Add(timeout))
- }
-}
-
-func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
- buf := make([]byte, 2048)
- if ping {
- resetTimeout(ws, timeout)
- if base64Encode {
- if err := websocket.Message.Send(ws, ""); err != nil {
- return err
- }
- } else {
- if err := websocket.Message.Send(ws, []byte{}); err != nil {
- return err
- }
- }
- }
- for {
- resetTimeout(ws, timeout)
- n, err := r.Read(buf)
- if err != nil {
- if err == io.EOF {
- return nil
- }
- return err
- }
- if n > 0 {
- if base64Encode {
- if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
- return err
- }
- } else {
- if err := websocket.Message.Send(ws, buf[:n]); err != nil {
- return err
- }
- }
- }
- }
-}