diff options
Diffstat (limited to 'vendor/k8s.io/apiserver')
-rw-r--r-- | vendor/k8s.io/apiserver/pkg/server/httplog/doc.go | 19 | ||||
-rw-r--r-- | vendor/k8s.io/apiserver/pkg/server/httplog/log.go | 225 | ||||
-rw-r--r-- | vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go | 349 | ||||
-rw-r--r-- | vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go | 21 | ||||
-rw-r--r-- | vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go | 177 |
5 files changed, 0 insertions, 791 deletions
diff --git a/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go b/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go deleted file mode 100644 index caa6572c7..000000000 --- a/vendor/k8s.io/apiserver/pkg/server/httplog/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package httplog contains a helper object and functions to maintain a log -// along with an http response. -package httplog // import "k8s.io/apiserver/pkg/server/httplog" diff --git a/vendor/k8s.io/apiserver/pkg/server/httplog/log.go b/vendor/k8s.io/apiserver/pkg/server/httplog/log.go deleted file mode 100644 index 4a4894cee..000000000 --- a/vendor/k8s.io/apiserver/pkg/server/httplog/log.go +++ /dev/null @@ -1,225 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package httplog - -import ( - "bufio" - "fmt" - "net" - "net/http" - "runtime" - "time" - - "github.com/golang/glog" -) - -// Handler wraps all HTTP calls to delegate with nice logging. -// delegate may use LogOf(w).Addf(...) to write additional info to -// the per-request log message. -// -// Intended to wrap calls to your ServeMux. -func Handler(delegate http.Handler, pred StacktracePred) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - defer NewLogged(req, &w).StacktraceWhen(pred).Log() - delegate.ServeHTTP(w, req) - }) -} - -// StacktracePred returns true if a stacktrace should be logged for this status. -type StacktracePred func(httpStatus int) (logStacktrace bool) - -type logger interface { - Addf(format string, data ...interface{}) -} - -// Add a layer on top of ResponseWriter, so we can track latency and error -// message sources. -// -// TODO now that we're using go-restful, we shouldn't need to be wrapping -// the http.ResponseWriter. We can recover panics from go-restful, and -// the logging value is questionable. -type respLogger struct { - hijacked bool - statusRecorded bool - status int - statusStack string - addedInfo string - startTime time.Time - - captureErrorOutput bool - - req *http.Request - w http.ResponseWriter - - logStacktracePred StacktracePred -} - -// Simple logger that logs immediately when Addf is called -type passthroughLogger struct{} - -// Addf logs info immediately. -func (passthroughLogger) Addf(format string, data ...interface{}) { - glog.V(2).Info(fmt.Sprintf(format, data...)) -} - -// DefaultStacktracePred is the default implementation of StacktracePred. -func DefaultStacktracePred(status int) bool { - return (status < http.StatusOK || status >= http.StatusInternalServerError) && status != http.StatusSwitchingProtocols -} - -// NewLogged turns a normal response writer into a logged response writer. -// -// Usage: -// -// defer NewLogged(req, &w).StacktraceWhen(StatusIsNot(200, 202)).Log() -// -// (Only the call to Log() is deferred, so you can set everything up in one line!) -// -// Note that this *changes* your writer, to route response writing actions -// through the logger. -// -// Use LogOf(w).Addf(...) to log something along with the response result. -func NewLogged(req *http.Request, w *http.ResponseWriter) *respLogger { - if _, ok := (*w).(*respLogger); ok { - // Don't double-wrap! - panic("multiple NewLogged calls!") - } - rl := &respLogger{ - startTime: time.Now(), - req: req, - w: *w, - logStacktracePred: DefaultStacktracePred, - } - *w = rl // hijack caller's writer! - return rl -} - -// LogOf returns the logger hiding in w. If there is not an existing logger -// then a passthroughLogger will be created which will log to stdout immediately -// when Addf is called. -func LogOf(req *http.Request, w http.ResponseWriter) logger { - if _, exists := w.(*respLogger); !exists { - pl := &passthroughLogger{} - return pl - } - if rl, ok := w.(*respLogger); ok { - return rl - } - panic("Unable to find or create the logger!") -} - -// Unlogged returns the original ResponseWriter, or w if it is not our inserted logger. -func Unlogged(w http.ResponseWriter) http.ResponseWriter { - if rl, ok := w.(*respLogger); ok { - return rl.w - } - return w -} - -// StacktraceWhen sets the stacktrace logging predicate, which decides when to log a stacktrace. -// There's a default, so you don't need to call this unless you don't like the default. -func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger { - rl.logStacktracePred = pred - return rl -} - -// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged -// for any status *not* in the given list. -func StatusIsNot(statuses ...int) StacktracePred { - return func(status int) bool { - for _, s := range statuses { - if status == s { - return false - } - } - return true - } -} - -// Addf adds additional data to be logged with this request. -func (rl *respLogger) Addf(format string, data ...interface{}) { - rl.addedInfo += "\n" + fmt.Sprintf(format, data...) -} - -// Log is intended to be called once at the end of your request handler, via defer -func (rl *respLogger) Log() { - latency := time.Since(rl.startTime) - if glog.V(2) { - if !rl.hijacked { - glog.InfoDepth(1, fmt.Sprintf("%s %s: (%v) %v%v%v [%s %s]", rl.req.Method, rl.req.RequestURI, latency, rl.status, rl.statusStack, rl.addedInfo, rl.req.Header["User-Agent"], rl.req.RemoteAddr)) - } else { - glog.InfoDepth(1, fmt.Sprintf("%s %s: (%v) hijacked [%s %s]", rl.req.Method, rl.req.RequestURI, latency, rl.req.Header["User-Agent"], rl.req.RemoteAddr)) - } - } -} - -// Header implements http.ResponseWriter. -func (rl *respLogger) Header() http.Header { - return rl.w.Header() -} - -// Write implements http.ResponseWriter. -func (rl *respLogger) Write(b []byte) (int, error) { - if !rl.statusRecorded { - rl.recordStatus(http.StatusOK) // Default if WriteHeader hasn't been called - } - if rl.captureErrorOutput { - rl.Addf("logging error output: %q\n", string(b)) - } - return rl.w.Write(b) -} - -// Flush implements http.Flusher even if the underlying http.Writer doesn't implement it. -// Flush is used for streaming purposes and allows to flush buffered data to the client. -func (rl *respLogger) Flush() { - if flusher, ok := rl.w.(http.Flusher); ok { - flusher.Flush() - } else if glog.V(2) { - glog.InfoDepth(1, fmt.Sprintf("Unable to convert %+v into http.Flusher", rl.w)) - } -} - -// WriteHeader implements http.ResponseWriter. -func (rl *respLogger) WriteHeader(status int) { - rl.recordStatus(status) - rl.w.WriteHeader(status) -} - -// Hijack implements http.Hijacker. -func (rl *respLogger) Hijack() (net.Conn, *bufio.ReadWriter, error) { - rl.hijacked = true - return rl.w.(http.Hijacker).Hijack() -} - -// CloseNotify implements http.CloseNotifier -func (rl *respLogger) CloseNotify() <-chan bool { - return rl.w.(http.CloseNotifier).CloseNotify() -} - -func (rl *respLogger) recordStatus(status int) { - rl.status = status - rl.statusRecorded = true - if rl.logStacktracePred(status) { - // Only log stacks for errors - stack := make([]byte, 50*1024) - stack = stack[:runtime.Stack(stack, false)] - rl.statusStack = "\n" + string(stack) - rl.captureErrorOutput = true - } else { - rl.statusStack = "" - } -} diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go deleted file mode 100644 index f01638ad6..000000000 --- a/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go +++ /dev/null @@ -1,349 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package wsstream - -import ( - "encoding/base64" - "fmt" - "io" - "net/http" - "regexp" - "strings" - "time" - - "github.com/golang/glog" - "golang.org/x/net/websocket" - - "k8s.io/apimachinery/pkg/util/runtime" -) - -// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating -// the channel number (zero indexed) the message was sent on. Messages in both directions should -// prefix their messages with this channel byte. When used for remote execution, the channel numbers -// are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR -// (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they -// are received by the server. -// -// Example client session: -// -// CONNECT http://server.com with subprotocol "channel.k8s.io" -// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN) -// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT) -// CLOSE -// -const ChannelWebSocketProtocol = "channel.k8s.io" - -// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character -// indicating the channel number (zero indexed) the message was sent on. Messages in both directions -// should prefix their messages with this channel char. When used for remote execution, the channel -// numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, -// and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be -// be valid) and data written by the server to the client is base64 encoded. -// -// Example client session: -// -// CONNECT http://server.com with subprotocol "base64.channel.k8s.io" -// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN) -// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT) -// CLOSE -// -const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io" - -type codecType int - -const ( - rawCodec codecType = iota - base64Codec -) - -type ChannelType int - -const ( - IgnoreChannel ChannelType = iota - ReadChannel - WriteChannel - ReadWriteChannel -) - -var ( - // connectionUpgradeRegex matches any Connection header value that includes upgrade - connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") -) - -// IsWebSocketRequest returns true if the incoming request contains connection upgrade headers -// for WebSockets. -func IsWebSocketRequest(req *http.Request) bool { - return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket" -} - -// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the -// read and write deadlines are pushed every time a new message is received. -func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) { - defer runtime.HandleCrash() - var data []byte - for { - resetTimeout(ws, timeout) - if err := websocket.Message.Receive(ws, &data); err != nil { - return - } - } -} - -// handshake ensures the provided user protocol matches one of the allowed protocols. It returns -// no error if no protocol is specified. -func handshake(config *websocket.Config, req *http.Request, allowed []string) error { - protocols := config.Protocol - if len(protocols) == 0 { - protocols = []string{""} - } - - for _, protocol := range protocols { - for _, allow := range allowed { - if allow == protocol { - config.Protocol = []string{protocol} - return nil - } - } - } - - return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed) -} - -// ChannelProtocolConfig describes a websocket subprotocol with channels. -type ChannelProtocolConfig struct { - Binary bool - Channels []ChannelType -} - -// NewDefaultChannelProtocols returns a channel protocol map with the -// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given -// channels. -func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig { - return map[string]ChannelProtocolConfig{ - "": {Binary: true, Channels: channels}, - ChannelWebSocketProtocol: {Binary: true, Channels: channels}, - Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels}, - } -} - -// Conn supports sending multiple binary channels over a websocket connection. -type Conn struct { - protocols map[string]ChannelProtocolConfig - selectedProtocol string - channels []*websocketChannel - codec codecType - ready chan struct{} - ws *websocket.Conn - timeout time.Duration -} - -// NewConn creates a WebSocket connection that supports a set of channels. Channels begin each -// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for -// future use. The channel types for each channel are passed as an array, supporting the different -// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer. -// -// The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol -// name is used if websocket.Config.Protocol is empty. -func NewConn(protocols map[string]ChannelProtocolConfig) *Conn { - return &Conn{ - ready: make(chan struct{}), - protocols: protocols, - } -} - -// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified, -// there is no timeout on the connection. -func (conn *Conn) SetIdleTimeout(duration time.Duration) { - conn.timeout = duration -} - -// Open the connection and create channels for reading and writing. It returns -// the selected subprotocol, a slice of channels and an error. -func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) { - go func() { - defer runtime.HandleCrash() - defer conn.Close() - websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req) - }() - <-conn.ready - rwc := make([]io.ReadWriteCloser, len(conn.channels)) - for i := range conn.channels { - rwc[i] = conn.channels[i] - } - return conn.selectedProtocol, rwc, nil -} - -func (conn *Conn) initialize(ws *websocket.Conn) { - negotiated := ws.Config().Protocol - conn.selectedProtocol = negotiated[0] - p := conn.protocols[conn.selectedProtocol] - if p.Binary { - conn.codec = rawCodec - } else { - conn.codec = base64Codec - } - conn.ws = ws - conn.channels = make([]*websocketChannel, len(p.Channels)) - for i, t := range p.Channels { - switch t { - case ReadChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false) - case WriteChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true) - case ReadWriteChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true) - case IgnoreChannel: - conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false) - } - } - - close(conn.ready) -} - -func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error { - supportedProtocols := make([]string, 0, len(conn.protocols)) - for p := range conn.protocols { - supportedProtocols = append(supportedProtocols, p) - } - return handshake(config, req, supportedProtocols) -} - -func (conn *Conn) resetTimeout() { - if conn.timeout > 0 { - conn.ws.SetDeadline(time.Now().Add(conn.timeout)) - } -} - -// Close is only valid after Open has been called -func (conn *Conn) Close() error { - <-conn.ready - for _, s := range conn.channels { - s.Close() - } - conn.ws.Close() - return nil -} - -// handle implements a websocket handler. -func (conn *Conn) handle(ws *websocket.Conn) { - defer conn.Close() - conn.initialize(ws) - - for { - conn.resetTimeout() - var data []byte - if err := websocket.Message.Receive(ws, &data); err != nil { - if err != io.EOF { - glog.Errorf("Error on socket receive: %v", err) - } - break - } - if len(data) == 0 { - continue - } - channel := data[0] - if conn.codec == base64Codec { - channel = channel - '0' - } - data = data[1:] - if int(channel) >= len(conn.channels) { - glog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel) - continue - } - if _, err := conn.channels[channel].DataFromSocket(data); err != nil { - glog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data)) - continue - } - } -} - -// write multiplexes the specified channel onto the websocket -func (conn *Conn) write(num byte, data []byte) (int, error) { - conn.resetTimeout() - switch conn.codec { - case rawCodec: - frame := make([]byte, len(data)+1) - frame[0] = num - copy(frame[1:], data) - if err := websocket.Message.Send(conn.ws, frame); err != nil { - return 0, err - } - case base64Codec: - frame := string('0'+num) + base64.StdEncoding.EncodeToString(data) - if err := websocket.Message.Send(conn.ws, frame); err != nil { - return 0, err - } - } - return len(data), nil -} - -// websocketChannel represents a channel in a connection -type websocketChannel struct { - conn *Conn - num byte - r io.Reader - w io.WriteCloser - - read, write bool -} - -// newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe -// prior to the connection being opened. It may be no, half, or full duplex depending on -// read and write. -func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel { - r, w := io.Pipe() - return &websocketChannel{conn, num, r, w, read, write} -} - -func (p *websocketChannel) Write(data []byte) (int, error) { - if !p.write { - return len(data), nil - } - return p.conn.write(p.num, data) -} - -// DataFromSocket is invoked by the connection receiver to move data from the connection -// into a specific channel. -func (p *websocketChannel) DataFromSocket(data []byte) (int, error) { - if !p.read { - return len(data), nil - } - - switch p.conn.codec { - case rawCodec: - return p.w.Write(data) - case base64Codec: - dst := make([]byte, len(data)) - n, err := base64.StdEncoding.Decode(dst, data) - if err != nil { - return 0, err - } - return p.w.Write(dst[:n]) - } - return 0, nil -} - -func (p *websocketChannel) Read(data []byte) (int, error) { - if !p.read { - return 0, io.EOF - } - return p.r.Read(data) -} - -func (p *websocketChannel) Close() error { - return p.w.Close() -} diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go deleted file mode 100644 index 694ce81d2..000000000 --- a/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go +++ /dev/null @@ -1,21 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package wsstream contains utilities for streaming content over WebSockets. -// The Conn type allows callers to multiplex multiple read/write channels over -// a single websocket. The Reader type allows an io.Reader to be copied over -// a websocket channel as binary content. -package wsstream // import "k8s.io/apiserver/pkg/util/wsstream" diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go deleted file mode 100644 index 9dd165bfa..000000000 --- a/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go +++ /dev/null @@ -1,177 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package wsstream - -import ( - "encoding/base64" - "io" - "net/http" - "sync" - "time" - - "golang.org/x/net/websocket" - - "k8s.io/apimachinery/pkg/util/runtime" -) - -// The WebSocket subprotocol "binary.k8s.io" will only send messages to the -// client and ignore messages sent to the server. The received messages are -// the exact bytes written to the stream. Zero byte messages are possible. -const binaryWebSocketProtocol = "binary.k8s.io" - -// The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the -// client and ignore messages sent to the server. The received messages are -// a base64 version of the bytes written to the stream. Zero byte messages are -// possible. -const base64BinaryWebSocketProtocol = "base64.binary.k8s.io" - -// ReaderProtocolConfig describes a websocket subprotocol with one stream. -type ReaderProtocolConfig struct { - Binary bool -} - -// NewDefaultReaderProtocols returns a stream protocol map with the -// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io". -func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig { - return map[string]ReaderProtocolConfig{ - "": {Binary: true}, - binaryWebSocketProtocol: {Binary: true}, - base64BinaryWebSocketProtocol: {Binary: false}, - } -} - -// Reader supports returning an arbitrary byte stream over a websocket channel. -type Reader struct { - err chan error - r io.Reader - ping bool - timeout time.Duration - protocols map[string]ReaderProtocolConfig - selectedProtocol string - - handleCrash func() // overridable for testing -} - -// NewReader creates a WebSocket pipe that will copy the contents of r to a provided -// WebSocket connection. If ping is true, a zero length message will be sent to the client -// before the stream begins reading. -// -// The protocols parameter maps subprotocol names to StreamProtocols. The empty string -// subprotocol name is used if websocket.Config.Protocol is empty. -func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader { - return &Reader{ - r: r, - err: make(chan error), - ping: ping, - protocols: protocols, - handleCrash: func() { runtime.HandleCrash() }, - } -} - -// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified, -// there is no timeout on the reader. -func (r *Reader) SetIdleTimeout(duration time.Duration) { - r.timeout = duration -} - -func (r *Reader) handshake(config *websocket.Config, req *http.Request) error { - supportedProtocols := make([]string, 0, len(r.protocols)) - for p := range r.protocols { - supportedProtocols = append(supportedProtocols, p) - } - return handshake(config, req, supportedProtocols) -} - -// Copy the reader to the response. The created WebSocket is closed after this -// method completes. -func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error { - go func() { - defer r.handleCrash() - websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req) - }() - return <-r.err -} - -// handle implements a WebSocket handler. -func (r *Reader) handle(ws *websocket.Conn) { - // Close the connection when the client requests it, or when we finish streaming, whichever happens first - closeConnOnce := &sync.Once{} - closeConn := func() { - closeConnOnce.Do(func() { - ws.Close() - }) - } - - negotiated := ws.Config().Protocol - r.selectedProtocol = negotiated[0] - defer close(r.err) - defer closeConn() - - go func() { - defer runtime.HandleCrash() - // This blocks until the connection is closed. - // Client should not send anything. - IgnoreReceives(ws, r.timeout) - // Once the client closes, we should also close - closeConn() - }() - - r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout) -} - -func resetTimeout(ws *websocket.Conn, timeout time.Duration) { - if timeout > 0 { - ws.SetDeadline(time.Now().Add(timeout)) - } -} - -func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error { - buf := make([]byte, 2048) - if ping { - resetTimeout(ws, timeout) - if base64Encode { - if err := websocket.Message.Send(ws, ""); err != nil { - return err - } - } else { - if err := websocket.Message.Send(ws, []byte{}); err != nil { - return err - } - } - } - for { - resetTimeout(ws, timeout) - n, err := r.Read(buf) - if err != nil { - if err == io.EOF { - return nil - } - return err - } - if n > 0 { - if base64Encode { - if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil { - return err - } - } else { - if err := websocket.Message.Send(ws, buf[:n]); err != nil { - return err - } - } - } - } -} |