summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apiserver/pkg/util
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apiserver/pkg/util')
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/feature/feature_gate.go211
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go349
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go21
-rw-r--r--vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go177
4 files changed, 758 insertions, 0 deletions
diff --git a/vendor/k8s.io/apiserver/pkg/util/feature/feature_gate.go b/vendor/k8s.io/apiserver/pkg/util/feature/feature_gate.go
new file mode 100644
index 000000000..e7226688c
--- /dev/null
+++ b/vendor/k8s.io/apiserver/pkg/util/feature/feature_gate.go
@@ -0,0 +1,211 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package feature
+
+import (
+ "fmt"
+ "sort"
+ "strconv"
+ "strings"
+
+ "github.com/golang/glog"
+ "github.com/spf13/pflag"
+)
+
+type Feature string
+
+const (
+ flagName = "feature-gates"
+
+ // allAlphaGate is a global toggle for alpha features. Per-feature key
+ // values override the default set by allAlphaGate. Examples:
+ // AllAlpha=false,NewFeature=true will result in newFeature=true
+ // AllAlpha=true,NewFeature=false will result in newFeature=false
+ allAlphaGate Feature = "AllAlpha"
+)
+
+var (
+ // The generic features.
+ defaultFeatures = map[Feature]FeatureSpec{
+ allAlphaGate: {Default: false, PreRelease: Alpha},
+ }
+
+ // Special handling for a few gates.
+ specialFeatures = map[Feature]func(f *featureGate, val bool){
+ allAlphaGate: setUnsetAlphaGates,
+ }
+
+ // DefaultFeatureGate is a shared global FeatureGate.
+ DefaultFeatureGate FeatureGate = NewFeatureGate()
+)
+
+type FeatureSpec struct {
+ Default bool
+ PreRelease prerelease
+}
+
+type prerelease string
+
+const (
+ // Values for PreRelease.
+ Alpha = prerelease("ALPHA")
+ Beta = prerelease("BETA")
+ GA = prerelease("")
+)
+
+// FeatureGate parses and stores flag gates for known features from
+// a string like feature1=true,feature2=false,...
+type FeatureGate interface {
+ AddFlag(fs *pflag.FlagSet)
+ Set(value string) error
+ Enabled(key Feature) bool
+ Add(features map[Feature]FeatureSpec) error
+ KnownFeatures() []string
+}
+
+// featureGate implements FeatureGate as well as pflag.Value for flag parsing.
+type featureGate struct {
+ known map[Feature]FeatureSpec
+ special map[Feature]func(*featureGate, bool)
+ enabled map[Feature]bool
+
+ // is set to true when AddFlag is called. Note: initialization is not go-routine safe, lookup is
+ closed bool
+}
+
+func setUnsetAlphaGates(f *featureGate, val bool) {
+ for k, v := range f.known {
+ if v.PreRelease == Alpha {
+ if _, found := f.enabled[k]; !found {
+ f.enabled[k] = val
+ }
+ }
+ }
+}
+
+// Set, String, and Type implement pflag.Value
+var _ pflag.Value = &featureGate{}
+
+func NewFeatureGate() *featureGate {
+ f := &featureGate{
+ known: map[Feature]FeatureSpec{},
+ special: specialFeatures,
+ enabled: map[Feature]bool{},
+ }
+ for k, v := range defaultFeatures {
+ f.known[k] = v
+ }
+ return f
+}
+
+// Set Parses a string of the form // "key1=value1,key2=value2,..." into a
+// map[string]bool of known keys or returns an error.
+func (f *featureGate) Set(value string) error {
+ for _, s := range strings.Split(value, ",") {
+ if len(s) == 0 {
+ continue
+ }
+ arr := strings.SplitN(s, "=", 2)
+ k := Feature(strings.TrimSpace(arr[0]))
+ _, ok := f.known[Feature(k)]
+ if !ok {
+ return fmt.Errorf("unrecognized key: %s", k)
+ }
+ if len(arr) != 2 {
+ return fmt.Errorf("missing bool value for %s", k)
+ }
+ v := strings.TrimSpace(arr[1])
+ boolValue, err := strconv.ParseBool(v)
+ if err != nil {
+ return fmt.Errorf("invalid value of %s: %s, err: %v", k, v, err)
+ }
+ f.enabled[k] = boolValue
+
+ // Handle "special" features like "all alpha gates"
+ if fn, found := f.special[k]; found {
+ fn(f, boolValue)
+ }
+ }
+
+ glog.Infof("feature gates: %v", f.enabled)
+ return nil
+}
+
+func (f *featureGate) String() string {
+ pairs := []string{}
+ for k, v := range f.enabled {
+ pairs = append(pairs, fmt.Sprintf("%s=%t", k, v))
+ }
+ sort.Strings(pairs)
+ return strings.Join(pairs, ",")
+}
+
+func (f *featureGate) Type() string {
+ return "mapStringBool"
+}
+
+func (f *featureGate) Add(features map[Feature]FeatureSpec) error {
+ if f.closed {
+ return fmt.Errorf("cannot add a feature gate after adding it to the flag set")
+ }
+
+ for name, spec := range features {
+ if existingSpec, found := f.known[name]; found {
+ if existingSpec == spec {
+ continue
+ }
+ return fmt.Errorf("feature gate %q with different spec already exists: %v", name, existingSpec)
+ }
+
+ f.known[name] = spec
+ }
+ return nil
+}
+
+func (f *featureGate) Enabled(key Feature) bool {
+ defaultValue := f.known[key].Default
+ if f.enabled != nil {
+ if v, ok := f.enabled[key]; ok {
+ return v
+ }
+ }
+ return defaultValue
+}
+
+// AddFlag adds a flag for setting global feature gates to the specified FlagSet.
+func (f *featureGate) AddFlag(fs *pflag.FlagSet) {
+ f.closed = true
+
+ known := f.KnownFeatures()
+ fs.Var(f, flagName, ""+
+ "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
+ "Options are:\n"+strings.Join(known, "\n"))
+}
+
+// Returns a string describing the FeatureGate's known features.
+func (f *featureGate) KnownFeatures() []string {
+ var known []string
+ for k, v := range f.known {
+ pre := ""
+ if v.PreRelease != GA {
+ pre = fmt.Sprintf("%s - ", v.PreRelease)
+ }
+ known = append(known, fmt.Sprintf("%s=true|false (%sdefault=%t)", k, pre, v.Default))
+ }
+ sort.Strings(known)
+ return known
+}
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
new file mode 100644
index 000000000..f01638ad6
--- /dev/null
+++ b/vendor/k8s.io/apiserver/pkg/util/wsstream/conn.go
@@ -0,0 +1,349 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package wsstream
+
+import (
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+ "regexp"
+ "strings"
+ "time"
+
+ "github.com/golang/glog"
+ "golang.org/x/net/websocket"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
+// the channel number (zero indexed) the message was sent on. Messages in both directions should
+// prefix their messages with this channel byte. When used for remote execution, the channel numbers
+// are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
+// (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they
+// are received by the server.
+//
+// Example client session:
+//
+// CONNECT http://server.com with subprotocol "channel.k8s.io"
+// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
+// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
+// CLOSE
+//
+const ChannelWebSocketProtocol = "channel.k8s.io"
+
+// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
+// indicating the channel number (zero indexed) the message was sent on. Messages in both directions
+// should prefix their messages with this channel char. When used for remote execution, the channel
+// numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT,
+// and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be
+// be valid) and data written by the server to the client is base64 encoded.
+//
+// Example client session:
+//
+// CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
+// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
+// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
+// CLOSE
+//
+const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
+
+type codecType int
+
+const (
+ rawCodec codecType = iota
+ base64Codec
+)
+
+type ChannelType int
+
+const (
+ IgnoreChannel ChannelType = iota
+ ReadChannel
+ WriteChannel
+ ReadWriteChannel
+)
+
+var (
+ // connectionUpgradeRegex matches any Connection header value that includes upgrade
+ connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
+)
+
+// IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
+// for WebSockets.
+func IsWebSocketRequest(req *http.Request) bool {
+ return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
+}
+
+// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
+// read and write deadlines are pushed every time a new message is received.
+func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
+ defer runtime.HandleCrash()
+ var data []byte
+ for {
+ resetTimeout(ws, timeout)
+ if err := websocket.Message.Receive(ws, &data); err != nil {
+ return
+ }
+ }
+}
+
+// handshake ensures the provided user protocol matches one of the allowed protocols. It returns
+// no error if no protocol is specified.
+func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
+ protocols := config.Protocol
+ if len(protocols) == 0 {
+ protocols = []string{""}
+ }
+
+ for _, protocol := range protocols {
+ for _, allow := range allowed {
+ if allow == protocol {
+ config.Protocol = []string{protocol}
+ return nil
+ }
+ }
+ }
+
+ return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
+}
+
+// ChannelProtocolConfig describes a websocket subprotocol with channels.
+type ChannelProtocolConfig struct {
+ Binary bool
+ Channels []ChannelType
+}
+
+// NewDefaultChannelProtocols returns a channel protocol map with the
+// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
+// channels.
+func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
+ return map[string]ChannelProtocolConfig{
+ "": {Binary: true, Channels: channels},
+ ChannelWebSocketProtocol: {Binary: true, Channels: channels},
+ Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
+ }
+}
+
+// Conn supports sending multiple binary channels over a websocket connection.
+type Conn struct {
+ protocols map[string]ChannelProtocolConfig
+ selectedProtocol string
+ channels []*websocketChannel
+ codec codecType
+ ready chan struct{}
+ ws *websocket.Conn
+ timeout time.Duration
+}
+
+// NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
+// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
+// future use. The channel types for each channel are passed as an array, supporting the different
+// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
+//
+// The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
+// name is used if websocket.Config.Protocol is empty.
+func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
+ return &Conn{
+ ready: make(chan struct{}),
+ protocols: protocols,
+ }
+}
+
+// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
+// there is no timeout on the connection.
+func (conn *Conn) SetIdleTimeout(duration time.Duration) {
+ conn.timeout = duration
+}
+
+// Open the connection and create channels for reading and writing. It returns
+// the selected subprotocol, a slice of channels and an error.
+func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
+ go func() {
+ defer runtime.HandleCrash()
+ defer conn.Close()
+ websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
+ }()
+ <-conn.ready
+ rwc := make([]io.ReadWriteCloser, len(conn.channels))
+ for i := range conn.channels {
+ rwc[i] = conn.channels[i]
+ }
+ return conn.selectedProtocol, rwc, nil
+}
+
+func (conn *Conn) initialize(ws *websocket.Conn) {
+ negotiated := ws.Config().Protocol
+ conn.selectedProtocol = negotiated[0]
+ p := conn.protocols[conn.selectedProtocol]
+ if p.Binary {
+ conn.codec = rawCodec
+ } else {
+ conn.codec = base64Codec
+ }
+ conn.ws = ws
+ conn.channels = make([]*websocketChannel, len(p.Channels))
+ for i, t := range p.Channels {
+ switch t {
+ case ReadChannel:
+ conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
+ case WriteChannel:
+ conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
+ case ReadWriteChannel:
+ conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
+ case IgnoreChannel:
+ conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
+ }
+ }
+
+ close(conn.ready)
+}
+
+func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
+ supportedProtocols := make([]string, 0, len(conn.protocols))
+ for p := range conn.protocols {
+ supportedProtocols = append(supportedProtocols, p)
+ }
+ return handshake(config, req, supportedProtocols)
+}
+
+func (conn *Conn) resetTimeout() {
+ if conn.timeout > 0 {
+ conn.ws.SetDeadline(time.Now().Add(conn.timeout))
+ }
+}
+
+// Close is only valid after Open has been called
+func (conn *Conn) Close() error {
+ <-conn.ready
+ for _, s := range conn.channels {
+ s.Close()
+ }
+ conn.ws.Close()
+ return nil
+}
+
+// handle implements a websocket handler.
+func (conn *Conn) handle(ws *websocket.Conn) {
+ defer conn.Close()
+ conn.initialize(ws)
+
+ for {
+ conn.resetTimeout()
+ var data []byte
+ if err := websocket.Message.Receive(ws, &data); err != nil {
+ if err != io.EOF {
+ glog.Errorf("Error on socket receive: %v", err)
+ }
+ break
+ }
+ if len(data) == 0 {
+ continue
+ }
+ channel := data[0]
+ if conn.codec == base64Codec {
+ channel = channel - '0'
+ }
+ data = data[1:]
+ if int(channel) >= len(conn.channels) {
+ glog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
+ continue
+ }
+ if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
+ glog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data))
+ continue
+ }
+ }
+}
+
+// write multiplexes the specified channel onto the websocket
+func (conn *Conn) write(num byte, data []byte) (int, error) {
+ conn.resetTimeout()
+ switch conn.codec {
+ case rawCodec:
+ frame := make([]byte, len(data)+1)
+ frame[0] = num
+ copy(frame[1:], data)
+ if err := websocket.Message.Send(conn.ws, frame); err != nil {
+ return 0, err
+ }
+ case base64Codec:
+ frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
+ if err := websocket.Message.Send(conn.ws, frame); err != nil {
+ return 0, err
+ }
+ }
+ return len(data), nil
+}
+
+// websocketChannel represents a channel in a connection
+type websocketChannel struct {
+ conn *Conn
+ num byte
+ r io.Reader
+ w io.WriteCloser
+
+ read, write bool
+}
+
+// newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
+// prior to the connection being opened. It may be no, half, or full duplex depending on
+// read and write.
+func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
+ r, w := io.Pipe()
+ return &websocketChannel{conn, num, r, w, read, write}
+}
+
+func (p *websocketChannel) Write(data []byte) (int, error) {
+ if !p.write {
+ return len(data), nil
+ }
+ return p.conn.write(p.num, data)
+}
+
+// DataFromSocket is invoked by the connection receiver to move data from the connection
+// into a specific channel.
+func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
+ if !p.read {
+ return len(data), nil
+ }
+
+ switch p.conn.codec {
+ case rawCodec:
+ return p.w.Write(data)
+ case base64Codec:
+ dst := make([]byte, len(data))
+ n, err := base64.StdEncoding.Decode(dst, data)
+ if err != nil {
+ return 0, err
+ }
+ return p.w.Write(dst[:n])
+ }
+ return 0, nil
+}
+
+func (p *websocketChannel) Read(data []byte) (int, error) {
+ if !p.read {
+ return 0, io.EOF
+ }
+ return p.r.Read(data)
+}
+
+func (p *websocketChannel) Close() error {
+ return p.w.Close()
+}
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go
new file mode 100644
index 000000000..694ce81d2
--- /dev/null
+++ b/vendor/k8s.io/apiserver/pkg/util/wsstream/doc.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package wsstream contains utilities for streaming content over WebSockets.
+// The Conn type allows callers to multiplex multiple read/write channels over
+// a single websocket. The Reader type allows an io.Reader to be copied over
+// a websocket channel as binary content.
+package wsstream // import "k8s.io/apiserver/pkg/util/wsstream"
diff --git a/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go b/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go
new file mode 100644
index 000000000..9dd165bfa
--- /dev/null
+++ b/vendor/k8s.io/apiserver/pkg/util/wsstream/stream.go
@@ -0,0 +1,177 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package wsstream
+
+import (
+ "encoding/base64"
+ "io"
+ "net/http"
+ "sync"
+ "time"
+
+ "golang.org/x/net/websocket"
+
+ "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// The WebSocket subprotocol "binary.k8s.io" will only send messages to the
+// client and ignore messages sent to the server. The received messages are
+// the exact bytes written to the stream. Zero byte messages are possible.
+const binaryWebSocketProtocol = "binary.k8s.io"
+
+// The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the
+// client and ignore messages sent to the server. The received messages are
+// a base64 version of the bytes written to the stream. Zero byte messages are
+// possible.
+const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
+
+// ReaderProtocolConfig describes a websocket subprotocol with one stream.
+type ReaderProtocolConfig struct {
+ Binary bool
+}
+
+// NewDefaultReaderProtocols returns a stream protocol map with the
+// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io".
+func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
+ return map[string]ReaderProtocolConfig{
+ "": {Binary: true},
+ binaryWebSocketProtocol: {Binary: true},
+ base64BinaryWebSocketProtocol: {Binary: false},
+ }
+}
+
+// Reader supports returning an arbitrary byte stream over a websocket channel.
+type Reader struct {
+ err chan error
+ r io.Reader
+ ping bool
+ timeout time.Duration
+ protocols map[string]ReaderProtocolConfig
+ selectedProtocol string
+
+ handleCrash func() // overridable for testing
+}
+
+// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
+// WebSocket connection. If ping is true, a zero length message will be sent to the client
+// before the stream begins reading.
+//
+// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
+// subprotocol name is used if websocket.Config.Protocol is empty.
+func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
+ return &Reader{
+ r: r,
+ err: make(chan error),
+ ping: ping,
+ protocols: protocols,
+ handleCrash: func() { runtime.HandleCrash() },
+ }
+}
+
+// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
+// there is no timeout on the reader.
+func (r *Reader) SetIdleTimeout(duration time.Duration) {
+ r.timeout = duration
+}
+
+func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
+ supportedProtocols := make([]string, 0, len(r.protocols))
+ for p := range r.protocols {
+ supportedProtocols = append(supportedProtocols, p)
+ }
+ return handshake(config, req, supportedProtocols)
+}
+
+// Copy the reader to the response. The created WebSocket is closed after this
+// method completes.
+func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
+ go func() {
+ defer r.handleCrash()
+ websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
+ }()
+ return <-r.err
+}
+
+// handle implements a WebSocket handler.
+func (r *Reader) handle(ws *websocket.Conn) {
+ // Close the connection when the client requests it, or when we finish streaming, whichever happens first
+ closeConnOnce := &sync.Once{}
+ closeConn := func() {
+ closeConnOnce.Do(func() {
+ ws.Close()
+ })
+ }
+
+ negotiated := ws.Config().Protocol
+ r.selectedProtocol = negotiated[0]
+ defer close(r.err)
+ defer closeConn()
+
+ go func() {
+ defer runtime.HandleCrash()
+ // This blocks until the connection is closed.
+ // Client should not send anything.
+ IgnoreReceives(ws, r.timeout)
+ // Once the client closes, we should also close
+ closeConn()
+ }()
+
+ r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
+}
+
+func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
+ if timeout > 0 {
+ ws.SetDeadline(time.Now().Add(timeout))
+ }
+}
+
+func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
+ buf := make([]byte, 2048)
+ if ping {
+ resetTimeout(ws, timeout)
+ if base64Encode {
+ if err := websocket.Message.Send(ws, ""); err != nil {
+ return err
+ }
+ } else {
+ if err := websocket.Message.Send(ws, []byte{}); err != nil {
+ return err
+ }
+ }
+ }
+ for {
+ resetTimeout(ws, timeout)
+ n, err := r.Read(buf)
+ if err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ if n > 0 {
+ if base64Encode {
+ if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
+ return err
+ }
+ } else {
+ if err := websocket.Message.Send(ws, buf[:n]); err != nil {
+ return err
+ }
+ }
+ }
+ }
+}