aboutsummaryrefslogtreecommitdiff
path: root/vendor/golang.org/x/net/http2/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r--vendor/golang.org/x/net/http2/transport.go914
1 files changed, 694 insertions, 220 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index 0c7e859db..f272e8f9f 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -10,6 +10,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
+ "context"
"crypto/rand"
"crypto/tls"
"errors"
@@ -18,17 +19,20 @@ import (
"io/ioutil"
"log"
"math"
+ mathrand "math/rand"
"net"
"net/http"
+ "net/http/httptrace"
+ "net/textproto"
"sort"
"strconv"
"strings"
"sync"
"time"
+ "golang.org/x/net/http/httpguts"
"golang.org/x/net/http2/hpack"
"golang.org/x/net/idna"
- "golang.org/x/net/lex/httplex"
)
const (
@@ -86,13 +90,23 @@ type Transport struct {
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
// send in the initial settings frame. It is how many bytes
- // of response headers are allow. Unlike the http2 spec, zero here
+ // of response headers are allowed. Unlike the http2 spec, zero here
// means to use a default limit (currently 10MB). If you actually
// want to advertise an ulimited value to the peer, Transport
// interprets the highest possible value here (0xffffffff or 1<<32-1)
// to mean no limit.
MaxHeaderListSize uint32
+ // StrictMaxConcurrentStreams controls whether the server's
+ // SETTINGS_MAX_CONCURRENT_STREAMS should be respected
+ // globally. If false, new TCP connections are created to the
+ // server as needed to keep each under the per-connection
+ // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
+ // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
+ // a global limit and callers of RoundTrip block when needed,
+ // waiting for their turn.
+ StrictMaxConcurrentStreams bool
+
// t1, if non-nil, is the standard library Transport using
// this transport. Its settings are used (but not its
// RoundTrip method, etc).
@@ -116,16 +130,56 @@ func (t *Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}
-var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6")
-
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
-// It requires Go 1.6 or later and returns an error if the net/http package is too old
-// or if t1 has already been HTTP/2-enabled.
+// It returns an error if t1 has already been HTTP/2-enabled.
func ConfigureTransport(t1 *http.Transport) error {
- _, err := configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go
+ _, err := configureTransport(t1)
return err
}
+func configureTransport(t1 *http.Transport) (*Transport, error) {
+ connPool := new(clientConnPool)
+ t2 := &Transport{
+ ConnPool: noDialClientConnPool{connPool},
+ t1: t1,
+ }
+ connPool.t = t2
+ if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
+ return nil, err
+ }
+ if t1.TLSClientConfig == nil {
+ t1.TLSClientConfig = new(tls.Config)
+ }
+ if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
+ t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
+ }
+ if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
+ t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
+ }
+ upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper {
+ addr := authorityAddr("https", authority)
+ if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
+ go c.Close()
+ return erringRoundTripper{err}
+ } else if !used {
+ // Turns out we don't need this c.
+ // For example, two goroutines made requests to the same host
+ // at the same time, both kicking off TCP dials. (since protocol
+ // was unknown)
+ go c.Close()
+ }
+ return t2
+ }
+ if m := t1.TLSNextProto; len(m) == 0 {
+ t1.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{
+ "h2": upgradeFn,
+ }
+ } else {
+ m["h2"] = upgradeFn
+ }
+ return t2, nil
+}
+
func (t *Transport) connPool() ClientConnPool {
t.connPoolOnce.Do(t.initConnPool)
return t.connPoolOrDef
@@ -158,21 +212,24 @@ type ClientConn struct {
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow flow // our conn-level flow control quota (cs.flow is per stream)
inflow flow // peer's conn-level flow control
+ closing bool
closed bool
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
nextStreamID uint32
+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
fr *Framer
lastActive time.Time
// Settings from peer: (also guarded by mu)
- maxFrameSize uint32
- maxConcurrentStreams uint32
- initialWindowSize uint32
+ maxFrameSize uint32
+ maxConcurrentStreams uint32
+ peerMaxHeaderListSize uint64
+ initialWindowSize uint32
hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder
@@ -187,7 +244,7 @@ type ClientConn struct {
type clientStream struct {
cc *ClientConn
req *http.Request
- trace *clientTrace // or nil
+ trace *httptrace.ClientTrace // or nil
ID uint32
resc chan resAndError
bufPipe pipe // buffered pipe with the flow-controlled response payload
@@ -208,43 +265,65 @@ type clientStream struct {
done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu
// owned by clientConnReadLoop:
- firstByte bool // got the first response byte
- pastHeaders bool // got first MetaHeadersFrame (actual headers)
- pastTrailers bool // got optional second MetaHeadersFrame (trailers)
+ firstByte bool // got the first response byte
+ pastHeaders bool // got first MetaHeadersFrame (actual headers)
+ pastTrailers bool // got optional second MetaHeadersFrame (trailers)
+ num1xx uint8 // number of 1xx responses seen
trailer http.Header // accumulated trailers
resTrailer *http.Header // client's Response.Trailer
}
-// awaitRequestCancel runs in its own goroutine and waits for the user
-// to cancel a RoundTrip request, its context to expire, or for the
-// request to be done (any way it might be removed from the cc.streams
-// map: peer reset, successful completion, TCP connection breakage,
-// etc)
-func (cs *clientStream) awaitRequestCancel(req *http.Request) {
- ctx := reqContext(req)
+// awaitRequestCancel waits for the user to cancel a request or for the done
+// channel to be signaled. A non-nil error is returned only if the request was
+// canceled.
+func awaitRequestCancel(req *http.Request, done <-chan struct{}) error {
+ ctx := req.Context()
if req.Cancel == nil && ctx.Done() == nil {
- return
+ return nil
}
select {
case <-req.Cancel:
- cs.cancelStream()
- cs.bufPipe.CloseWithError(errRequestCanceled)
+ return errRequestCanceled
case <-ctx.Done():
+ return ctx.Err()
+ case <-done:
+ return nil
+ }
+}
+
+var got1xxFuncForTests func(int, textproto.MIMEHeader) error
+
+// get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
+// if any. It returns nil if not set or if the Go version is too old.
+func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
+ if fn := got1xxFuncForTests; fn != nil {
+ return fn
+ }
+ return traceGot1xxResponseFunc(cs.trace)
+}
+
+// awaitRequestCancel waits for the user to cancel a request, its context to
+// expire, or for the request to be done (any way it might be removed from the
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
+// etc). If the request is canceled, then cs will be canceled and closed.
+func (cs *clientStream) awaitRequestCancel(req *http.Request) {
+ if err := awaitRequestCancel(req, cs.done); err != nil {
cs.cancelStream()
- cs.bufPipe.CloseWithError(ctx.Err())
- case <-cs.done:
+ cs.bufPipe.CloseWithError(err)
}
}
func (cs *clientStream) cancelStream() {
- cs.cc.mu.Lock()
+ cc := cs.cc
+ cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
- cs.cc.mu.Unlock()
+ cc.mu.Unlock()
if !didReset {
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ cc.forgetStreamID(cs.ID)
}
}
@@ -261,6 +340,13 @@ func (cs *clientStream) checkResetOrDone() error {
}
}
+func (cs *clientStream) getStartedWrite() bool {
+ cc := cs.cc
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return cs.startedWrite
+}
+
func (cs *clientStream) abortRequestBodyWrite(err error) {
if err == nil {
panic("nil error")
@@ -286,7 +372,26 @@ func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
return
}
-var ErrNoCachedConn = errors.New("http2: no cached connection was available")
+// noCachedConnError is the concrete type of ErrNoCachedConn, which
+// needs to be detected by net/http regardless of whether it's its
+// bundled version (in h2_bundle.go with a rewritten type name) or
+// from a user's x/net/http2. As such, as it has a unique method name
+// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
+// isNoCachedConnError.
+type noCachedConnError struct{}
+
+func (noCachedConnError) IsHTTP2NoCachedConnError() {}
+func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
+
+// isNoCachedConnError reports whether err is of type noCachedConnError
+// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
+// may coexist in the same running program.
+func isNoCachedConnError(err error) bool {
+ _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
+ return ok
+}
+
+var ErrNoCachedConn error = noCachedConnError{}
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
@@ -329,17 +434,28 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res
}
addr := authorityAddr(req.URL.Scheme, req.URL.Host)
- for {
+ for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
if err != nil {
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
return nil, err
}
traceGotConn(req, cc)
- res, err := cc.RoundTrip(req)
- if err != nil {
- if req, err = shouldRetryRequest(req, err); err == nil {
- continue
+ res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
+ if err != nil && retry <= 6 {
+ if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
+ // After the first retry, do exponential backoff with 10% jitter.
+ if retry == 0 {
+ continue
+ }
+ backoff := float64(uint(1) << (uint(retry) - 1))
+ backoff += backoff * (0.1 * mathrand.Float64())
+ select {
+ case <-time.After(time.Second * time.Duration(backoff)):
+ continue
+ case <-req.Context().Done():
+ return nil, req.Context().Err()
+ }
}
}
if err != nil {
@@ -360,36 +476,30 @@ func (t *Transport) CloseIdleConnections() {
}
var (
- errClientConnClosed = errors.New("http2: client conn is closed")
- errClientConnUnusable = errors.New("http2: client conn not usable")
-
- errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
- errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
+ errClientConnClosed = errors.New("http2: client conn is closed")
+ errClientConnUnusable = errors.New("http2: client conn not usable")
+ errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
-func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
- switch err {
- default:
+func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) {
+ if !canRetryError(err) {
return nil, err
- case errClientConnUnusable, errClientConnGotGoAway:
+ }
+ // If the Body is nil (or http.NoBody), it's safe to reuse
+ // this request and its Body.
+ if req.Body == nil || req.Body == http.NoBody {
return req, nil
- case errClientConnGotGoAwayAfterSomeReqBody:
- // If the Body is nil (or http.NoBody), it's safe to reuse
- // this request and its Body.
- if req.Body == nil || reqBodyIsNoBody(req.Body) {
- return req, nil
- }
- // Otherwise we depend on the Request having its GetBody
- // func defined.
- getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody
- if getBody == nil {
- return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
- }
- body, err := getBody()
+ }
+
+ // If the request body can be reset back to its original
+ // state via the optional req.GetBody, do that.
+ if req.GetBody != nil {
+ // TODO: consider a req.Body.Close here? or audit that all caller paths do?
+ body, err := req.GetBody()
if err != nil {
return nil, err
}
@@ -397,6 +507,27 @@ func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
newReq.Body = body
return &newReq, nil
}
+
+ // The Request.Body can't reset back to the beginning, but we
+ // don't seem to have started to read from it yet, so reuse
+ // the request directly. The "afterBodyWrite" means the
+ // bodyWrite process has started, which becomes true before
+ // the first Read.
+ if !afterBodyWrite {
+ return req, nil
+ }
+
+ return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
+}
+
+func canRetryError(err error) bool {
+ if err == errClientConnUnusable || err == errClientConnGotGoAway {
+ return true
+ }
+ if se, ok := err.(StreamError); ok {
+ return se.Code == ErrCodeRefusedStream
+ }
+ return false
}
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
@@ -414,7 +545,7 @@ func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, er
func (t *Transport) newTLSConfig(host string) *tls.Config {
cfg := new(tls.Config)
if t.TLSClientConfig != nil {
- *cfg = *cloneTLSConfig(t.TLSClientConfig)
+ *cfg = *t.TLSClientConfig.Clone()
}
if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
@@ -465,7 +596,7 @@ func (t *Transport) expectContinueTimeout() time.Duration {
if t.t1 == nil {
return 0
}
- return transportExpectContinueTimeout(t.t1)
+ return t.t1.ExpectContinueTimeout
}
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
@@ -474,17 +605,18 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{
- t: t,
- tconn: c,
- readerDone: make(chan struct{}),
- nextStreamID: 1,
- maxFrameSize: 16 << 10, // spec default
- initialWindowSize: 65535, // spec default
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
- streams: make(map[uint32]*clientStream),
- singleUse: singleUse,
- wantSettingsAck: true,
- pings: make(map[[8]byte]chan struct{}),
+ t: t,
+ tconn: c,
+ readerDone: make(chan struct{}),
+ nextStreamID: 1,
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ streams: make(map[uint32]*clientStream),
+ singleUse: singleUse,
+ wantSettingsAck: true,
+ pings: make(map[[8]byte]chan struct{}),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
@@ -509,6 +641,10 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
// henc in response to SETTINGS frames?
cc.henc = hpack.NewEncoder(&cc.hbuf)
+ if t.AllowHTTP {
+ cc.nextStreamID = 3
+ }
+
if cs, ok := c.(connectionStater); ok {
state := cs.ConnectionState()
cc.tlsState = &state
@@ -560,22 +696,54 @@ func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
}
}
+// CanTakeNewRequest reports whether the connection can take a new request,
+// meaning it has not been closed or received or sent a GOAWAY.
func (cc *ClientConn) CanTakeNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.canTakeNewRequestLocked()
}
-func (cc *ClientConn) canTakeNewRequestLocked() bool {
+// clientConnIdleState describes the suitability of a client
+// connection to initiate a new RoundTrip request.
+type clientConnIdleState struct {
+ canTakeNewRequest bool
+ freshConn bool // whether it's unused by any previous request
+}
+
+func (cc *ClientConn) idleState() clientConnIdleState {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return cc.idleStateLocked()
+}
+
+func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
if cc.singleUse && cc.nextStreamID > 1 {
- return false
+ return
+ }
+ var maxConcurrentOkay bool
+ if cc.t.StrictMaxConcurrentStreams {
+ // We'll tell the caller we can take a new request to
+ // prevent the caller from dialing a new TCP
+ // connection, but then we'll block later before
+ // writing it.
+ maxConcurrentOkay = true
+ } else {
+ maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams)
}
- return cc.goAway == nil && !cc.closed &&
- int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
- cc.nextStreamID < math.MaxInt32
+
+ st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
+ int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32
+ st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
+ return
+}
+
+func (cc *ClientConn) canTakeNewRequestLocked() bool {
+ st := cc.idleStateLocked()
+ return st.canTakeNewRequest
}
-// onIdleTimeout is called from a time.AfterFunc goroutine. It will
+// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
@@ -602,6 +770,87 @@ func (cc *ClientConn) closeIfIdle() {
cc.tconn.Close()
}
+var shutdownEnterWaitStateHook = func() {}
+
+// Shutdown gracefully close the client connection, waiting for running streams to complete.
+func (cc *ClientConn) Shutdown(ctx context.Context) error {
+ if err := cc.sendGoAway(); err != nil {
+ return err
+ }
+ // Wait for all in-flight streams to complete or connection to close
+ done := make(chan error, 1)
+ cancelled := false // guarded by cc.mu
+ go func() {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ for {
+ if len(cc.streams) == 0 || cc.closed {
+ cc.closed = true
+ done <- cc.tconn.Close()
+ break
+ }
+ if cancelled {
+ break
+ }
+ cc.cond.Wait()
+ }
+ }()
+ shutdownEnterWaitStateHook()
+ select {
+ case err := <-done:
+ return err
+ case <-ctx.Done():
+ cc.mu.Lock()
+ // Free the goroutine above
+ cancelled = true
+ cc.cond.Broadcast()
+ cc.mu.Unlock()
+ return ctx.Err()
+ }
+}
+
+func (cc *ClientConn) sendGoAway() error {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+ if cc.closing {
+ // GOAWAY sent already
+ return nil
+ }
+ // Send a graceful shutdown frame to server
+ maxStreamID := cc.nextStreamID
+ if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
+ return err
+ }
+ if err := cc.bw.Flush(); err != nil {
+ return err
+ }
+ // Prevent new requests
+ cc.closing = true
+ return nil
+}
+
+// Close closes the client connection immediately.
+//
+// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
+func (cc *ClientConn) Close() error {
+ cc.mu.Lock()
+ defer cc.cond.Broadcast()
+ defer cc.mu.Unlock()
+ err := errors.New("http2: client connection force closed via ClientConn.Close")
+ for id, cs := range cc.streams {
+ select {
+ case cs.resc <- resAndError{err: err}:
+ default:
+ }
+ cs.bufPipe.CloseWithError(err)
+ delete(cc.streams, id)
+ }
+ cc.closed = true
+ return cc.tconn.Close()
+}
+
const maxAllocFrameSize = 512 << 10
// frameBuffer returns a scratch buffer suitable for writing DATA frames.
@@ -684,7 +933,7 @@ func checkConnHeaders(req *http.Request) error {
if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
}
- if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "close" && vv[0] != "keep-alive") {
+ if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
return fmt.Errorf("http2: invalid Connection request header: %q", vv)
}
return nil
@@ -694,7 +943,7 @@ func checkConnHeaders(req *http.Request) error {
// req.ContentLength, where 0 actually means zero (not unknown) and -1
// means unknown.
func actualContentLength(req *http.Request) int64 {
- if req.Body == nil {
+ if req.Body == nil || req.Body == http.NoBody {
return 0
}
if req.ContentLength != 0 {
@@ -704,8 +953,13 @@ func actualContentLength(req *http.Request) int64 {
}
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
+ resp, _, err := cc.roundTrip(req)
+ return resp, err
+}
+
+func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) {
if err := checkConnHeaders(req); err != nil {
- return nil, err
+ return nil, false, err
}
if cc.idleTimer != nil {
cc.idleTimer.Stop()
@@ -713,20 +967,19 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
trailers, err := commaSeparatedTrailers(req)
if err != nil {
- return nil, err
+ return nil, false, err
}
hasTrailers := trailers != ""
cc.mu.Lock()
- cc.lastActive = time.Now()
- if cc.closed || !cc.canTakeNewRequestLocked() {
+ if err := cc.awaitOpenSlotForRequest(req); err != nil {
cc.mu.Unlock()
- return nil, errClientConnUnusable
+ return nil, false, err
}
body := req.Body
- hasBody := body != nil
contentLen := actualContentLength(req)
+ hasBody := contentLen != 0
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
var requestedGzip bool
@@ -755,19 +1008,19 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
if err != nil {
cc.mu.Unlock()
- return nil, err
+ return nil, false, err
}
cs := cc.newStream()
cs.req = req
- cs.trace = requestTrace(req)
+ cs.trace = httptrace.ContextClientTrace(req.Context())
cs.requestedGzip = requestedGzip
bodyWriter := cc.t.getBodyWriterState(cs, body)
cs.on100 = bodyWriter.on100
cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
- werr := cc.writeHeaders(cs.ID, endStream, hdrs)
+ werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()
traceWroteHeaders(cs.trace)
cc.mu.Unlock()
@@ -781,7 +1034,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// Don't bother sending a RST_STREAM (our write already failed;
// no need to keep writing)
traceWroteRequest(cs.trace, werr)
- return nil, werr
+ return nil, false, werr
}
var respHeaderTimer <-chan time.Time
@@ -798,9 +1051,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
readLoopResCh := cs.resc
bodyWritten := false
- ctx := reqContext(req)
+ ctx := req.Context()
- handleReadLoopResponse := func(re resAndError) (*http.Response, error) {
+ handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
@@ -809,26 +1062,19 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// 2xx, however, then assume the server DOES potentially
// want our body (e.g. full-duplex streaming:
// golang.org/issue/13444). If it turns out the server
- // doesn't, they'll RST_STREAM us soon enough. This is a
- // heuristic to avoid adding knobs to Transport. Hopefully
+ // doesn't, they'll RST_STREAM us soon enough. This is a
+ // heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWrite)
}
if re.err != nil {
- if re.err == errClientConnGotGoAway {
- cc.mu.Lock()
- if cs.startedWrite {
- re.err = errClientConnGotGoAwayAfterSomeReqBody
- }
- cc.mu.Unlock()
- }
cc.forgetStreamID(cs.ID)
- return nil, re.err
+ return nil, cs.getStartedWrite(), re.err
}
res.Request = req
res.TLS = cc.tlsState
- return res, nil
+ return res, false, nil
}
for {
@@ -836,37 +1082,37 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
case <-respHeaderTimer:
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
}
- return nil, errTimeout
- case <-ctx.Done():
cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), errTimeout
+ case <-ctx.Done():
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
}
- return nil, ctx.Err()
- case <-req.Cancel:
cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), ctx.Err()
+ case <-req.Cancel:
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
}
- return nil, errRequestCanceled
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), errRequestCanceled
case <-cs.peerReset:
// processResetStream already removed the
// stream from the streams map; no need for
// forgetStreamID.
- return nil, cs.resetErr
+ return nil, cs.getStartedWrite(), cs.resetErr
case err := <-bodyWriter.resc:
// Prefer the read loop's response, if available. Issue 16102.
select {
@@ -875,7 +1121,8 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
default:
}
if err != nil {
- return nil, err
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), err
}
bodyWritten = true
if d := cc.responseHeaderTimeout(); d != 0 {
@@ -887,14 +1134,55 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
}
}
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// Must hold cc.mu.
+func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
+ var waitingForConn chan struct{}
+ var waitingForConnErr error // guarded by cc.mu
+ for {
+ cc.lastActive = time.Now()
+ if cc.closed || !cc.canTakeNewRequestLocked() {
+ if waitingForConn != nil {
+ close(waitingForConn)
+ }
+ return errClientConnUnusable
+ }
+ if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+ if waitingForConn != nil {
+ close(waitingForConn)
+ }
+ return nil
+ }
+ // Unfortunately, we cannot wait on a condition variable and channel at
+ // the same time, so instead, we spin up a goroutine to check if the
+ // request is canceled while we wait for a slot to open in the connection.
+ if waitingForConn == nil {
+ waitingForConn = make(chan struct{})
+ go func() {
+ if err := awaitRequestCancel(req, waitingForConn); err != nil {
+ cc.mu.Lock()
+ waitingForConnErr = err
+ cc.cond.Broadcast()
+ cc.mu.Unlock()
+ }
+ }()
+ }
+ cc.pendingRequests++
+ cc.cond.Wait()
+ cc.pendingRequests--
+ if waitingForConnErr != nil {
+ return waitingForConnErr
+ }
+ }
+}
+
// requires cc.wmu be held
-func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
+func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
first := true // first frame written (HEADERS is first, then CONTINUATION)
- frameSize := int(cc.maxFrameSize)
for len(hdrs) > 0 && cc.werr == nil {
chunk := hdrs
- if len(chunk) > frameSize {
- chunk = chunk[:frameSize]
+ if len(chunk) > maxFrameSize {
+ chunk = chunk[:maxFrameSize]
}
hdrs = hdrs[len(chunk):]
endHeaders := len(hdrs) == 0
@@ -955,6 +1243,7 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
sawEOF = true
err = nil
} else if err != nil {
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, err)
return err
}
@@ -1002,17 +1291,26 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
var trls []byte
if hasTrailers {
cc.mu.Lock()
- defer cc.mu.Unlock()
- trls = cc.encodeTrailers(req)
+ trls, err = cc.encodeTrailers(req)
+ cc.mu.Unlock()
+ if err != nil {
+ cc.writeStreamReset(cs.ID, ErrCodeInternal, err)
+ cc.forgetStreamID(cs.ID)
+ return err
+ }
}
+ cc.mu.Lock()
+ maxFrameSize := int(cc.maxFrameSize)
+ cc.mu.Unlock()
+
cc.wmu.Lock()
defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
if len(trls) > 0 {
- err = cc.writeHeaders(cs.ID, true, trls)
+ err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
} else {
err = cc.fr.WriteData(cs.ID, true, nil)
}
@@ -1071,7 +1369,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
if host == "" {
host = req.URL.Host
}
- host, err := httplex.PunycodeHostPort(host)
+ host, err := httpguts.PunycodeHostPort(host)
if err != nil {
return nil, err
}
@@ -1096,72 +1394,103 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
// potentially pollute our hpack state. (We want to be able to
// continue to reuse the hpack encoder for future requests)
for k, vv := range req.Header {
- if !httplex.ValidHeaderFieldName(k) {
+ if !httpguts.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("invalid HTTP header name %q", k)
}
for _, v := range vv {
- if !httplex.ValidHeaderFieldValue(v) {
+ if !httpguts.ValidHeaderFieldValue(v) {
return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k)
}
}
}
- // 8.1.2.3 Request Pseudo-Header Fields
- // The :path pseudo-header field includes the path and query parts of the
- // target URI (the path-absolute production and optionally a '?' character
- // followed by the query production (see Sections 3.3 and 3.4 of
- // [RFC3986]).
- cc.writeHeader(":authority", host)
- cc.writeHeader(":method", req.Method)
- if req.Method != "CONNECT" {
- cc.writeHeader(":path", path)
- cc.writeHeader(":scheme", req.URL.Scheme)
- }
- if trailers != "" {
- cc.writeHeader("trailer", trailers)
- }
+ enumerateHeaders := func(f func(name, value string)) {
+ // 8.1.2.3 Request Pseudo-Header Fields
+ // The :path pseudo-header field includes the path and query parts of the
+ // target URI (the path-absolute production and optionally a '?' character
+ // followed by the query production (see Sections 3.3 and 3.4 of
+ // [RFC3986]).
+ f(":authority", host)
+ f(":method", req.Method)
+ if req.Method != "CONNECT" {
+ f(":path", path)
+ f(":scheme", req.URL.Scheme)
+ }
+ if trailers != "" {
+ f("trailer", trailers)
+ }
- var didUA bool
- for k, vv := range req.Header {
- lowKey := strings.ToLower(k)
- switch lowKey {
- case "host", "content-length":
- // Host is :authority, already sent.
- // Content-Length is automatic, set below.
- continue
- case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
- // Per 8.1.2.2 Connection-Specific Header
- // Fields, don't send connection-specific
- // fields. We have already checked if any
- // are error-worthy so just ignore the rest.
- continue
- case "user-agent":
- // Match Go's http1 behavior: at most one
- // User-Agent. If set to nil or empty string,
- // then omit it. Otherwise if not mentioned,
- // include the default (below).
- didUA = true
- if len(vv) < 1 {
+ var didUA bool
+ for k, vv := range req.Header {
+ if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
+ // Host is :authority, already sent.
+ // Content-Length is automatic, set below.
continue
- }
- vv = vv[:1]
- if vv[0] == "" {
+ } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
+ strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
+ strings.EqualFold(k, "keep-alive") {
+ // Per 8.1.2.2 Connection-Specific Header
+ // Fields, don't send connection-specific
+ // fields. We have already checked if any
+ // are error-worthy so just ignore the rest.
continue
+ } else if strings.EqualFold(k, "user-agent") {
+ // Match Go's http1 behavior: at most one
+ // User-Agent. If set to nil or empty string,
+ // then omit it. Otherwise if not mentioned,
+ // include the default (below).
+ didUA = true
+ if len(vv) < 1 {
+ continue
+ }
+ vv = vv[:1]
+ if vv[0] == "" {
+ continue
+ }
+
+ }
+
+ for _, v := range vv {
+ f(k, v)
}
}
- for _, v := range vv {
- cc.writeHeader(lowKey, v)
+ if shouldSendReqContentLength(req.Method, contentLength) {
+ f("content-length", strconv.FormatInt(contentLength, 10))
+ }
+ if addGzipHeader {
+ f("accept-encoding", "gzip")
+ }
+ if !didUA {
+ f("user-agent", defaultUserAgent)
}
}
- if shouldSendReqContentLength(req.Method, contentLength) {
- cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10))
- }
- if addGzipHeader {
- cc.writeHeader("accept-encoding", "gzip")
- }
- if !didUA {
- cc.writeHeader("user-agent", defaultUserAgent)
+
+ // Do a first pass over the headers counting bytes to ensure
+ // we don't exceed cc.peerMaxHeaderListSize. This is done as a
+ // separate pass before encoding the headers to prevent
+ // modifying the hpack state.
+ hlSize := uint64(0)
+ enumerateHeaders(func(name, value string) {
+ hf := hpack.HeaderField{Name: name, Value: value}
+ hlSize += uint64(hf.Size())
+ })
+
+ if hlSize > cc.peerMaxHeaderListSize {
+ return nil, errRequestHeaderListSize
}
+
+ trace := httptrace.ContextClientTrace(req.Context())
+ traceHeaders := traceHasWroteHeaderField(trace)
+
+ // Header list size is ok. Write the headers.
+ enumerateHeaders(func(name, value string) {
+ name = strings.ToLower(name)
+ cc.writeHeader(name, value)
+ if traceHeaders {
+ traceWroteHeaderField(trace, name, value)
+ }
+ })
+
return cc.hbuf.Bytes(), nil
}
@@ -1188,17 +1517,29 @@ func shouldSendReqContentLength(method string, contentLength int64) bool {
}
// requires cc.mu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) []byte {
+func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
cc.hbuf.Reset()
+
+ hlSize := uint64(0)
+ for k, vv := range req.Trailer {
+ for _, v := range vv {
+ hf := hpack.HeaderField{Name: k, Value: v}
+ hlSize += uint64(hf.Size())
+ }
+ }
+ if hlSize > cc.peerMaxHeaderListSize {
+ return nil, errRequestHeaderListSize
+ }
+
for k, vv := range req.Trailer {
- // Transfer-Encoding, etc.. have already been filter at the
+ // Transfer-Encoding, etc.. have already been filtered at the
// start of RoundTrip
lowKey := strings.ToLower(k)
for _, v := range vv {
cc.writeHeader(lowKey, v)
}
}
- return cc.hbuf.Bytes()
+ return cc.hbuf.Bytes(), nil
}
func (cc *ClientConn) writeHeader(name, value string) {
@@ -1246,7 +1587,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
- cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and
+ // wake up RoundTrip if there is a pending request.
+ cc.cond.Broadcast()
}
return cs
}
@@ -1254,17 +1597,12 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
cc *ClientConn
- activeRes map[uint32]*clientStream // keyed by streamID
closeWhenIdle bool
}
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
- rl := &clientConnReadLoop{
- cc: cc,
- activeRes: make(map[uint32]*clientStream),
- }
-
+ rl := &clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(ConnectionError); ok {
@@ -1319,10 +1657,8 @@ func (rl *clientConnReadLoop) cleanup() {
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
}
- for _, cs := range rl.activeRes {
- cs.bufPipe.CloseWithError(err)
- }
for _, cs := range cc.streams {
+ cs.bufPipe.CloseWithError(err) // no-op if already closed
select {
case cs.resc <- resAndError{err: err}:
default:
@@ -1345,8 +1681,9 @@ func (rl *clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(StreamError); ok {
- if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
+ if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
+ cs.cc.forgetStreamID(cs.ID)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
@@ -1399,7 +1736,7 @@ func (rl *clientConnReadLoop) run() error {
}
return err
}
- if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
+ if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
}
@@ -1407,13 +1744,31 @@ func (rl *clientConnReadLoop) run() error {
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
cc := rl.cc
- cs := cc.streamByID(f.StreamID, f.StreamEnded())
+ cs := cc.streamByID(f.StreamID, false)
if cs == nil {
// We'd get here if we canceled a request while the
// server had its response still in flight. So if this
// was just something we canceled, ignore it.
return nil
}
+ if f.StreamEnded() {
+ // Issue 20521: If the stream has ended, streamByID() causes
+ // clientStream.done to be closed, which causes the request's bodyWriter
+ // to be closed with an errStreamClosed, which may be received by
+ // clientConn.RoundTrip before the result of processing these headers.
+ // Deferring stream closure allows the header processing to occur first.
+ // clientConn.RoundTrip may still receive the bodyWriter error first, but
+ // the fix for issue 16102 prioritises any response.
+ //
+ // Issue 22413: If there is no request body, we should close the
+ // stream before writing to cs.resc so that the stream is closed
+ // immediately once RoundTrip returns.
+ if cs.req.Body != nil {
+ defer cc.forgetStreamID(f.StreamID)
+ } else {
+ cc.forgetStreamID(f.StreamID)
+ }
+ }
if !cs.firstByte {
if cs.trace != nil {
// TODO(bradfitz): move first response byte earlier,
@@ -1437,6 +1792,7 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
}
// Any other error type is a stream error.
cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err)
+ cc.forgetStreamID(cs.ID)
cs.resc <- resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
}
@@ -1444,9 +1800,6 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
// (nil, nil) special case. See handleResponse docs.
return nil
}
- if res.Body != noBody {
- rl.activeRes[cs.ID] = cs
- }
cs.resTrailer = &res.Trailer
cs.resc <- resAndError{res: res}
return nil
@@ -1457,8 +1810,7 @@ func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
// is the detail.
//
// As a special case, handleResponse may return (nil, nil) to skip the
-// frame (currently only used for 100 expect continue). This special
-// case is going away after Issue 13851 is fixed.
+// frame (currently only used for 1xx responses).
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
if f.Truncated {
return nil, errResponseHeaderListSize
@@ -1466,20 +1818,11 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
status := f.PseudoValue("status")
if status == "" {
- return nil, errors.New("missing status pseudo header")
+ return nil, errors.New("malformed response from server: missing status pseudo header")
}
statusCode, err := strconv.Atoi(status)
if err != nil {
- return nil, errors.New("malformed non-numeric status pseudo header")
- }
-
- if statusCode == 100 {
- traceGot100Continue(cs.trace)
- if cs.on100 != nil {
- cs.on100() // forces any write delay timer to fire
- }
- cs.pastHeaders = false // do it all again
- return nil, nil
+ return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
}
header := make(http.Header)
@@ -1506,6 +1849,27 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
}
}
+ if statusCode >= 100 && statusCode <= 199 {
+ cs.num1xx++
+ const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http
+ if cs.num1xx > max1xxResponses {
+ return nil, errors.New("http2: too many 1xx informational responses")
+ }
+ if fn := cs.get1xxTraceFunc(); fn != nil {
+ if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
+ return nil, err
+ }
+ }
+ if statusCode == 100 {
+ traceGot100Continue(cs.trace)
+ if cs.on100 != nil {
+ cs.on100() // forces any write delay timer to fire
+ }
+ }
+ cs.pastHeaders = false // do it all again
+ return nil, nil
+ }
+
streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
@@ -1528,8 +1892,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
return res, nil
}
- buf := new(bytes.Buffer) // TODO(bradfitz): recycle this garbage
- cs.bufPipe = pipe{b: buf}
+ cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)
@@ -1539,7 +1902,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
res.Header.Del("Content-Length")
res.ContentLength = -1
res.Body = &gzipReader{body: res.Body}
- setResponseUncompressed(res)
+ res.Uncompressed = true
}
return res, nil
}
@@ -1656,6 +2019,7 @@ func (b transportResponseBody) Close() error {
cc.wmu.Lock()
if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+ cs.didReset = true
}
// Return connection-level flow control.
if unread > 0 {
@@ -1668,6 +2032,7 @@ func (b transportResponseBody) Close() error {
}
cs.bufPipe.BreakWithError(errClosedResponseBody)
+ cc.forgetStreamID(cs.ID)
return nil
}
@@ -1702,13 +2067,23 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
}
return nil
}
+ if !cs.firstByte {
+ cc.logf("protocol error: received DATA before a HEADERS frame")
+ rl.endStreamError(cs, StreamError{
+ StreamID: f.StreamID,
+ Code: ErrCodeProtocol,
+ })
+ return nil
+ }
if f.Length > 0 {
- if len(data) > 0 && cs.bufPipe.b == nil {
- // Data frame after it's already closed?
- cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
- return ConnectionError(ErrCodeProtocol)
+ if cs.req.Method == "HEAD" && len(data) > 0 {
+ cc.logf("protocol error: received DATA on a HEAD request")
+ rl.endStreamError(cs, StreamError{
+ StreamID: f.StreamID,
+ Code: ErrCodeProtocol,
+ })
+ return nil
}
-
// Check connection-level flow control.
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
@@ -1719,16 +2094,27 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
- if pad := int32(f.Length) - int32(len(data)); pad > 0 {
- cs.inflow.add(pad)
- cc.inflow.add(pad)
+ var refund int
+ if pad := int(f.Length) - len(data); pad > 0 {
+ refund += pad
+ }
+ // Return len(data) now if the stream is already closed,
+ // since data will never be read.
+ didReset := cs.didReset
+ if didReset {
+ refund += len(data)
+ }
+ if refund > 0 {
+ cc.inflow.add(int32(refund))
cc.wmu.Lock()
- cc.fr.WriteWindowUpdate(0, uint32(pad))
- cc.fr.WriteWindowUpdate(cs.ID, uint32(pad))
+ cc.fr.WriteWindowUpdate(0, uint32(refund))
+ if !didReset {
+ cs.inflow.add(int32(refund))
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
+ }
cc.bw.Flush()
cc.wmu.Unlock()
}
- didReset := cs.didReset
cc.mu.Unlock()
if len(data) > 0 && !didReset {
@@ -1759,11 +2145,10 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
err = io.EOF
code = cs.copyTrailers
}
- cs.bufPipe.closeWithErrorAndCode(err, code)
- delete(rl.activeRes, cs.ID)
if isConnectionCloseRequest(cs.req) {
rl.closeWhenIdle = true
}
+ cs.bufPipe.closeWithErrorAndCode(err, code)
select {
case cs.resc <- resAndError{err: err}:
@@ -1811,6 +2196,8 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
cc.maxFrameSize = s.Val
case SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
+ case SettingMaxHeaderListSize:
+ cc.peerMaxHeaderListSize = uint64(s.Val)
case SettingInitialWindowSize:
// Values above the maximum flow-control
// window size of 2^31-1 MUST be treated as a
@@ -1888,13 +2275,11 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
cs.bufPipe.CloseWithError(err)
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
- delete(rl.activeRes, cs.ID)
return nil
}
// Ping sends a PING frame to the server and waits for the ack.
-// Public implementation is in go17.go and not_go17.go
-func (cc *ClientConn) ping(ctx contextContext) error {
+func (cc *ClientConn) Ping(ctx context.Context) error {
c := make(chan struct{})
// Generate a random payload
var p [8]byte
@@ -1977,6 +2362,7 @@ func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error)
var (
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
+ errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
)
@@ -2070,7 +2456,7 @@ func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s body
}
s.delay = t.expectContinueTimeout()
if s.delay == 0 ||
- !httplex.HeaderValuesContainsToken(
+ !httpguts.HeaderValuesContainsToken(
cs.req.Header["Expect"],
"100-continue") {
return
@@ -2125,5 +2511,93 @@ func (s bodyWriterState) scheduleBodyWrite() {
// isConnectionCloseRequest reports whether req should use its own
// connection for a single request and then close the connection.
func isConnectionCloseRequest(req *http.Request) bool {
- return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close")
+ return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
+}
+
+// registerHTTPSProtocol calls Transport.RegisterProtocol but
+// converting panics into errors.
+func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
+ defer func() {
+ if e := recover(); e != nil {
+ err = fmt.Errorf("%v", e)
+ }
+ }()
+ t.RegisterProtocol("https", rt)
+ return nil
+}
+
+// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
+// if there's already has a cached connection to the host.
+// (The field is exported so it can be accessed via reflect from net/http; tested
+// by TestNoDialH2RoundTripperType)
+type noDialH2RoundTripper struct{ *Transport }
+
+func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+ res, err := rt.Transport.RoundTrip(req)
+ if isNoCachedConnError(err) {
+ return nil, http.ErrSkipAltProtocol
+ }
+ return res, err
+}
+
+func (t *Transport) idleConnTimeout() time.Duration {
+ if t.t1 != nil {
+ return t.t1.IdleConnTimeout
+ }
+ return 0
+}
+
+func traceGetConn(req *http.Request, hostPort string) {
+ trace := httptrace.ContextClientTrace(req.Context())
+ if trace == nil || trace.GetConn == nil {
+ return
+ }
+ trace.GetConn(hostPort)
+}
+
+func traceGotConn(req *http.Request, cc *ClientConn) {
+ trace := httptrace.ContextClientTrace(req.Context())
+ if trace == nil || trace.GotConn == nil {
+ return
+ }
+ ci := httptrace.GotConnInfo{Conn: cc.tconn}
+ cc.mu.Lock()
+ ci.Reused = cc.nextStreamID > 1
+ ci.WasIdle = len(cc.streams) == 0 && ci.Reused
+ if ci.WasIdle && !cc.lastActive.IsZero() {
+ ci.IdleTime = time.Now().Sub(cc.lastActive)
+ }
+ cc.mu.Unlock()
+
+ trace.GotConn(ci)
+}
+
+func traceWroteHeaders(trace *httptrace.ClientTrace) {
+ if trace != nil && trace.WroteHeaders != nil {
+ trace.WroteHeaders()
+ }
+}
+
+func traceGot100Continue(trace *httptrace.ClientTrace) {
+ if trace != nil && trace.Got100Continue != nil {
+ trace.Got100Continue()
+ }
+}
+
+func traceWait100Continue(trace *httptrace.ClientTrace) {
+ if trace != nil && trace.Wait100Continue != nil {
+ trace.Wait100Continue()
+ }
+}
+
+func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
+ if trace != nil && trace.WroteRequest != nil {
+ trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
+ }
+}
+
+func traceFirstResponseByte(trace *httptrace.ClientTrace) {
+ if trace != nil && trace.GotFirstResponseByte != nil {
+ trace.GotFirstResponseByte()
+ }
}