diff options
author | Valentin Rothberg <rothberg@redhat.com> | 2019-01-08 14:52:57 +0100 |
---|---|---|
committer | Valentin Rothberg <rothberg@redhat.com> | 2019-01-11 13:38:11 +0100 |
commit | bd40dcfc2bc7c9014ea1f33482fb63aacbcdfe87 (patch) | |
tree | 5f06e4e289f16d9164d692590a3fe6541b5384cf /vendor/golang.org/x/net/http2/server.go | |
parent | 545f24421247c9f6251a634764db3f8f8070a812 (diff) | |
download | podman-bd40dcfc2bc7c9014ea1f33482fb63aacbcdfe87.tar.gz podman-bd40dcfc2bc7c9014ea1f33482fb63aacbcdfe87.tar.bz2 podman-bd40dcfc2bc7c9014ea1f33482fb63aacbcdfe87.zip |
vendor: update everything
* If possible, update each dependency to the latest available version.
* Use releases over commit IDs and avoid vendoring branches.
Signed-off-by: Valentin Rothberg <rothberg@redhat.com>
Diffstat (limited to 'vendor/golang.org/x/net/http2/server.go')
-rw-r--r-- | vendor/golang.org/x/net/http2/server.go | 652 |
1 files changed, 400 insertions, 252 deletions
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go index 3c6b90ccd..b57b6e2d0 100644 --- a/vendor/golang.org/x/net/http2/server.go +++ b/vendor/golang.org/x/net/http2/server.go @@ -28,6 +28,7 @@ package http2 import ( "bufio" "bytes" + "context" "crypto/tls" "errors" "fmt" @@ -46,6 +47,7 @@ import ( "sync" "time" + "golang.org/x/net/http/httpguts" "golang.org/x/net/http2/hpack" ) @@ -110,9 +112,41 @@ type Server struct { // activity for the purposes of IdleTimeout. IdleTimeout time.Duration + // MaxUploadBufferPerConnection is the size of the initial flow + // control window for each connections. The HTTP/2 spec does not + // allow this to be smaller than 65535 or larger than 2^32-1. + // If the value is outside this range, a default value will be + // used instead. + MaxUploadBufferPerConnection int32 + + // MaxUploadBufferPerStream is the size of the initial flow control + // window for each stream. The HTTP/2 spec does not allow this to + // be larger than 2^32-1. If the value is zero or larger than the + // maximum, a default value will be used instead. + MaxUploadBufferPerStream int32 + // NewWriteScheduler constructs a write scheduler for a connection. // If nil, a default scheduler is chosen. NewWriteScheduler func() WriteScheduler + + // Internal state. This is a pointer (rather than embedded directly) + // so that we don't embed a Mutex in this struct, which will make the + // struct non-copyable, which might break some callers. + state *serverInternalState +} + +func (s *Server) initialConnRecvWindowSize() int32 { + if s.MaxUploadBufferPerConnection > initialWindowSize { + return s.MaxUploadBufferPerConnection + } + return 1 << 20 +} + +func (s *Server) initialStreamRecvWindowSize() int32 { + if s.MaxUploadBufferPerStream > 0 { + return s.MaxUploadBufferPerStream + } + return 1 << 20 } func (s *Server) maxReadFrameSize() uint32 { @@ -129,6 +163,40 @@ func (s *Server) maxConcurrentStreams() uint32 { return defaultMaxStreams } +type serverInternalState struct { + mu sync.Mutex + activeConns map[*serverConn]struct{} +} + +func (s *serverInternalState) registerConn(sc *serverConn) { + if s == nil { + return // if the Server was used without calling ConfigureServer + } + s.mu.Lock() + s.activeConns[sc] = struct{}{} + s.mu.Unlock() +} + +func (s *serverInternalState) unregisterConn(sc *serverConn) { + if s == nil { + return // if the Server was used without calling ConfigureServer + } + s.mu.Lock() + delete(s.activeConns, sc) + s.mu.Unlock() +} + +func (s *serverInternalState) startGracefulShutdown() { + if s == nil { + return // if the Server was used without calling ConfigureServer + } + s.mu.Lock() + for sc := range s.activeConns { + sc.startGracefulShutdown() + } + s.mu.Unlock() +} + // ConfigureServer adds HTTP/2 support to a net/http Server. // // The configuration conf may be nil. @@ -141,21 +209,30 @@ func ConfigureServer(s *http.Server, conf *Server) error { if conf == nil { conf = new(Server) } - if err := configureServer18(s, conf); err != nil { - return err + conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})} + if h1, h2 := s, conf; h2.IdleTimeout == 0 { + if h1.IdleTimeout != 0 { + h2.IdleTimeout = h1.IdleTimeout + } else { + h2.IdleTimeout = h1.ReadTimeout + } } + s.RegisterOnShutdown(conf.state.startGracefulShutdown) if s.TLSConfig == nil { s.TLSConfig = new(tls.Config) } else if s.TLSConfig.CipherSuites != nil { // If they already provided a CipherSuite list, return // an error if it has a bad order or is missing - // ECDHE_RSA_WITH_AES_128_GCM_SHA256. - const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256. haveRequired := false sawBad := false for i, cs := range s.TLSConfig.CipherSuites { - if cs == requiredCipher { + switch cs { + case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + // Alternative MTI cipher to not discourage ECDSA-only servers. + // See http://golang.org/cl/30721 for further information. + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: haveRequired = true } if isBadCipher(cs) { @@ -165,7 +242,7 @@ func ConfigureServer(s *http.Server, conf *Server) error { } } if !haveRequired { - return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.") } } @@ -255,35 +332,37 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { defer cancel() sc := &serverConn{ - srv: s, - hs: opts.baseConfig(), - conn: c, - baseCtx: baseCtx, - remoteAddrStr: c.RemoteAddr().String(), - bw: newBufferedWriter(c), - handler: opts.handler(), - streams: make(map[uint32]*stream), - readFrameCh: make(chan readFrameResult), - wantWriteFrameCh: make(chan FrameWriteRequest, 8), - wantStartPushCh: make(chan startPushRequest, 8), - wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync - bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way - doneServing: make(chan struct{}), - clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" - advMaxStreams: s.maxConcurrentStreams(), - initialWindowSize: initialWindowSize, - maxFrameSize: initialMaxFrameSize, - headerTableSize: initialHeaderTableSize, - serveG: newGoroutineLock(), - pushEnabled: true, - } + srv: s, + hs: opts.baseConfig(), + conn: c, + baseCtx: baseCtx, + remoteAddrStr: c.RemoteAddr().String(), + bw: newBufferedWriter(c), + handler: opts.handler(), + streams: make(map[uint32]*stream), + readFrameCh: make(chan readFrameResult), + wantWriteFrameCh: make(chan FrameWriteRequest, 8), + serveMsgCh: make(chan interface{}, 8), + wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync + bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way + doneServing: make(chan struct{}), + clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" + advMaxStreams: s.maxConcurrentStreams(), + initialStreamSendWindowSize: initialWindowSize, + maxFrameSize: initialMaxFrameSize, + headerTableSize: initialHeaderTableSize, + serveG: newGoroutineLock(), + pushEnabled: true, + } + + s.state.registerConn(sc) + defer s.state.unregisterConn(sc) // The net/http package sets the write deadline from the // http.Server.WriteTimeout during the TLS handshake, but then - // passes the connection off to us with the deadline already - // set. Disarm it here so that it is not applied to additional - // streams opened on this connection. - // TODO: implement WriteTimeout fully. See Issue 18437. + // passes the connection off to us with the deadline already set. + // Write deadlines are set per stream in serverConn.newStream. + // Disarm the net.Conn write deadline here. if sc.hs.WriteTimeout != 0 { sc.conn.SetWriteDeadline(time.Time{}) } @@ -294,6 +373,9 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { sc.writeSched = NewRandomWriteScheduler() } + // These start at the RFC-specified defaults. If there is a higher + // configured value for inflow, that will be updated when we send a + // WINDOW_UPDATE shortly after sending SETTINGS. sc.flow.add(initialWindowSize) sc.inflow.add(initialWindowSize) sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) @@ -328,7 +410,7 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { // addresses during development. // // TODO: optionally enforce? Or enforce at the time we receive - // a new request, and verify the the ServerName matches the :authority? + // a new request, and verify the ServerName matches the :authority? // But that precludes proxy situations, perhaps. // // So for now, do nothing here again. @@ -356,6 +438,15 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { sc.serve() } +func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) { + ctx, cancel = context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr()) + if hs := opts.baseConfig(); hs != nil { + ctx = context.WithValue(ctx, http.ServerContextKey, hs) + } + return +} + func (sc *serverConn) rejectConn(err ErrCode, debug string) { sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) // ignoring errors. hanging up anyway. @@ -371,15 +462,14 @@ type serverConn struct { conn net.Conn bw *bufferedWriter // writing to conn handler http.Handler - baseCtx contextContext + baseCtx context.Context framer *Framer doneServing chan struct{} // closed when serverConn.serve ends readFrameCh chan readFrameResult // written by serverConn.readFrames wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve - wantStartPushCh chan startPushRequest // from handlers -> serve wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes bodyReadCh chan bodyReadMsg // from handlers -> serve - testHookCh chan func(int) // code to run on the serve loop + serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop flow flow // conn-wide (not stream-specific) outbound flow control inflow flow // conn-wide inbound flow control tlsState *tls.ConnectionState // shared by all handlers, like net/http @@ -387,38 +477,39 @@ type serverConn struct { writeSched WriteScheduler // Everything following is owned by the serve loop; use serveG.check(): - serveG goroutineLock // used to verify funcs are on serve() - pushEnabled bool - sawFirstSettings bool // got the initial SETTINGS frame after the preface - needToSendSettingsAck bool - unackedSettings int // how many SETTINGS have we sent without ACKs? - clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) - advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client - curClientStreams uint32 // number of open streams initiated by the client - curPushedStreams uint32 // number of open streams initiated by server push - maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests - maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes - streams map[uint32]*stream - initialWindowSize int32 - maxFrameSize int32 - headerTableSize uint32 - peerMaxHeaderListSize uint32 // zero means unknown (default) - canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case - writingFrame bool // started writing a frame (on serve goroutine or separate) - writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh - needsFrameFlush bool // last frame write wasn't a flush - inGoAway bool // we've started to or sent GOAWAY - inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop - needToSendGoAway bool // we need to schedule a GOAWAY frame write - goAwayCode ErrCode - shutdownTimerCh <-chan time.Time // nil until used - shutdownTimer *time.Timer // nil until used - idleTimer *time.Timer // nil if unused - idleTimerCh <-chan time.Time // nil if unused + serveG goroutineLock // used to verify funcs are on serve() + pushEnabled bool + sawFirstSettings bool // got the initial SETTINGS frame after the preface + needToSendSettingsAck bool + unackedSettings int // how many SETTINGS have we sent without ACKs? + clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) + advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client + curClientStreams uint32 // number of open streams initiated by the client + curPushedStreams uint32 // number of open streams initiated by server push + maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests + maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes + streams map[uint32]*stream + initialStreamSendWindowSize int32 + maxFrameSize int32 + headerTableSize uint32 + peerMaxHeaderListSize uint32 // zero means unknown (default) + canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case + writingFrame bool // started writing a frame (on serve goroutine or separate) + writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh + needsFrameFlush bool // last frame write wasn't a flush + inGoAway bool // we've started to or sent GOAWAY + inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop + needToSendGoAway bool // we need to schedule a GOAWAY frame write + goAwayCode ErrCode + shutdownTimer *time.Timer // nil until used + idleTimer *time.Timer // nil if unused // Owned by the writeFrameAsync goroutine: headerWriteBuf bytes.Buffer hpackEncoder *hpack.Encoder + + // Used by startGracefulShutdown. + shutdownOnce sync.Once } func (sc *serverConn) maxHeaderListSize() uint32 { @@ -451,7 +542,7 @@ type stream struct { id uint32 body *pipe // non-nil if expecting DATA frames cw closeWaiter // closed wait stream transitions to closed state - ctx contextContext + ctx context.Context cancelCtx func() // owned by serverConn's serve loop: @@ -463,10 +554,10 @@ type stream struct { numTrailerValues int64 weight uint8 state streamState - resetQueued bool // RST_STREAM queued for write; set by sc.resetStream - gotTrailerHeader bool // HEADER frame for trailers was seen - wroteHeaders bool // whether we wrote headers (not status 100) - reqBuf []byte // if non-nil, body pipe buffer to return later at EOF + resetQueued bool // RST_STREAM queued for write; set by sc.resetStream + gotTrailerHeader bool // HEADER frame for trailers was seen + wroteHeaders bool // whether we wrote headers (not status 100) + writeDeadline *time.Timer // nil if unused trailer http.Header // accumulated trailers reqTrailer http.Header // handler's Request.Trailer @@ -574,7 +665,7 @@ func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { if err == nil { return } - if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) { + if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout { // Boring, expected errors. sc.vlogf(format, args...) } else { @@ -584,6 +675,7 @@ func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { func (sc *serverConn) canonicalHeader(v string) string { sc.serveG.check() + buildCommonHeaderMapsOnce() cv, ok := commonCanonHeader[v] if ok { return cv @@ -696,48 +788,48 @@ func (sc *serverConn) serve() { {SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, {SettingMaxConcurrentStreams, sc.advMaxStreams}, {SettingMaxHeaderListSize, sc.maxHeaderListSize()}, - - // TODO: more actual settings, notably - // SettingInitialWindowSize, but then we also - // want to bump up the conn window size the - // same amount here right after the settings + {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, }, }) sc.unackedSettings++ + // Each connection starts with intialWindowSize inflow tokens. + // If a higher value is configured, we add more tokens. + if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { + sc.sendWindowUpdate(nil, int(diff)) + } + if err := sc.readPreface(); err != nil { sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) return } // Now that we've got the preface, get us out of the - // "StateNew" state. We can't go directly to idle, though. + // "StateNew" state. We can't go directly to idle, though. // Active means we read some data and anticipate a request. We'll // do another Active when we get a HEADERS frame. sc.setConnState(http.StateActive) sc.setConnState(http.StateIdle) if sc.srv.IdleTimeout != 0 { - sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout) + sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) defer sc.idleTimer.Stop() - sc.idleTimerCh = sc.idleTimer.C - } - - var gracefulShutdownCh <-chan struct{} - if sc.hs != nil { - gracefulShutdownCh = h1ServerShutdownChan(sc.hs) } go sc.readFrames() // closed by defer sc.conn.Close above - settingsTimer := time.NewTimer(firstSettingsTimeout) + settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) + defer settingsTimer.Stop() + loopNum := 0 for { loopNum++ select { case wr := <-sc.wantWriteFrameCh: + if se, ok := wr.write.(StreamError); ok { + sc.resetStream(se) + break + } sc.writeFrame(wr) - case spr := <-sc.wantStartPushCh: - sc.startPush(spr) case res := <-sc.wroteFrameCh: sc.wroteFrame(res) case res := <-sc.readFrameCh: @@ -745,36 +837,85 @@ func (sc *serverConn) serve() { return } res.readMore() - if settingsTimer.C != nil { + if settingsTimer != nil { settingsTimer.Stop() - settingsTimer.C = nil + settingsTimer = nil } case m := <-sc.bodyReadCh: sc.noteBodyRead(m.st, m.n) - case <-settingsTimer.C: - sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) - return - case <-gracefulShutdownCh: - gracefulShutdownCh = nil - sc.startGracefulShutdown() - case <-sc.shutdownTimerCh: - sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) - return - case <-sc.idleTimerCh: - sc.vlogf("connection is idle") - sc.goAway(ErrCodeNo) - case fn := <-sc.testHookCh: - fn(loopNum) + case msg := <-sc.serveMsgCh: + switch v := msg.(type) { + case func(int): + v(loopNum) // for testing + case *serverMessage: + switch v { + case settingsTimerMsg: + sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) + return + case idleTimerMsg: + sc.vlogf("connection is idle") + sc.goAway(ErrCodeNo) + case shutdownTimerMsg: + sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) + return + case gracefulShutdownMsg: + sc.startGracefulShutdownInternal() + default: + panic("unknown timer") + } + case *startPushRequest: + sc.startPush(v) + default: + panic(fmt.Sprintf("unexpected type %T", v)) + } } - if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { - return + // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY + // with no error code (graceful shutdown), don't start the timer until + // all open streams have been completed. + sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame + gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0 + if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) { + sc.shutDownIn(goAwayTimeout) } } } -// readPreface reads the ClientPreface greeting from the peer -// or returns an error on timeout or an invalid greeting. +func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { + select { + case <-sc.doneServing: + case <-sharedCh: + close(privateCh) + } +} + +type serverMessage int + +// Message values sent to serveMsgCh. +var ( + settingsTimerMsg = new(serverMessage) + idleTimerMsg = new(serverMessage) + shutdownTimerMsg = new(serverMessage) + gracefulShutdownMsg = new(serverMessage) +) + +func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } +func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) } +func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) } + +func (sc *serverConn) sendServeMsg(msg interface{}) { + sc.serveG.checkNotOn() // NOT + select { + case sc.serveMsgCh <- msg: + case <-sc.doneServing: + } +} + +var errPrefaceTimeout = errors.New("timeout waiting for client preface") + +// readPreface reads the ClientPreface greeting from the peer or +// returns errPrefaceTimeout on timeout, or an error if the greeting +// is invalid. func (sc *serverConn) readPreface() error { errc := make(chan error, 1) go func() { @@ -792,7 +933,7 @@ func (sc *serverConn) readPreface() error { defer timer.Stop() select { case <-timer.C: - return errors.New("timeout waiting for client preface") + return errPrefaceTimeout case err := <-errc: if err == nil { if VerboseLogs { @@ -981,7 +1122,7 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { // errHandlerPanicked is the error given to any callers blocked in a read from // Request.Body when the main goroutine panics. Since most handlers read in the -// the main ServeHTTP goroutine, this will show up rarely. +// main ServeHTTP goroutine, this will show up rarely. var errHandlerPanicked = errors.New("http2: handler panicked") // wroteFrame is called on the serve goroutine with the result of @@ -1014,7 +1155,11 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) { // stateClosed after the RST_STREAM frame is // written. st.state = stateHalfClosedLocal - sc.resetStream(streamError(st.id, ErrCodeCancel)) + // Section 8.1: a server MAY request that the client abort + // transmission of a request without error by sending a + // RST_STREAM with an error code of NO_ERROR after sending + // a complete response. + sc.resetStream(streamError(st.id, ErrCodeNo)) case stateHalfClosedRemote: sc.closeStream(st, errHandlerComplete) } @@ -1086,33 +1231,43 @@ func (sc *serverConn) scheduleFrameWrite() { sc.inFrameScheduleLoop = false } -// startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the -// client we're gracefully shutting down. The connection isn't closed -// until all current streams are done. +// startGracefulShutdown gracefully shuts down a connection. This +// sends GOAWAY with ErrCodeNo to tell the client we're gracefully +// shutting down. The connection isn't closed until all current +// streams are done. +// +// startGracefulShutdown returns immediately; it does not wait until +// the connection has shut down. func (sc *serverConn) startGracefulShutdown() { - sc.goAwayIn(ErrCodeNo, 0) + sc.serveG.checkNotOn() // NOT + sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) }) } -func (sc *serverConn) goAway(code ErrCode) { - sc.serveG.check() - var forceCloseIn time.Duration - if code != ErrCodeNo { - forceCloseIn = 250 * time.Millisecond - } else { - // TODO: configurable - forceCloseIn = 1 * time.Second - } - sc.goAwayIn(code, forceCloseIn) +// After sending GOAWAY, the connection will close after goAwayTimeout. +// If we close the connection immediately after sending GOAWAY, there may +// be unsent data in our kernel receive buffer, which will cause the kernel +// to send a TCP RST on close() instead of a FIN. This RST will abort the +// connection immediately, whether or not the client had received the GOAWAY. +// +// Ideally we should delay for at least 1 RTT + epsilon so the client has +// a chance to read the GOAWAY and stop sending messages. Measuring RTT +// is hard, so we approximate with 1 second. See golang.org/issue/18701. +// +// This is a var so it can be shorter in tests, where all requests uses the +// loopback interface making the expected RTT very small. +// +// TODO: configurable? +var goAwayTimeout = 1 * time.Second + +func (sc *serverConn) startGracefulShutdownInternal() { + sc.goAway(ErrCodeNo) } -func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) { +func (sc *serverConn) goAway(code ErrCode) { sc.serveG.check() if sc.inGoAway { return } - if forceCloseIn != 0 { - sc.shutDownIn(forceCloseIn) - } sc.inGoAway = true sc.needToSendGoAway = true sc.goAwayCode = code @@ -1121,8 +1276,7 @@ func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) { func (sc *serverConn) shutDownIn(d time.Duration) { sc.serveG.check() - sc.shutdownTimer = time.NewTimer(d) - sc.shutdownTimerCh = sc.shutdownTimer.C + sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) } func (sc *serverConn) resetStream(se StreamError) { @@ -1305,6 +1459,9 @@ func (sc *serverConn) closeStream(st *stream, err error) { panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) } st.state = stateClosed + if st.writeDeadline != nil { + st.writeDeadline.Stop() + } if st.isPushed() { sc.curPushedStreams-- } else { @@ -1317,7 +1474,7 @@ func (sc *serverConn) closeStream(st *stream, err error) { sc.idleTimer.Reset(sc.srv.IdleTimeout) } if h1ServerKeepAlivesDisabled(sc.hs) { - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() } } if p := st.body; p != nil { @@ -1343,6 +1500,12 @@ func (sc *serverConn) processSettings(f *SettingsFrame) error { } return nil } + if f.NumSettings() > 100 || f.HasDuplicates() { + // This isn't actually in the spec, but hang up on + // suspiciously large settings frames or those with + // duplicate entries. + return ConnectionError(ErrCodeProtocol) + } if err := f.ForeachSetting(sc.processSetting); err != nil { return err } @@ -1395,9 +1558,9 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { // adjust the size of all stream flow control windows that it // maintains by the difference between the new value and the // old value." - old := sc.initialWindowSize - sc.initialWindowSize = int32(val) - growth := sc.initialWindowSize - old // may be negative + old := sc.initialStreamSendWindowSize + sc.initialStreamSendWindowSize = int32(val) + growth := int32(val) - old // may be negative for _, st := range sc.streams { if !st.flow.add(growth) { // 6.9.2 Initial Flow Control Window Size @@ -1431,6 +1594,12 @@ func (sc *serverConn) processData(f *DataFrame) error { // type PROTOCOL_ERROR." return ConnectionError(ErrCodeProtocol) } + // RFC 7540, sec 6.1: If a DATA frame is received whose stream is not in + // "open" or "half-closed (local)" state, the recipient MUST respond with a + // stream error (Section 5.4.2) of type STREAM_CLOSED. + if state == stateClosed { + return streamError(id, ErrCodeStreamClosed) + } if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued { // This includes sending a RST_STREAM if the stream is // in stateHalfClosedLocal (which currently means that @@ -1464,7 +1633,10 @@ func (sc *serverConn) processData(f *DataFrame) error { // Sender sending more than they'd declared? if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) - return streamError(id, ErrCodeStreamClosed) + // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the + // value of a content-length header field does not equal the sum of the + // DATA frame payload lengths that form the body. + return streamError(id, ErrCodeProtocol) } if f.Length > 0 { // Check whether the client has flow control quota. @@ -1504,7 +1676,7 @@ func (sc *serverConn) processGoAway(f *GoAwayFrame) error { } else { sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) } - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() // http://tools.ietf.org/html/rfc7540#section-6.8 // We should not create any new streams, which means we should disable push. sc.pushEnabled = false @@ -1543,6 +1715,12 @@ func (st *stream) copyTrailersToHandlerRequest() { } } +// onWriteTimeout is run on its own goroutine (from time.AfterFunc) +// when the stream's WriteTimeout has fired. +func (st *stream) onWriteTimeout() { + st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)}) +} + func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { sc.serveG.check() id := f.StreamID @@ -1568,6 +1746,13 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { // processing this frame. return nil } + // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than + // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in + // this state, it MUST respond with a stream error (Section 5.4.2) of + // type STREAM_CLOSED. + if st.state == stateHalfClosedRemote { + return streamError(id, ErrCodeStreamClosed) + } return st.processTrailerHeaders(f) } @@ -1668,7 +1853,7 @@ func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error { if st.trailer != nil { for _, hf := range f.RegularFields() { key := sc.canonicalHeader(hf.Name) - if !ValidTrailerHeader(key) { + if !httpguts.ValidTrailerHeader(key) { // TODO: send more details to the peer somehow. But http2 has // no way to send debug data at a stream level. Discuss with // HTTP folk. @@ -1709,7 +1894,7 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream panic("internal error: cannot create stream with id 0") } - ctx, cancelCtx := contextWithCancel(sc.baseCtx) + ctx, cancelCtx := context.WithCancel(sc.baseCtx) st := &stream{ sc: sc, id: id, @@ -1719,9 +1904,12 @@ func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream } st.cw.Init() st.flow.conn = &sc.flow // link to conn-level counter - st.flow.add(sc.initialWindowSize) - st.inflow.conn = &sc.inflow // link to conn-level counter - st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings + st.flow.add(sc.initialStreamSendWindowSize) + st.inflow.conn = &sc.inflow // link to conn-level counter + st.inflow.add(sc.srv.initialStreamRecvWindowSize()) + if sc.hs.WriteTimeout != 0 { + st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) + } sc.streams[id] = st sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID}) @@ -1785,16 +1973,14 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res return nil, nil, err } if bodyOpen { - st.reqBuf = getRequestBodyBuf() - req.Body.(*requestBody).pipe = &pipe{ - b: &fixedBuffer{buf: st.reqBuf}, - } - if vv, ok := rp.header["Content-Length"]; ok { req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) } else { req.ContentLength = -1 } + req.Body.(*requestBody).pipe = &pipe{ + b: &dataBuffer{expected: req.ContentLength}, + } } return rw, req, nil } @@ -1874,7 +2060,7 @@ func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*r Body: body, Trailer: trailer, } - req = requestWithContext(req, st.ctx) + req = req.WithContext(st.ctx) rws := responseWriterStatePool.Get().(*responseWriterState) bwSave := rws.bw @@ -1890,24 +2076,6 @@ func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*r return rw, req, nil } -var reqBodyCache = make(chan []byte, 8) - -func getRequestBodyBuf() []byte { - select { - case b := <-reqBodyCache: - return b - default: - return make([]byte, initialWindowSize) - } -} - -func putRequestBodyBuf(b []byte) { - select { - case reqBodyCache <- b: - default: - } -} - // Run on its own goroutine. func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { didPanic := true @@ -1920,7 +2088,7 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler stream: rw.rws.stream, }) // Same as net/http: - if shouldLogPanic(e) { + if e != nil && e != http.ErrAbortHandler { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] @@ -2003,12 +2171,6 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { case <-sc.doneServing: } } - if err == io.EOF { - if buf := st.reqBuf; buf != nil { - st.reqBuf = nil // shouldn't matter; field unused by other - putRequestBodyBuf(buf) - } - } } func (sc *serverConn) noteBodyRead(st *stream, n int) { @@ -2103,8 +2265,8 @@ func (b *requestBody) Read(p []byte) (n int, err error) { return } -// responseWriter is the http.ResponseWriter implementation. It's -// intentionally small (1 pointer wide) to minimize garbage. The +// responseWriter is the http.ResponseWriter implementation. It's +// intentionally small (1 pointer wide) to minimize garbage. The // responseWriterState pointer inside is zeroed at the end of a // request (in handlerDone) and calls on the responseWriter thereafter // simply crash (caller's mistake), but the much larger responseWriterState @@ -2138,6 +2300,7 @@ type responseWriterState struct { wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. sentHeader bool // have we sent the header frame? handlerDone bool // handler has finished + dirty bool // a Write failed; don't reuse this responseWriterState sentContentLen int64 // non-zero if handler set a Content-Length header wroteBytes int64 @@ -2157,8 +2320,8 @@ func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) != // written in the trailers at the end of the response. func (rws *responseWriterState) declareTrailer(k string) { k = http.CanonicalHeaderKey(k) - if !ValidTrailerHeader(k) { - // Forbidden by RFC 2616 14.40. + if !httpguts.ValidTrailerHeader(k) { + // Forbidden by RFC 7230, section 4.1.2. rws.conn.logf("ignoring invalid trailer %q", k) return } @@ -2195,7 +2358,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { clen = strconv.Itoa(len(p)) } _, hasContentType := rws.snapHeader["Content-Type"] - if !hasContentType && bodyAllowedForStatus(rws.status) { + if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 { ctype = http.DetectContentType(p) } var date string @@ -2208,6 +2371,19 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { foreachHeaderElement(v, rws.declareTrailer) } + // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2), + // but respect "Connection" == "close" to mean sending a GOAWAY and tearing + // down the TCP connection when idle, like we do for HTTP/1. + // TODO: remove more Connection-specific header fields here, in addition + // to "Connection". + if _, ok := rws.snapHeader["Connection"]; ok { + v := rws.snapHeader.Get("Connection") + delete(rws.snapHeader, "Connection") + if v == "close" { + rws.conn.startGracefulShutdown() + } + } + endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ streamID: rws.stream.id, @@ -2219,6 +2395,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { date: date, }) if err != nil { + rws.dirty = true return 0, err } if endStream { @@ -2240,6 +2417,7 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { if len(p) > 0 || endStream { // only send a 0 byte DATA frame if we're ending the stream. if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { + rws.dirty = true return 0, err } } @@ -2251,6 +2429,9 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { trailers: rws.trailers, endStream: true, }) + if err != nil { + rws.dirty = true + } return len(p), err } return len(p), nil @@ -2274,11 +2455,11 @@ const TrailerPrefix = "Trailer:" // after the header has already been flushed. Because the Go // ResponseWriter interface has no way to set Trailers (only the // Header), and because we didn't want to expand the ResponseWriter -// interface, and because nobody used trailers, and because RFC 2616 +// interface, and because nobody used trailers, and because RFC 7230 // says you SHOULD (but not must) predeclare any trailers in the // header, the official ResponseWriter rules said trailers in Go must // be predeclared, and then we reuse the same ResponseWriter.Header() -// map to mean both Headers and Trailers. When it's time to write the +// map to mean both Headers and Trailers. When it's time to write the // Trailers, we pick out the fields of Headers that were declared as // trailers. That worked for a while, until we found the first major // user of Trailers in the wild: gRPC (using them only over http2), @@ -2358,6 +2539,24 @@ func (w *responseWriter) Header() http.Header { return rws.handlerHeader } +// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode. +func checkWriteHeaderCode(code int) { + // Issue 22880: require valid WriteHeader status codes. + // For now we only enforce that it's three digits. + // In the future we might block things over 599 (600 and above aren't defined + // at http://httpwg.org/specs/rfc7231.html#status.codes) + // and we might block under 200 (once we have more mature 1xx support). + // But for now any three digits. + // + // We used to send "HTTP/1.1 000 0" on the wire in responses but there's + // no equivalent bogus thing we can realistically send in HTTP/2, + // so we'll consistently panic instead and help people find their bugs + // early. (We can't return an error from WriteHeader even if we wanted to.) + if code < 100 || code > 999 { + panic(fmt.Sprintf("invalid WriteHeader code %v", code)) + } +} + func (w *responseWriter) WriteHeader(code int) { rws := w.rws if rws == nil { @@ -2368,6 +2567,7 @@ func (w *responseWriter) WriteHeader(code int) { func (rws *responseWriterState) writeHeader(code int) { if !rws.wroteHeader { + checkWriteHeaderCode(code) rws.wroteHeader = true rws.status = code if len(rws.handlerHeader) > 0 { @@ -2390,7 +2590,7 @@ func cloneHeader(h http.Header) http.Header { // // * Handler calls w.Write or w.WriteString -> // * -> rws.bw (*bufio.Writer) -> -// * (Handler migth call Flush) +// * (Handler might call Flush) // * -> chunkWriter{rws} // * -> responseWriterState.writeChunk(p []byte) // * -> responseWriterState.writeChunk (most of the magic; see comment there) @@ -2429,10 +2629,19 @@ func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, func (w *responseWriter) handlerDone() { rws := w.rws + dirty := rws.dirty rws.handlerDone = true w.Flush() w.rws = nil - responseWriterStatePool.Put(rws) + if !dirty { + // Only recycle the pool if all prior Write calls to + // the serverConn goroutine completed successfully. If + // they returned earlier due to resets from the peer + // there might still be write goroutines outstanding + // from the serverConn referencing the rws memory. See + // issue 20704. + responseWriterStatePool.Put(rws) + } } // Push errors. @@ -2441,14 +2650,9 @@ var ( ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS") ) -// pushOptions is the internal version of http.PushOptions, which we -// cannot include here because it's only defined in Go 1.8 and later. -type pushOptions struct { - Method string - Header http.Header -} +var _ http.Pusher = (*responseWriter)(nil) -func (w *responseWriter) push(target string, opts pushOptions) error { +func (w *responseWriter) Push(target string, opts *http.PushOptions) error { st := w.rws.stream sc := st.sc sc.serveG.checkNotOn() @@ -2459,6 +2663,10 @@ func (w *responseWriter) push(target string, opts pushOptions) error { return ErrRecursivePush } + if opts == nil { + opts = new(http.PushOptions) + } + // Default options. if opts.Method == "" { opts.Method = "GET" @@ -2514,7 +2722,7 @@ func (w *responseWriter) push(target string, opts pushOptions) error { return fmt.Errorf("method %q must be GET or HEAD", opts.Method) } - msg := startPushRequest{ + msg := &startPushRequest{ parent: st, method: opts.Method, url: u, @@ -2527,7 +2735,7 @@ func (w *responseWriter) push(target string, opts pushOptions) error { return errClientDisconnected case <-st.cw: return errStreamClosed - case sc.wantStartPushCh <- msg: + case sc.serveMsgCh <- msg: } select { @@ -2549,7 +2757,7 @@ type startPushRequest struct { done chan error } -func (sc *serverConn) startPush(msg startPushRequest) { +func (sc *serverConn) startPush(msg *startPushRequest) { sc.serveG.check() // http://tools.ietf.org/html/rfc7540#section-6.6. @@ -2588,7 +2796,7 @@ func (sc *serverConn) startPush(msg startPushRequest) { // A server that is unable to establish a new stream identifier can send a GOAWAY // frame so that the client is forced to open a new connection for new streams. if sc.maxPushPromiseID+2 >= 1<<31 { - sc.startGracefulShutdown() + sc.startGracefulShutdownInternal() return 0, ErrPushLimitReached } sc.maxPushPromiseID += 2 @@ -2630,7 +2838,7 @@ func (sc *serverConn) startPush(msg startPushRequest) { } // foreachHeaderElement splits v according to the "#rule" construction -// in RFC 2616 section 2.1 and calls fn for each non-empty element. +// in RFC 7230 section 7 and calls fn for each non-empty element. func foreachHeaderElement(v string, fn func(string)) { v = textproto.TrimString(v) if v == "" { @@ -2678,66 +2886,6 @@ func new400Handler(err error) http.HandlerFunc { } } -// ValidTrailerHeader reports whether name is a valid header field name to appear -// in trailers. -// See: http://tools.ietf.org/html/rfc7230#section-4.1.2 -func ValidTrailerHeader(name string) bool { - name = http.CanonicalHeaderKey(name) - if strings.HasPrefix(name, "If-") || badTrailer[name] { - return false - } - return true -} - -var badTrailer = map[string]bool{ - "Authorization": true, - "Cache-Control": true, - "Connection": true, - "Content-Encoding": true, - "Content-Length": true, - "Content-Range": true, - "Content-Type": true, - "Expect": true, - "Host": true, - "Keep-Alive": true, - "Max-Forwards": true, - "Pragma": true, - "Proxy-Authenticate": true, - "Proxy-Authorization": true, - "Proxy-Connection": true, - "Range": true, - "Realm": true, - "Te": true, - "Trailer": true, - "Transfer-Encoding": true, - "Www-Authenticate": true, -} - -// h1ServerShutdownChan returns a channel that will be closed when the -// provided *http.Server wants to shut down. -// -// This is a somewhat hacky way to get at http1 innards. It works -// when the http2 code is bundled into the net/http package in the -// standard library. The alternatives ended up making the cmd/go tool -// depend on http Servers. This is the lightest option for now. -// This is tested via the TestServeShutdown* tests in net/http. -func h1ServerShutdownChan(hs *http.Server) <-chan struct{} { - if fn := testh1ServerShutdownChan; fn != nil { - return fn(hs) - } - var x interface{} = hs - type I interface { - getDoneChan() <-chan struct{} - } - if hs, ok := x.(I); ok { - return hs.getDoneChan() - } - return nil -} - -// optional test hook for h1ServerShutdownChan. -var testh1ServerShutdownChan func(hs *http.Server) <-chan struct{} - // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives // disabled. See comments on h1ServerShutdownChan above for why // the code is written this way. |