summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go149
1 files changed, 149 insertions, 0 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go
new file mode 100644
index 000000000..7c9b791d4
--- /dev/null
+++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go
@@ -0,0 +1,149 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package httpstream
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+)
+
+const (
+ HeaderConnection = "Connection"
+ HeaderUpgrade = "Upgrade"
+ HeaderProtocolVersion = "X-Stream-Protocol-Version"
+ HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions"
+)
+
+// NewStreamHandler defines a function that is called when a new Stream is
+// received. If no error is returned, the Stream is accepted; otherwise,
+// the stream is rejected. After the reply frame has been sent, replySent is closed.
+type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error
+
+// NoOpNewStreamHandler is a stream handler that accepts a new stream and
+// performs no other logic.
+func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error { return nil }
+
+// Dialer knows how to open a streaming connection to a server.
+type Dialer interface {
+
+ // Dial opens a streaming connection to a server using one of the protocols
+ // specified (in order of most preferred to least preferred).
+ Dial(protocols ...string) (Connection, string, error)
+}
+
+// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade
+// HTTP requests to support multiplexed bidirectional streams. After RoundTrip()
+// is invoked, if the upgrade is successful, clients may retrieve the upgraded
+// connection by calling UpgradeRoundTripper.Connection().
+type UpgradeRoundTripper interface {
+ http.RoundTripper
+ // NewConnection validates the response and creates a new Connection.
+ NewConnection(resp *http.Response) (Connection, error)
+}
+
+// ResponseUpgrader knows how to upgrade HTTP requests and responses to
+// add streaming support to them.
+type ResponseUpgrader interface {
+ // UpgradeResponse upgrades an HTTP response to one that supports multiplexed
+ // streams. newStreamHandler will be called asynchronously whenever the
+ // other end of the upgraded connection creates a new stream.
+ UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection
+}
+
+// Connection represents an upgraded HTTP connection.
+type Connection interface {
+ // CreateStream creates a new Stream with the supplied headers.
+ CreateStream(headers http.Header) (Stream, error)
+ // Close resets all streams and closes the connection.
+ Close() error
+ // CloseChan returns a channel that is closed when the underlying connection is closed.
+ CloseChan() <-chan bool
+ // SetIdleTimeout sets the amount of time the connection may remain idle before
+ // it is automatically closed.
+ SetIdleTimeout(timeout time.Duration)
+}
+
+// Stream represents a bidirectional communications channel that is part of an
+// upgraded connection.
+type Stream interface {
+ io.ReadWriteCloser
+ // Reset closes both directions of the stream, indicating that neither client
+ // or server can use it any more.
+ Reset() error
+ // Headers returns the headers used to create the stream.
+ Headers() http.Header
+ // Identifier returns the stream's ID.
+ Identifier() uint32
+}
+
+// IsUpgradeRequest returns true if the given request is a connection upgrade request
+func IsUpgradeRequest(req *http.Request) bool {
+ for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
+ if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
+ return true
+ }
+ }
+ return false
+}
+
+func negotiateProtocol(clientProtocols, serverProtocols []string) string {
+ for i := range clientProtocols {
+ for j := range serverProtocols {
+ if clientProtocols[i] == serverProtocols[j] {
+ return clientProtocols[i]
+ }
+ }
+ }
+ return ""
+}
+
+// Handshake performs a subprotocol negotiation. If the client did request a
+// subprotocol, Handshake will select the first common value found in
+// serverProtocols. If a match is found, Handshake adds a response header
+// indicating the chosen subprotocol. If no match is found, HTTP forbidden is
+// returned, along with a response header containing the list of protocols the
+// server can accept.
+func Handshake(req *http.Request, w http.ResponseWriter, serverProtocols []string) (string, error) {
+ clientProtocols := req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)]
+ if len(clientProtocols) == 0 {
+ // Kube 1.0 clients didn't support subprotocol negotiation.
+ // TODO require clientProtocols once Kube 1.0 is no longer supported
+ return "", nil
+ }
+
+ if len(serverProtocols) == 0 {
+ // Kube 1.0 servers didn't support subprotocol negotiation. This is mainly for testing.
+ // TODO require serverProtocols once Kube 1.0 is no longer supported
+ return "", nil
+ }
+
+ negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols)
+ if len(negotiatedProtocol) == 0 {
+ w.WriteHeader(http.StatusForbidden)
+ for i := range serverProtocols {
+ w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i])
+ }
+ fmt.Fprintf(w, "unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols)
+ return "", fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server supports %v", clientProtocols, serverProtocols)
+ }
+
+ w.Header().Add(HeaderProtocolVersion, negotiatedProtocol)
+ return negotiatedProtocol, nil
+}