aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/docker/spdystream/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/docker/spdystream/stream.go')
-rw-r--r--vendor/github.com/docker/spdystream/stream.go327
1 files changed, 327 insertions, 0 deletions
diff --git a/vendor/github.com/docker/spdystream/stream.go b/vendor/github.com/docker/spdystream/stream.go
new file mode 100644
index 000000000..f9e9ee267
--- /dev/null
+++ b/vendor/github.com/docker/spdystream/stream.go
@@ -0,0 +1,327 @@
+package spdystream
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/docker/spdystream/spdy"
+)
+
+var (
+ ErrUnreadPartialData = errors.New("unread partial data")
+)
+
+type Stream struct {
+ streamId spdy.StreamId
+ parent *Stream
+ conn *Connection
+ startChan chan error
+
+ dataLock sync.RWMutex
+ dataChan chan []byte
+ unread []byte
+
+ priority uint8
+ headers http.Header
+ headerChan chan http.Header
+ finishLock sync.Mutex
+ finished bool
+ replyCond *sync.Cond
+ replied bool
+ closeLock sync.Mutex
+ closeChan chan bool
+}
+
+// WriteData writes data to stream, sending a dataframe per call
+func (s *Stream) WriteData(data []byte, fin bool) error {
+ s.waitWriteReply()
+ var flags spdy.DataFlags
+
+ if fin {
+ flags = spdy.DataFlagFin
+ s.finishLock.Lock()
+ if s.finished {
+ s.finishLock.Unlock()
+ return ErrWriteClosedStream
+ }
+ s.finished = true
+ s.finishLock.Unlock()
+ }
+
+ dataFrame := &spdy.DataFrame{
+ StreamId: s.streamId,
+ Flags: flags,
+ Data: data,
+ }
+
+ debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
+ return s.conn.framer.WriteFrame(dataFrame)
+}
+
+// Write writes bytes to a stream, calling write data for each call.
+func (s *Stream) Write(data []byte) (n int, err error) {
+ err = s.WriteData(data, false)
+ if err == nil {
+ n = len(data)
+ }
+ return
+}
+
+// Read reads bytes from a stream, a single read will never get more
+// than what is sent on a single data frame, but a multiple calls to
+// read may get data from the same data frame.
+func (s *Stream) Read(p []byte) (n int, err error) {
+ if s.unread == nil {
+ select {
+ case <-s.closeChan:
+ return 0, io.EOF
+ case read, ok := <-s.dataChan:
+ if !ok {
+ return 0, io.EOF
+ }
+ s.unread = read
+ }
+ }
+ n = copy(p, s.unread)
+ if n < len(s.unread) {
+ s.unread = s.unread[n:]
+ } else {
+ s.unread = nil
+ }
+ return
+}
+
+// ReadData reads an entire data frame and returns the byte array
+// from the data frame. If there is unread data from the result
+// of a Read call, this function will return an ErrUnreadPartialData.
+func (s *Stream) ReadData() ([]byte, error) {
+ debugMessage("(%p) Reading data from %d", s, s.streamId)
+ if s.unread != nil {
+ return nil, ErrUnreadPartialData
+ }
+ select {
+ case <-s.closeChan:
+ return nil, io.EOF
+ case read, ok := <-s.dataChan:
+ if !ok {
+ return nil, io.EOF
+ }
+ return read, nil
+ }
+}
+
+func (s *Stream) waitWriteReply() {
+ if s.replyCond != nil {
+ s.replyCond.L.Lock()
+ for !s.replied {
+ s.replyCond.Wait()
+ }
+ s.replyCond.L.Unlock()
+ }
+}
+
+// Wait waits for the stream to receive a reply.
+func (s *Stream) Wait() error {
+ return s.WaitTimeout(time.Duration(0))
+}
+
+// WaitTimeout waits for the stream to receive a reply or for timeout.
+// When the timeout is reached, ErrTimeout will be returned.
+func (s *Stream) WaitTimeout(timeout time.Duration) error {
+ var timeoutChan <-chan time.Time
+ if timeout > time.Duration(0) {
+ timeoutChan = time.After(timeout)
+ }
+
+ select {
+ case err := <-s.startChan:
+ if err != nil {
+ return err
+ }
+ break
+ case <-timeoutChan:
+ return ErrTimeout
+ }
+ return nil
+}
+
+// Close closes the stream by sending an empty data frame with the
+// finish flag set, indicating this side is finished with the stream.
+func (s *Stream) Close() error {
+ select {
+ case <-s.closeChan:
+ // Stream is now fully closed
+ s.conn.removeStream(s)
+ default:
+ break
+ }
+ return s.WriteData([]byte{}, true)
+}
+
+// Reset sends a reset frame, putting the stream into the fully closed state.
+func (s *Stream) Reset() error {
+ s.conn.removeStream(s)
+ return s.resetStream()
+}
+
+func (s *Stream) resetStream() error {
+ // Always call closeRemoteChannels, even if s.finished is already true.
+ // This makes it so that stream.Close() followed by stream.Reset() allows
+ // stream.Read() to unblock.
+ s.closeRemoteChannels()
+
+ s.finishLock.Lock()
+ if s.finished {
+ s.finishLock.Unlock()
+ return nil
+ }
+ s.finished = true
+ s.finishLock.Unlock()
+
+ resetFrame := &spdy.RstStreamFrame{
+ StreamId: s.streamId,
+ Status: spdy.Cancel,
+ }
+ return s.conn.framer.WriteFrame(resetFrame)
+}
+
+// CreateSubStream creates a stream using the current as the parent
+func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
+ return s.conn.CreateStream(headers, s, fin)
+}
+
+// SetPriority sets the stream priority, does not affect the
+// remote priority of this stream after Open has been called.
+// Valid values are 0 through 7, 0 being the highest priority
+// and 7 the lowest.
+func (s *Stream) SetPriority(priority uint8) {
+ s.priority = priority
+}
+
+// SendHeader sends a header frame across the stream
+func (s *Stream) SendHeader(headers http.Header, fin bool) error {
+ return s.conn.sendHeaders(headers, s, fin)
+}
+
+// SendReply sends a reply on a stream, only valid to be called once
+// when handling a new stream
+func (s *Stream) SendReply(headers http.Header, fin bool) error {
+ if s.replyCond == nil {
+ return errors.New("cannot reply on initiated stream")
+ }
+ s.replyCond.L.Lock()
+ defer s.replyCond.L.Unlock()
+ if s.replied {
+ return nil
+ }
+
+ err := s.conn.sendReply(headers, s, fin)
+ if err != nil {
+ return err
+ }
+
+ s.replied = true
+ s.replyCond.Broadcast()
+ return nil
+}
+
+// Refuse sends a reset frame with the status refuse, only
+// valid to be called once when handling a new stream. This
+// may be used to indicate that a stream is not allowed
+// when http status codes are not being used.
+func (s *Stream) Refuse() error {
+ if s.replied {
+ return nil
+ }
+ s.replied = true
+ return s.conn.sendReset(spdy.RefusedStream, s)
+}
+
+// Cancel sends a reset frame with the status canceled. This
+// can be used at any time by the creator of the Stream to
+// indicate the stream is no longer needed.
+func (s *Stream) Cancel() error {
+ return s.conn.sendReset(spdy.Cancel, s)
+}
+
+// ReceiveHeader receives a header sent on the other side
+// of the stream. This function will block until a header
+// is received or stream is closed.
+func (s *Stream) ReceiveHeader() (http.Header, error) {
+ select {
+ case <-s.closeChan:
+ break
+ case header, ok := <-s.headerChan:
+ if !ok {
+ return nil, fmt.Errorf("header chan closed")
+ }
+ return header, nil
+ }
+ return nil, fmt.Errorf("stream closed")
+}
+
+// Parent returns the parent stream
+func (s *Stream) Parent() *Stream {
+ return s.parent
+}
+
+// Headers returns the headers used to create the stream
+func (s *Stream) Headers() http.Header {
+ return s.headers
+}
+
+// String returns the string version of stream using the
+// streamId to uniquely identify the stream
+func (s *Stream) String() string {
+ return fmt.Sprintf("stream:%d", s.streamId)
+}
+
+// Identifier returns a 32 bit identifier for the stream
+func (s *Stream) Identifier() uint32 {
+ return uint32(s.streamId)
+}
+
+// IsFinished returns whether the stream has finished
+// sending data
+func (s *Stream) IsFinished() bool {
+ return s.finished
+}
+
+// Implement net.Conn interface
+
+func (s *Stream) LocalAddr() net.Addr {
+ return s.conn.conn.LocalAddr()
+}
+
+func (s *Stream) RemoteAddr() net.Addr {
+ return s.conn.conn.RemoteAddr()
+}
+
+// TODO set per stream values instead of connection-wide
+
+func (s *Stream) SetDeadline(t time.Time) error {
+ return s.conn.conn.SetDeadline(t)
+}
+
+func (s *Stream) SetReadDeadline(t time.Time) error {
+ return s.conn.conn.SetReadDeadline(t)
+}
+
+func (s *Stream) SetWriteDeadline(t time.Time) error {
+ return s.conn.conn.SetWriteDeadline(t)
+}
+
+func (s *Stream) closeRemoteChannels() {
+ s.closeLock.Lock()
+ defer s.closeLock.Unlock()
+ select {
+ case <-s.closeChan:
+ default:
+ close(s.closeChan)
+ }
+}