summaryrefslogtreecommitdiff
path: root/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy')
-rw-r--r--vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go50
-rw-r--r--vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go49
-rw-r--r--vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go17
3 files changed, 104 insertions, 12 deletions
diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go
index 7a6881250..336b4908b 100644
--- a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go
+++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go
@@ -34,38 +34,62 @@ type connection struct {
streams []httpstream.Stream
streamLock sync.Mutex
newStreamHandler httpstream.NewStreamHandler
+ ping func() (time.Duration, error)
}
// NewClientConnection creates a new SPDY client connection.
func NewClientConnection(conn net.Conn) (httpstream.Connection, error) {
+ return NewClientConnectionWithPings(conn, 0)
+}
+
+// NewClientConnectionWithPings creates a new SPDY client connection.
+//
+// If pingPeriod is non-zero, a background goroutine will send periodic Ping
+// frames to the server. Use this to keep idle connections through certain load
+// balancers alive longer.
+func NewClientConnectionWithPings(conn net.Conn, pingPeriod time.Duration) (httpstream.Connection, error) {
spdyConn, err := spdystream.NewConnection(conn, false)
if err != nil {
defer conn.Close()
return nil, err
}
- return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil
+ return newConnection(spdyConn, httpstream.NoOpNewStreamHandler, pingPeriod, spdyConn.Ping), nil
}
// NewServerConnection creates a new SPDY server connection. newStreamHandler
// will be invoked when the server receives a newly created stream from the
// client.
func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
+ return NewServerConnectionWithPings(conn, newStreamHandler, 0)
+}
+
+// NewServerConnectionWithPings creates a new SPDY server connection.
+// newStreamHandler will be invoked when the server receives a newly created
+// stream from the client.
+//
+// If pingPeriod is non-zero, a background goroutine will send periodic Ping
+// frames to the server. Use this to keep idle connections through certain load
+// balancers alive longer.
+func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration) (httpstream.Connection, error) {
spdyConn, err := spdystream.NewConnection(conn, true)
if err != nil {
defer conn.Close()
return nil, err
}
- return newConnection(spdyConn, newStreamHandler), nil
+ return newConnection(spdyConn, newStreamHandler, pingPeriod, spdyConn.Ping), nil
}
// newConnection returns a new connection wrapping conn. newStreamHandler
// will be invoked when the server receives a newly created stream from the
// client.
-func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
- c := &connection{conn: conn, newStreamHandler: newStreamHandler}
+func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection {
+ c := &connection{conn: conn, newStreamHandler: newStreamHandler, ping: pingFn}
go conn.Serve(c.newSpdyStream)
+ if pingPeriod > 0 && pingFn != nil {
+ go c.sendPings(pingPeriod)
+ }
return c
}
@@ -143,3 +167,21 @@ func (c *connection) newSpdyStream(stream *spdystream.Stream) {
func (c *connection) SetIdleTimeout(timeout time.Duration) {
c.conn.SetIdleTimeout(timeout)
}
+
+func (c *connection) sendPings(period time.Duration) {
+ t := time.NewTicker(period)
+ defer t.Stop()
+ for {
+ select {
+ case <-c.conn.CloseChan():
+ return
+ case <-t.C:
+ }
+ if _, err := c.ping(); err != nil {
+ klog.V(3).Infof("SPDY Ping failed: %v", err)
+ // Continue, in case this is a transient failure.
+ // c.conn.CloseChan above will tell us when the connection is
+ // actually closed.
+ }
+ }
+}
diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go
index 6309fbc26..4cb1cfadc 100644
--- a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go
+++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go
@@ -30,6 +30,7 @@ import (
"net/http/httputil"
"net/url"
"strings"
+ "time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -70,6 +71,9 @@ type SpdyRoundTripper struct {
// requireSameHostRedirects restricts redirect following to only follow redirects to the same host
// as the original request.
requireSameHostRedirects bool
+ // pingPeriod is a period for sending Ping frames over established
+ // connections.
+ pingPeriod time.Duration
}
var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{}
@@ -79,20 +83,53 @@ var _ utilnet.Dialer = &SpdyRoundTripper{}
// NewRoundTripper creates a new SpdyRoundTripper that will use the specified
// tlsConfig.
func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper {
- return NewRoundTripperWithProxy(tlsConfig, followRedirects, requireSameHostRedirects, utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment))
+ return NewRoundTripperWithConfig(RoundTripperConfig{
+ TLS: tlsConfig,
+ FollowRedirects: followRedirects,
+ RequireSameHostRedirects: requireSameHostRedirects,
+ })
}
// NewRoundTripperWithProxy creates a new SpdyRoundTripper that will use the
// specified tlsConfig and proxy func.
func NewRoundTripperWithProxy(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool, proxier func(*http.Request) (*url.URL, error)) *SpdyRoundTripper {
+ return NewRoundTripperWithConfig(RoundTripperConfig{
+ TLS: tlsConfig,
+ FollowRedirects: followRedirects,
+ RequireSameHostRedirects: requireSameHostRedirects,
+ Proxier: proxier,
+ })
+}
+
+// NewRoundTripperWithProxy creates a new SpdyRoundTripper with the specified
+// configuration.
+func NewRoundTripperWithConfig(cfg RoundTripperConfig) *SpdyRoundTripper {
+ if cfg.Proxier == nil {
+ cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
+ }
return &SpdyRoundTripper{
- tlsConfig: tlsConfig,
- followRedirects: followRedirects,
- requireSameHostRedirects: requireSameHostRedirects,
- proxier: proxier,
+ tlsConfig: cfg.TLS,
+ followRedirects: cfg.FollowRedirects,
+ requireSameHostRedirects: cfg.RequireSameHostRedirects,
+ proxier: cfg.Proxier,
+ pingPeriod: cfg.PingPeriod,
}
}
+// RoundTripperConfig is a set of options for an SpdyRoundTripper.
+type RoundTripperConfig struct {
+ // TLS configuration used by the round tripper.
+ TLS *tls.Config
+ // Proxier is a proxy function invoked on each request. Optional.
+ Proxier func(*http.Request) (*url.URL, error)
+ // PingPeriod is a period for sending SPDY Pings on the connection.
+ // Optional.
+ PingPeriod time.Duration
+
+ FollowRedirects bool
+ RequireSameHostRedirects bool
+}
+
// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during
// proxying with a spdy roundtripper.
func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config {
@@ -316,7 +353,7 @@ func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connec
return nil, fmt.Errorf("unable to upgrade connection: %s", responseError)
}
- return NewClientConnection(s.conn)
+ return NewClientConnectionWithPings(s.conn, s.pingPeriod)
}
// statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection
diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go
index 045d214d2..f17eb09e9 100644
--- a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go
+++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go
@@ -24,6 +24,7 @@ import (
"net/http"
"strings"
"sync/atomic"
+ "time"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
@@ -34,6 +35,7 @@ const HeaderSpdy31 = "SPDY/3.1"
// responseUpgrader knows how to upgrade HTTP responses. It
// implements the httpstream.ResponseUpgrader interface.
type responseUpgrader struct {
+ pingPeriod time.Duration
}
// connWrapper is used to wrap a hijacked connection and its bufio.Reader. All
@@ -64,7 +66,18 @@ func (w *connWrapper) Close() error {
// capable of upgrading HTTP responses using SPDY/3.1 via the
// spdystream package.
func NewResponseUpgrader() httpstream.ResponseUpgrader {
- return responseUpgrader{}
+ return NewResponseUpgraderWithPings(0)
+}
+
+// NewResponseUpgraderWithPings returns a new httpstream.ResponseUpgrader that
+// is capable of upgrading HTTP responses using SPDY/3.1 via the spdystream
+// package.
+//
+// If pingPeriod is non-zero, for each incoming connection a background
+// goroutine will send periodic Ping frames to the server. Use this to keep
+// idle connections through certain load balancers alive longer.
+func NewResponseUpgraderWithPings(pingPeriod time.Duration) httpstream.ResponseUpgrader {
+ return responseUpgrader{pingPeriod: pingPeriod}
}
// UpgradeResponse upgrades an HTTP response to one that supports multiplexed
@@ -97,7 +110,7 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque
}
connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader}
- spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler)
+ spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod)
if err != nil {
runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
return nil