summaryrefslogtreecommitdiff
path: root/vendor/github.com/docker/spdystream/connection.go
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2021-02-25 19:00:14 +0100
committerGitHub <noreply@github.com>2021-02-25 19:00:14 +0100
commitb220d6cd06360d6b868cd7fb7c32d2602aab69a5 (patch)
tree6777cc2c23306d1a6b87ef40b9fe4eab2764b7dd /vendor/github.com/docker/spdystream/connection.go
parent9ec8106841c55bc085012727748e2d73826be97d (diff)
parent24d9bda7ff8a3e6a9f249401e05e35e73284ae61 (diff)
downloadpodman-b220d6cd06360d6b868cd7fb7c32d2602aab69a5.tar.gz
podman-b220d6cd06360d6b868cd7fb7c32d2602aab69a5.tar.bz2
podman-b220d6cd06360d6b868cd7fb7c32d2602aab69a5.zip
Merge pull request #9518 from baude/pruneremotecommand
prune remotecommand dependency
Diffstat (limited to 'vendor/github.com/docker/spdystream/connection.go')
-rw-r--r--vendor/github.com/docker/spdystream/connection.go958
1 files changed, 0 insertions, 958 deletions
diff --git a/vendor/github.com/docker/spdystream/connection.go b/vendor/github.com/docker/spdystream/connection.go
deleted file mode 100644
index 6031a0db1..000000000
--- a/vendor/github.com/docker/spdystream/connection.go
+++ /dev/null
@@ -1,958 +0,0 @@
-package spdystream
-
-import (
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "sync"
- "time"
-
- "github.com/docker/spdystream/spdy"
-)
-
-var (
- ErrInvalidStreamId = errors.New("Invalid stream id")
- ErrTimeout = errors.New("Timeout occured")
- ErrReset = errors.New("Stream reset")
- ErrWriteClosedStream = errors.New("Write on closed stream")
-)
-
-const (
- FRAME_WORKERS = 5
- QUEUE_SIZE = 50
-)
-
-type StreamHandler func(stream *Stream)
-
-type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
-
-type idleAwareFramer struct {
- f *spdy.Framer
- conn *Connection
- writeLock sync.Mutex
- resetChan chan struct{}
- setTimeoutLock sync.Mutex
- setTimeoutChan chan time.Duration
- timeout time.Duration
-}
-
-func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
- iaf := &idleAwareFramer{
- f: framer,
- resetChan: make(chan struct{}, 2),
- // setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about
- // the same time the connection is being closed
- setTimeoutChan: make(chan time.Duration, 1),
- }
- return iaf
-}
-
-func (i *idleAwareFramer) monitor() {
- var (
- timer *time.Timer
- expired <-chan time.Time
- resetChan = i.resetChan
- setTimeoutChan = i.setTimeoutChan
- )
-Loop:
- for {
- select {
- case timeout := <-i.setTimeoutChan:
- i.timeout = timeout
- if timeout == 0 {
- if timer != nil {
- timer.Stop()
- }
- } else {
- if timer == nil {
- timer = time.NewTimer(timeout)
- expired = timer.C
- } else {
- timer.Reset(timeout)
- }
- }
- case <-resetChan:
- if timer != nil && i.timeout > 0 {
- timer.Reset(i.timeout)
- }
- case <-expired:
- i.conn.streamCond.L.Lock()
- streams := i.conn.streams
- i.conn.streams = make(map[spdy.StreamId]*Stream)
- i.conn.streamCond.Broadcast()
- i.conn.streamCond.L.Unlock()
- go func() {
- for _, stream := range streams {
- stream.resetStream()
- }
- i.conn.Close()
- }()
- case <-i.conn.closeChan:
- if timer != nil {
- timer.Stop()
- }
-
- // Start a goroutine to drain resetChan. This is needed because we've seen
- // some unit tests with large numbers of goroutines get into a situation
- // where resetChan fills up, at least 1 call to Write() is still trying to
- // send to resetChan, the connection gets closed, and this case statement
- // attempts to grab the write lock that Write() already has, causing a
- // deadlock.
- //
- // See https://github.com/docker/spdystream/issues/49 for more details.
- go func() {
- for _ = range resetChan {
- }
- }()
-
- go func() {
- for _ = range setTimeoutChan {
- }
- }()
-
- i.writeLock.Lock()
- close(resetChan)
- i.resetChan = nil
- i.writeLock.Unlock()
-
- i.setTimeoutLock.Lock()
- close(i.setTimeoutChan)
- i.setTimeoutChan = nil
- i.setTimeoutLock.Unlock()
-
- break Loop
- }
- }
-
- // Drain resetChan
- for _ = range resetChan {
- }
-}
-
-func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
- i.writeLock.Lock()
- defer i.writeLock.Unlock()
- if i.resetChan == nil {
- return io.EOF
- }
- err := i.f.WriteFrame(frame)
- if err != nil {
- return err
- }
-
- i.resetChan <- struct{}{}
-
- return nil
-}
-
-func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
- frame, err := i.f.ReadFrame()
- if err != nil {
- return nil, err
- }
-
- // resetChan should never be closed since it is only closed
- // when the connection has closed its closeChan. This closure
- // only occurs after all Reads have finished
- // TODO (dmcgowan): refactor relationship into connection
- i.resetChan <- struct{}{}
-
- return frame, nil
-}
-
-func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) {
- i.setTimeoutLock.Lock()
- defer i.setTimeoutLock.Unlock()
-
- if i.setTimeoutChan == nil {
- return
- }
-
- i.setTimeoutChan <- timeout
-}
-
-type Connection struct {
- conn net.Conn
- framer *idleAwareFramer
-
- closeChan chan bool
- goneAway bool
- lastStreamChan chan<- *Stream
- goAwayTimeout time.Duration
- closeTimeout time.Duration
-
- streamLock *sync.RWMutex
- streamCond *sync.Cond
- streams map[spdy.StreamId]*Stream
-
- nextIdLock sync.Mutex
- receiveIdLock sync.Mutex
- nextStreamId spdy.StreamId
- receivedStreamId spdy.StreamId
-
- pingIdLock sync.Mutex
- pingId uint32
- pingChans map[uint32]chan error
-
- shutdownLock sync.Mutex
- shutdownChan chan error
- hasShutdown bool
-
- // for testing https://github.com/docker/spdystream/pull/56
- dataFrameHandler func(*spdy.DataFrame) error
-}
-
-// NewConnection creates a new spdy connection from an existing
-// network connection.
-func NewConnection(conn net.Conn, server bool) (*Connection, error) {
- framer, framerErr := spdy.NewFramer(conn, conn)
- if framerErr != nil {
- return nil, framerErr
- }
- idleAwareFramer := newIdleAwareFramer(framer)
- var sid spdy.StreamId
- var rid spdy.StreamId
- var pid uint32
- if server {
- sid = 2
- rid = 1
- pid = 2
- } else {
- sid = 1
- rid = 2
- pid = 1
- }
-
- streamLock := new(sync.RWMutex)
- streamCond := sync.NewCond(streamLock)
-
- session := &Connection{
- conn: conn,
- framer: idleAwareFramer,
-
- closeChan: make(chan bool),
- goAwayTimeout: time.Duration(0),
- closeTimeout: time.Duration(0),
-
- streamLock: streamLock,
- streamCond: streamCond,
- streams: make(map[spdy.StreamId]*Stream),
- nextStreamId: sid,
- receivedStreamId: rid,
-
- pingId: pid,
- pingChans: make(map[uint32]chan error),
-
- shutdownChan: make(chan error),
- }
- session.dataFrameHandler = session.handleDataFrame
- idleAwareFramer.conn = session
- go idleAwareFramer.monitor()
-
- return session, nil
-}
-
-// Ping sends a ping frame across the connection and
-// returns the response time
-func (s *Connection) Ping() (time.Duration, error) {
- pid := s.pingId
- s.pingIdLock.Lock()
- if s.pingId > 0x7ffffffe {
- s.pingId = s.pingId - 0x7ffffffe
- } else {
- s.pingId = s.pingId + 2
- }
- s.pingIdLock.Unlock()
- pingChan := make(chan error)
- s.pingChans[pid] = pingChan
- defer delete(s.pingChans, pid)
-
- frame := &spdy.PingFrame{Id: pid}
- startTime := time.Now()
- writeErr := s.framer.WriteFrame(frame)
- if writeErr != nil {
- return time.Duration(0), writeErr
- }
- select {
- case <-s.closeChan:
- return time.Duration(0), errors.New("connection closed")
- case err, ok := <-pingChan:
- if ok && err != nil {
- return time.Duration(0), err
- }
- break
- }
- return time.Now().Sub(startTime), nil
-}
-
-// Serve handles frames sent from the server, including reply frames
-// which are needed to fully initiate connections. Both clients and servers
-// should call Serve in a separate goroutine before creating streams.
-func (s *Connection) Serve(newHandler StreamHandler) {
- // use a WaitGroup to wait for all frames to be drained after receiving
- // go-away.
- var wg sync.WaitGroup
-
- // Parition queues to ensure stream frames are handled
- // by the same worker, ensuring order is maintained
- frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
- for i := 0; i < FRAME_WORKERS; i++ {
- frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
-
- // Ensure frame queue is drained when connection is closed
- go func(frameQueue *PriorityFrameQueue) {
- <-s.closeChan
- frameQueue.Drain()
- }(frameQueues[i])
-
- wg.Add(1)
- go func(frameQueue *PriorityFrameQueue) {
- // let the WaitGroup know this worker is done
- defer wg.Done()
-
- s.frameHandler(frameQueue, newHandler)
- }(frameQueues[i])
- }
-
- var (
- partitionRoundRobin int
- goAwayFrame *spdy.GoAwayFrame
- )
-Loop:
- for {
- readFrame, err := s.framer.ReadFrame()
- if err != nil {
- if err != io.EOF {
- fmt.Errorf("frame read error: %s", err)
- } else {
- debugMessage("(%p) EOF received", s)
- }
- break
- }
- var priority uint8
- var partition int
- switch frame := readFrame.(type) {
- case *spdy.SynStreamFrame:
- if s.checkStreamFrame(frame) {
- priority = frame.Priority
- partition = int(frame.StreamId % FRAME_WORKERS)
- debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
- s.addStreamFrame(frame)
- } else {
- debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
- continue
- }
- case *spdy.SynReplyFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.DataFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.RstStreamFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.HeadersFrame:
- priority = s.getStreamPriority(frame.StreamId)
- partition = int(frame.StreamId % FRAME_WORKERS)
- case *spdy.PingFrame:
- priority = 0
- partition = partitionRoundRobin
- partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
- case *spdy.GoAwayFrame:
- // hold on to the go away frame and exit the loop
- goAwayFrame = frame
- break Loop
- default:
- priority = 7
- partition = partitionRoundRobin
- partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
- }
- frameQueues[partition].Push(readFrame, priority)
- }
- close(s.closeChan)
-
- // wait for all frame handler workers to indicate they've drained their queues
- // before handling the go away frame
- wg.Wait()
-
- if goAwayFrame != nil {
- s.handleGoAwayFrame(goAwayFrame)
- }
-
- // now it's safe to close remote channels and empty s.streams
- s.streamCond.L.Lock()
- // notify streams that they're now closed, which will
- // unblock any stream Read() calls
- for _, stream := range s.streams {
- stream.closeRemoteChannels()
- }
- s.streams = make(map[spdy.StreamId]*Stream)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
-}
-
-func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
- for {
- popFrame := frameQueue.Pop()
- if popFrame == nil {
- return
- }
-
- var frameErr error
- switch frame := popFrame.(type) {
- case *spdy.SynStreamFrame:
- frameErr = s.handleStreamFrame(frame, newHandler)
- case *spdy.SynReplyFrame:
- frameErr = s.handleReplyFrame(frame)
- case *spdy.DataFrame:
- frameErr = s.dataFrameHandler(frame)
- case *spdy.RstStreamFrame:
- frameErr = s.handleResetFrame(frame)
- case *spdy.HeadersFrame:
- frameErr = s.handleHeaderFrame(frame)
- case *spdy.PingFrame:
- frameErr = s.handlePingFrame(frame)
- case *spdy.GoAwayFrame:
- frameErr = s.handleGoAwayFrame(frame)
- default:
- frameErr = fmt.Errorf("unhandled frame type: %T", frame)
- }
-
- if frameErr != nil {
- fmt.Errorf("frame handling error: %s", frameErr)
- }
- }
-}
-
-func (s *Connection) getStreamPriority(streamId spdy.StreamId) uint8 {
- stream, streamOk := s.getStream(streamId)
- if !streamOk {
- return 7
- }
- return stream.priority
-}
-
-func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
- var parent *Stream
- if frame.AssociatedToStreamId != spdy.StreamId(0) {
- parent, _ = s.getStream(frame.AssociatedToStreamId)
- }
-
- stream := &Stream{
- streamId: frame.StreamId,
- parent: parent,
- conn: s,
- startChan: make(chan error),
- headers: frame.Headers,
- finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
- replyCond: sync.NewCond(new(sync.Mutex)),
- dataChan: make(chan []byte),
- headerChan: make(chan http.Header),
- closeChan: make(chan bool),
- }
- if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
- stream.closeRemoteChannels()
- }
-
- s.addStream(stream)
-}
-
-// checkStreamFrame checks to see if a stream frame is allowed.
-// If the stream is invalid, then a reset frame with protocol error
-// will be returned.
-func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
- s.receiveIdLock.Lock()
- defer s.receiveIdLock.Unlock()
- if s.goneAway {
- return false
- }
- validationErr := s.validateStreamId(frame.StreamId)
- if validationErr != nil {
- go func() {
- resetErr := s.sendResetFrame(spdy.ProtocolError, frame.StreamId)
- if resetErr != nil {
- fmt.Errorf("reset error: %s", resetErr)
- }
- }()
- return false
- }
- return true
-}
-
-func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
- stream, ok := s.getStream(frame.StreamId)
- if !ok {
- return fmt.Errorf("Missing stream: %d", frame.StreamId)
- }
-
- newHandler(stream)
-
- return nil
-}
-
-func (s *Connection) handleReplyFrame(frame *spdy.SynReplyFrame) error {
- debugMessage("(%p) Reply frame received for %d", s, frame.StreamId)
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- debugMessage("Reply frame gone away for %d", frame.StreamId)
- // Stream has already gone away
- return nil
- }
- if stream.replied {
- // Stream has already received reply
- return nil
- }
- stream.replied = true
-
- // TODO Check for error
- if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
-
- close(stream.startChan)
-
- return nil
-}
-
-func (s *Connection) handleResetFrame(frame *spdy.RstStreamFrame) error {
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- // Stream has already been removed
- return nil
- }
- s.removeStream(stream)
- stream.closeRemoteChannels()
-
- if !stream.replied {
- stream.replied = true
- stream.startChan <- ErrReset
- close(stream.startChan)
- }
-
- stream.finishLock.Lock()
- stream.finished = true
- stream.finishLock.Unlock()
-
- return nil
-}
-
-func (s *Connection) handleHeaderFrame(frame *spdy.HeadersFrame) error {
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- // Stream has already gone away
- return nil
- }
- if !stream.replied {
- // No reply received...Protocol error?
- return nil
- }
-
- // TODO limit headers while not blocking (use buffered chan or goroutine?)
- select {
- case <-stream.closeChan:
- return nil
- case stream.headerChan <- frame.Headers:
- }
-
- if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
-
- return nil
-}
-
-func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
- debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
- stream, streamOk := s.getStream(frame.StreamId)
- if !streamOk {
- debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
- // Stream has already gone away
- return nil
- }
- if !stream.replied {
- debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
- // No reply received...Protocol error?
- return nil
- }
-
- debugMessage("(%p) (%d) Data frame handling", stream, stream.streamId)
- if len(frame.Data) > 0 {
- stream.dataLock.RLock()
- select {
- case <-stream.closeChan:
- debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
- case stream.dataChan <- frame.Data:
- debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
- }
- stream.dataLock.RUnlock()
- }
- if (frame.Flags & spdy.DataFlagFin) != 0x00 {
- s.remoteStreamFinish(stream)
- }
- return nil
-}
-
-func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
- if s.pingId&0x01 != frame.Id&0x01 {
- return s.framer.WriteFrame(frame)
- }
- pingChan, pingOk := s.pingChans[frame.Id]
- if pingOk {
- close(pingChan)
- }
- return nil
-}
-
-func (s *Connection) handleGoAwayFrame(frame *spdy.GoAwayFrame) error {
- debugMessage("(%p) Go away received", s)
- s.receiveIdLock.Lock()
- if s.goneAway {
- s.receiveIdLock.Unlock()
- return nil
- }
- s.goneAway = true
- s.receiveIdLock.Unlock()
-
- if s.lastStreamChan != nil {
- stream, _ := s.getStream(frame.LastGoodStreamId)
- go func() {
- s.lastStreamChan <- stream
- }()
- }
-
- // Do not block frame handler waiting for closure
- go s.shutdown(s.goAwayTimeout)
-
- return nil
-}
-
-func (s *Connection) remoteStreamFinish(stream *Stream) {
- stream.closeRemoteChannels()
-
- stream.finishLock.Lock()
- if stream.finished {
- // Stream is fully closed, cleanup
- s.removeStream(stream)
- }
- stream.finishLock.Unlock()
-}
-
-// CreateStream creates a new spdy stream using the parameters for
-// creating the stream frame. The stream frame will be sent upon
-// calling this function, however this function does not wait for
-// the reply frame. If waiting for the reply is desired, use
-// the stream Wait or WaitTimeout function on the stream returned
-// by this function.
-func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
- // MUST synchronize stream creation (all the way to writing the frame)
- // as stream IDs **MUST** increase monotonically.
- s.nextIdLock.Lock()
- defer s.nextIdLock.Unlock()
-
- streamId := s.getNextStreamId()
- if streamId == 0 {
- return nil, fmt.Errorf("Unable to get new stream id")
- }
-
- stream := &Stream{
- streamId: streamId,
- parent: parent,
- conn: s,
- startChan: make(chan error),
- headers: headers,
- dataChan: make(chan []byte),
- headerChan: make(chan http.Header),
- closeChan: make(chan bool),
- }
-
- debugMessage("(%p) (%p) Create stream", s, stream)
-
- s.addStream(stream)
-
- return stream, s.sendStream(stream, fin)
-}
-
-func (s *Connection) shutdown(closeTimeout time.Duration) {
- // TODO Ensure this isn't called multiple times
- s.shutdownLock.Lock()
- if s.hasShutdown {
- s.shutdownLock.Unlock()
- return
- }
- s.hasShutdown = true
- s.shutdownLock.Unlock()
-
- var timeout <-chan time.Time
- if closeTimeout > time.Duration(0) {
- timeout = time.After(closeTimeout)
- }
- streamsClosed := make(chan bool)
-
- go func() {
- s.streamCond.L.Lock()
- for len(s.streams) > 0 {
- debugMessage("Streams opened: %d, %#v", len(s.streams), s.streams)
- s.streamCond.Wait()
- }
- s.streamCond.L.Unlock()
- close(streamsClosed)
- }()
-
- var err error
- select {
- case <-streamsClosed:
- // No active streams, close should be safe
- err = s.conn.Close()
- case <-timeout:
- // Force ungraceful close
- err = s.conn.Close()
- // Wait for cleanup to clear active streams
- <-streamsClosed
- }
-
- if err != nil {
- duration := 10 * time.Minute
- time.AfterFunc(duration, func() {
- select {
- case err, ok := <-s.shutdownChan:
- if ok {
- fmt.Errorf("Unhandled close error after %s: %s", duration, err)
- }
- default:
- }
- })
- s.shutdownChan <- err
- }
- close(s.shutdownChan)
-
- return
-}
-
-// Closes spdy connection by sending GoAway frame and initiating shutdown
-func (s *Connection) Close() error {
- s.receiveIdLock.Lock()
- if s.goneAway {
- s.receiveIdLock.Unlock()
- return nil
- }
- s.goneAway = true
- s.receiveIdLock.Unlock()
-
- var lastStreamId spdy.StreamId
- if s.receivedStreamId > 2 {
- lastStreamId = s.receivedStreamId - 2
- }
-
- goAwayFrame := &spdy.GoAwayFrame{
- LastGoodStreamId: lastStreamId,
- Status: spdy.GoAwayOK,
- }
-
- err := s.framer.WriteFrame(goAwayFrame)
- if err != nil {
- return err
- }
-
- go s.shutdown(s.closeTimeout)
-
- return nil
-}
-
-// CloseWait closes the connection and waits for shutdown
-// to finish. Note the underlying network Connection
-// is not closed until the end of shutdown.
-func (s *Connection) CloseWait() error {
- closeErr := s.Close()
- if closeErr != nil {
- return closeErr
- }
- shutdownErr, ok := <-s.shutdownChan
- if ok {
- return shutdownErr
- }
- return nil
-}
-
-// Wait waits for the connection to finish shutdown or for
-// the wait timeout duration to expire. This needs to be
-// called either after Close has been called or the GOAWAYFRAME
-// has been received. If the wait timeout is 0, this function
-// will block until shutdown finishes. If wait is never called
-// and a shutdown error occurs, that error will be logged as an
-// unhandled error.
-func (s *Connection) Wait(waitTimeout time.Duration) error {
- var timeout <-chan time.Time
- if waitTimeout > time.Duration(0) {
- timeout = time.After(waitTimeout)
- }
-
- select {
- case err, ok := <-s.shutdownChan:
- if ok {
- return err
- }
- case <-timeout:
- return ErrTimeout
- }
- return nil
-}
-
-// NotifyClose registers a channel to be called when the remote
-// peer inidicates connection closure. The last stream to be
-// received by the remote will be sent on the channel. The notify
-// timeout will determine the duration between go away received
-// and the connection being closed.
-func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
- s.goAwayTimeout = timeout
- s.lastStreamChan = c
-}
-
-// SetCloseTimeout sets the amount of time close will wait for
-// streams to finish before terminating the underlying network
-// connection. Setting the timeout to 0 will cause close to
-// wait forever, which is the default.
-func (s *Connection) SetCloseTimeout(timeout time.Duration) {
- s.closeTimeout = timeout
-}
-
-// SetIdleTimeout sets the amount of time the connection may sit idle before
-// it is forcefully terminated.
-func (s *Connection) SetIdleTimeout(timeout time.Duration) {
- s.framer.setIdleTimeout(timeout)
-}
-
-func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- }
-
- headerFrame := &spdy.HeadersFrame{
- StreamId: stream.streamId,
- Headers: headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
-
- return s.framer.WriteFrame(headerFrame)
-}
-
-func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- }
-
- replyFrame := &spdy.SynReplyFrame{
- StreamId: stream.streamId,
- Headers: headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
-
- return s.framer.WriteFrame(replyFrame)
-}
-
-func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.StreamId) error {
- resetFrame := &spdy.RstStreamFrame{
- StreamId: streamId,
- Status: status,
- }
-
- return s.framer.WriteFrame(resetFrame)
-}
-
-func (s *Connection) sendReset(status spdy.RstStreamStatus, stream *Stream) error {
- return s.sendResetFrame(status, stream.streamId)
-}
-
-func (s *Connection) sendStream(stream *Stream, fin bool) error {
- var flags spdy.ControlFlags
- if fin {
- flags = spdy.ControlFlagFin
- stream.finished = true
- }
-
- var parentId spdy.StreamId
- if stream.parent != nil {
- parentId = stream.parent.streamId
- }
-
- streamFrame := &spdy.SynStreamFrame{
- StreamId: spdy.StreamId(stream.streamId),
- AssociatedToStreamId: spdy.StreamId(parentId),
- Headers: stream.headers,
- CFHeader: spdy.ControlFrameHeader{Flags: flags},
- }
-
- return s.framer.WriteFrame(streamFrame)
-}
-
-// getNextStreamId returns the next sequential id
-// every call should produce a unique value or an error
-func (s *Connection) getNextStreamId() spdy.StreamId {
- sid := s.nextStreamId
- if sid > 0x7fffffff {
- return 0
- }
- s.nextStreamId = s.nextStreamId + 2
- return sid
-}
-
-// PeekNextStreamId returns the next sequential id and keeps the next id untouched
-func (s *Connection) PeekNextStreamId() spdy.StreamId {
- sid := s.nextStreamId
- return sid
-}
-
-func (s *Connection) validateStreamId(rid spdy.StreamId) error {
- if rid > 0x7fffffff || rid < s.receivedStreamId {
- return ErrInvalidStreamId
- }
- s.receivedStreamId = rid + 2
- return nil
-}
-
-func (s *Connection) addStream(stream *Stream) {
- s.streamCond.L.Lock()
- s.streams[stream.streamId] = stream
- debugMessage("(%p) (%p) Stream added, broadcasting: %d", s, stream, stream.streamId)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
-}
-
-func (s *Connection) removeStream(stream *Stream) {
- s.streamCond.L.Lock()
- delete(s.streams, stream.streamId)
- debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
- s.streamCond.Broadcast()
- s.streamCond.L.Unlock()
-}
-
-func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool) {
- s.streamLock.RLock()
- stream, ok = s.streams[streamId]
- s.streamLock.RUnlock()
- return
-}
-
-// FindStream looks up the given stream id and either waits for the
-// stream to be found or returns nil if the stream id is no longer
-// valid.
-func (s *Connection) FindStream(streamId uint32) *Stream {
- var stream *Stream
- var ok bool
- s.streamCond.L.Lock()
- stream, ok = s.streams[spdy.StreamId(streamId)]
- debugMessage("(%p) Found stream %d? %t", s, spdy.StreamId(streamId), ok)
- for !ok && streamId >= uint32(s.receivedStreamId) {
- s.streamCond.Wait()
- stream, ok = s.streams[spdy.StreamId(streamId)]
- }
- s.streamCond.L.Unlock()
- return stream
-}
-
-func (s *Connection) CloseChan() <-chan bool {
- return s.closeChan
-}