summaryrefslogtreecommitdiff
path: root/vendor/github.com/docker/spdystream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/docker/spdystream/stream.go')
-rw-r--r--vendor/github.com/docker/spdystream/stream.go327
1 files changed, 0 insertions, 327 deletions
diff --git a/vendor/github.com/docker/spdystream/stream.go b/vendor/github.com/docker/spdystream/stream.go
deleted file mode 100644
index f9e9ee267..000000000
--- a/vendor/github.com/docker/spdystream/stream.go
+++ /dev/null
@@ -1,327 +0,0 @@
-package spdystream
-
-import (
- "errors"
- "fmt"
- "io"
- "net"
- "net/http"
- "sync"
- "time"
-
- "github.com/docker/spdystream/spdy"
-)
-
-var (
- ErrUnreadPartialData = errors.New("unread partial data")
-)
-
-type Stream struct {
- streamId spdy.StreamId
- parent *Stream
- conn *Connection
- startChan chan error
-
- dataLock sync.RWMutex
- dataChan chan []byte
- unread []byte
-
- priority uint8
- headers http.Header
- headerChan chan http.Header
- finishLock sync.Mutex
- finished bool
- replyCond *sync.Cond
- replied bool
- closeLock sync.Mutex
- closeChan chan bool
-}
-
-// WriteData writes data to stream, sending a dataframe per call
-func (s *Stream) WriteData(data []byte, fin bool) error {
- s.waitWriteReply()
- var flags spdy.DataFlags
-
- if fin {
- flags = spdy.DataFlagFin
- s.finishLock.Lock()
- if s.finished {
- s.finishLock.Unlock()
- return ErrWriteClosedStream
- }
- s.finished = true
- s.finishLock.Unlock()
- }
-
- dataFrame := &spdy.DataFrame{
- StreamId: s.streamId,
- Flags: flags,
- Data: data,
- }
-
- debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
- return s.conn.framer.WriteFrame(dataFrame)
-}
-
-// Write writes bytes to a stream, calling write data for each call.
-func (s *Stream) Write(data []byte) (n int, err error) {
- err = s.WriteData(data, false)
- if err == nil {
- n = len(data)
- }
- return
-}
-
-// Read reads bytes from a stream, a single read will never get more
-// than what is sent on a single data frame, but a multiple calls to
-// read may get data from the same data frame.
-func (s *Stream) Read(p []byte) (n int, err error) {
- if s.unread == nil {
- select {
- case <-s.closeChan:
- return 0, io.EOF
- case read, ok := <-s.dataChan:
- if !ok {
- return 0, io.EOF
- }
- s.unread = read
- }
- }
- n = copy(p, s.unread)
- if n < len(s.unread) {
- s.unread = s.unread[n:]
- } else {
- s.unread = nil
- }
- return
-}
-
-// ReadData reads an entire data frame and returns the byte array
-// from the data frame. If there is unread data from the result
-// of a Read call, this function will return an ErrUnreadPartialData.
-func (s *Stream) ReadData() ([]byte, error) {
- debugMessage("(%p) Reading data from %d", s, s.streamId)
- if s.unread != nil {
- return nil, ErrUnreadPartialData
- }
- select {
- case <-s.closeChan:
- return nil, io.EOF
- case read, ok := <-s.dataChan:
- if !ok {
- return nil, io.EOF
- }
- return read, nil
- }
-}
-
-func (s *Stream) waitWriteReply() {
- if s.replyCond != nil {
- s.replyCond.L.Lock()
- for !s.replied {
- s.replyCond.Wait()
- }
- s.replyCond.L.Unlock()
- }
-}
-
-// Wait waits for the stream to receive a reply.
-func (s *Stream) Wait() error {
- return s.WaitTimeout(time.Duration(0))
-}
-
-// WaitTimeout waits for the stream to receive a reply or for timeout.
-// When the timeout is reached, ErrTimeout will be returned.
-func (s *Stream) WaitTimeout(timeout time.Duration) error {
- var timeoutChan <-chan time.Time
- if timeout > time.Duration(0) {
- timeoutChan = time.After(timeout)
- }
-
- select {
- case err := <-s.startChan:
- if err != nil {
- return err
- }
- break
- case <-timeoutChan:
- return ErrTimeout
- }
- return nil
-}
-
-// Close closes the stream by sending an empty data frame with the
-// finish flag set, indicating this side is finished with the stream.
-func (s *Stream) Close() error {
- select {
- case <-s.closeChan:
- // Stream is now fully closed
- s.conn.removeStream(s)
- default:
- break
- }
- return s.WriteData([]byte{}, true)
-}
-
-// Reset sends a reset frame, putting the stream into the fully closed state.
-func (s *Stream) Reset() error {
- s.conn.removeStream(s)
- return s.resetStream()
-}
-
-func (s *Stream) resetStream() error {
- // Always call closeRemoteChannels, even if s.finished is already true.
- // This makes it so that stream.Close() followed by stream.Reset() allows
- // stream.Read() to unblock.
- s.closeRemoteChannels()
-
- s.finishLock.Lock()
- if s.finished {
- s.finishLock.Unlock()
- return nil
- }
- s.finished = true
- s.finishLock.Unlock()
-
- resetFrame := &spdy.RstStreamFrame{
- StreamId: s.streamId,
- Status: spdy.Cancel,
- }
- return s.conn.framer.WriteFrame(resetFrame)
-}
-
-// CreateSubStream creates a stream using the current as the parent
-func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
- return s.conn.CreateStream(headers, s, fin)
-}
-
-// SetPriority sets the stream priority, does not affect the
-// remote priority of this stream after Open has been called.
-// Valid values are 0 through 7, 0 being the highest priority
-// and 7 the lowest.
-func (s *Stream) SetPriority(priority uint8) {
- s.priority = priority
-}
-
-// SendHeader sends a header frame across the stream
-func (s *Stream) SendHeader(headers http.Header, fin bool) error {
- return s.conn.sendHeaders(headers, s, fin)
-}
-
-// SendReply sends a reply on a stream, only valid to be called once
-// when handling a new stream
-func (s *Stream) SendReply(headers http.Header, fin bool) error {
- if s.replyCond == nil {
- return errors.New("cannot reply on initiated stream")
- }
- s.replyCond.L.Lock()
- defer s.replyCond.L.Unlock()
- if s.replied {
- return nil
- }
-
- err := s.conn.sendReply(headers, s, fin)
- if err != nil {
- return err
- }
-
- s.replied = true
- s.replyCond.Broadcast()
- return nil
-}
-
-// Refuse sends a reset frame with the status refuse, only
-// valid to be called once when handling a new stream. This
-// may be used to indicate that a stream is not allowed
-// when http status codes are not being used.
-func (s *Stream) Refuse() error {
- if s.replied {
- return nil
- }
- s.replied = true
- return s.conn.sendReset(spdy.RefusedStream, s)
-}
-
-// Cancel sends a reset frame with the status canceled. This
-// can be used at any time by the creator of the Stream to
-// indicate the stream is no longer needed.
-func (s *Stream) Cancel() error {
- return s.conn.sendReset(spdy.Cancel, s)
-}
-
-// ReceiveHeader receives a header sent on the other side
-// of the stream. This function will block until a header
-// is received or stream is closed.
-func (s *Stream) ReceiveHeader() (http.Header, error) {
- select {
- case <-s.closeChan:
- break
- case header, ok := <-s.headerChan:
- if !ok {
- return nil, fmt.Errorf("header chan closed")
- }
- return header, nil
- }
- return nil, fmt.Errorf("stream closed")
-}
-
-// Parent returns the parent stream
-func (s *Stream) Parent() *Stream {
- return s.parent
-}
-
-// Headers returns the headers used to create the stream
-func (s *Stream) Headers() http.Header {
- return s.headers
-}
-
-// String returns the string version of stream using the
-// streamId to uniquely identify the stream
-func (s *Stream) String() string {
- return fmt.Sprintf("stream:%d", s.streamId)
-}
-
-// Identifier returns a 32 bit identifier for the stream
-func (s *Stream) Identifier() uint32 {
- return uint32(s.streamId)
-}
-
-// IsFinished returns whether the stream has finished
-// sending data
-func (s *Stream) IsFinished() bool {
- return s.finished
-}
-
-// Implement net.Conn interface
-
-func (s *Stream) LocalAddr() net.Addr {
- return s.conn.conn.LocalAddr()
-}
-
-func (s *Stream) RemoteAddr() net.Addr {
- return s.conn.conn.RemoteAddr()
-}
-
-// TODO set per stream values instead of connection-wide
-
-func (s *Stream) SetDeadline(t time.Time) error {
- return s.conn.conn.SetDeadline(t)
-}
-
-func (s *Stream) SetReadDeadline(t time.Time) error {
- return s.conn.conn.SetReadDeadline(t)
-}
-
-func (s *Stream) SetWriteDeadline(t time.Time) error {
- return s.conn.conn.SetWriteDeadline(t)
-}
-
-func (s *Stream) closeRemoteChannels() {
- s.closeLock.Lock()
- defer s.closeLock.Unlock()
- select {
- case <-s.closeChan:
- default:
- close(s.closeChan)
- }
-}