package spdystream import ( "errors" "fmt" "io" "net" "net/http" "sync" "time" "github.com/docker/spdystream/spdy" ) var ( ErrUnreadPartialData = errors.New("unread partial data") ) type Stream struct { streamId spdy.StreamId parent *Stream conn *Connection startChan chan error dataLock sync.RWMutex dataChan chan []byte unread []byte priority uint8 headers http.Header headerChan chan http.Header finishLock sync.Mutex finished bool replyCond *sync.Cond replied bool closeLock sync.Mutex closeChan chan bool } // WriteData writes data to stream, sending a dataframe per call func (s *Stream) WriteData(data []byte, fin bool) error { s.waitWriteReply() var flags spdy.DataFlags if fin { flags = spdy.DataFlagFin s.finishLock.Lock() if s.finished { s.finishLock.Unlock() return ErrWriteClosedStream } s.finished = true s.finishLock.Unlock() } dataFrame := &spdy.DataFrame{ StreamId: s.streamId, Flags: flags, Data: data, } debugMessage("(%p) (%d) Writing data frame", s, s.streamId) return s.conn.framer.WriteFrame(dataFrame) } // Write writes bytes to a stream, calling write data for each call. func (s *Stream) Write(data []byte) (n int, err error) { err = s.WriteData(data, false) if err == nil { n = len(data) } return } // Read reads bytes from a stream, a single read will never get more // than what is sent on a single data frame, but a multiple calls to // read may get data from the same data frame. func (s *Stream) Read(p []byte) (n int, err error) { if s.unread == nil { select { case <-s.closeChan: return 0, io.EOF case read, ok := <-s.dataChan: if !ok { return 0, io.EOF } s.unread = read } } n = copy(p, s.unread) if n < len(s.unread) { s.unread = s.unread[n:] } else { s.unread = nil } return } // ReadData reads an entire data frame and returns the byte array // from the data frame. If there is unread data from the result // of a Read call, this function will return an ErrUnreadPartialData. 
func (s *Stream) ReadData() ([]byte, error) { debugMessage("(%p) Reading data from %d", s, s.streamId) if s.unread != nil { return nil, ErrUnreadPartialData } select { case <-s.closeChan: return nil, io.EOF case read, ok := <-s.dataChan: if !ok { return nil, io.EOF } return read, nil } } func (s *Stream) waitWriteReply() { if s.replyCond != nil { s.replyCond.L.Lock() for !s.replied { s.replyCond.Wait() } s.replyCond.L.Unlock() } } // Wait waits for the stream to receive a reply. func (s *Stream) Wait() error { return s.WaitTimeout(time.Duration(0)) } // WaitTimeout waits for the stream to receive a reply or for timeout. // When the timeout is reached, ErrTimeout will be returned. func (s *Stream) WaitTimeout(timeout time.Duration) error { var timeoutChan <-chan time.Time if timeout > time.Duration(0) { timeoutChan = time.After(timeout) } select { case err := <-s.startChan: if err != nil { return err } break case <-timeoutChan: return ErrTimeout } return nil } // Close closes the stream by sending an empty data frame with the // finish flag set, indicating this side is finished with the stream. func (s *Stream) Close() error { select { case <-s.closeChan: // Stream is now fully closed s.conn.removeStream(s) default: break } return s.WriteData([]byte{}, true) } // Reset sends a reset frame, putting the stream into the fully closed state. func (s *Stream) Reset() error { s.conn.removeStream(s) return s.resetStream() } func (s *Stream) resetStream() error { // Always call closeRemoteChannels, even if s.finished is already true. // This makes it so that stream.Close() followed by stream.Reset() allows // stream.Read() to unblock. 
s.closeRemoteChannels() s.finishLock.Lock() if s.finished { s.finishLock.Unlock() return nil } s.finished = true s.finishLock.Unlock() resetFrame := &spdy.RstStreamFrame{ StreamId: s.streamId, Status: spdy.Cancel, } return s.conn.framer.WriteFrame(resetFrame) } // CreateSubStream creates a stream using the current as the parent func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) { return s.conn.CreateStream(headers, s, fin) } // SetPriority sets the stream priority, does not affect the // remote priority of this stream after Open has been called. // Valid values are 0 through 7, 0 being the highest priority // and 7 the lowest. func (s *Stream) SetPriority(priority uint8) { s.priority = priority } // SendHeader sends a header frame across the stream func (s *Stream) SendHeader(headers http.Header, fin bool) error { return s.conn.sendHeaders(headers, s, fin) } // SendReply sends a reply on a stream, only valid to be called once // when handling a new stream func (s *Stream) SendReply(headers http.Header, fin bool) error { if s.replyCond == nil { return errors.New("cannot reply on initiated stream") } s.replyCond.L.Lock() defer s.replyCond.L.Unlock() if s.replied { return nil } err := s.conn.sendReply(headers, s, fin) if err != nil { return err } s.replied = true s.replyCond.Broadcast() return nil } // Refuse sends a reset frame with the status refuse, only // valid to be called once when handling a new stream. This // may be used to indicate that a stream is not allowed // when http status codes are not being used. func (s *Stream) Refuse() error { if s.replied { return nil } s.replied = true return s.conn.sendReset(spdy.RefusedStream, s) } // Cancel sends a reset frame with the status canceled. This // can be used at any time by the creator of the Stream to // indicate the stream is no longer needed. 
func (s *Stream) Cancel() error {
	return s.conn.sendReset(spdy.Cancel, s)
}

// ReceiveHeader receives a header sent on the other side
// of the stream. This function will block until a header
// is received or stream is closed.
func (s *Stream) ReceiveHeader() (http.Header, error) {
	select {
	case <-s.closeChan:
		return nil, fmt.Errorf("stream closed")
	case header, ok := <-s.headerChan:
		if !ok {
			return nil, fmt.Errorf("header chan closed")
		}
		return header, nil
	}
}

// Parent returns the parent stream
func (s *Stream) Parent() *Stream {
	return s.parent
}

// Headers returns the headers used to create the stream
func (s *Stream) Headers() http.Header {
	return s.headers
}

// String returns the string version of stream using the
// streamId to uniquely identify the stream
func (s *Stream) String() string {
	return fmt.Sprintf("stream:%d", s.streamId)
}

// Identifier returns a 32 bit identifier for the stream
func (s *Stream) Identifier() uint32 {
	return uint32(s.streamId)
}

// IsFinished returns whether the stream has finished
// sending data
func (s *Stream) IsFinished() bool {
	// finished is written under finishLock by WriteData and resetStream,
	// so it is read under the same lock to avoid a data race.
	s.finishLock.Lock()
	defer s.finishLock.Unlock()
	return s.finished
}

// Implement net.Conn interface

// LocalAddr returns the local address of the underlying connection.
func (s *Stream) LocalAddr() net.Addr {
	return s.conn.conn.LocalAddr()
}

// RemoteAddr returns the remote address of the underlying connection.
func (s *Stream) RemoteAddr() net.Addr {
	return s.conn.conn.RemoteAddr()
}

// TODO set per stream values instead of connection-wide

// SetDeadline sets the deadline on the underlying connection, not on this
// stream alone.
func (s *Stream) SetDeadline(t time.Time) error {
	return s.conn.conn.SetDeadline(t)
}

// SetReadDeadline sets the read deadline on the underlying connection, not
// on this stream alone.
func (s *Stream) SetReadDeadline(t time.Time) error {
	return s.conn.conn.SetReadDeadline(t)
}

// SetWriteDeadline sets the write deadline on the underlying connection, not
// on this stream alone.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	return s.conn.conn.SetWriteDeadline(t)
}

// closeRemoteChannels closes closeChan exactly once. closeLock serializes
// callers so the check-then-close below cannot close the channel twice.
func (s *Stream) closeRemoteChannels() {
	s.closeLock.Lock()
	defer s.closeLock.Unlock()
	select {
	case <-s.closeChan:
		// Already closed.
	default:
		close(s.closeChan)
	}
}