aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/klauspost/compress/zstd/decoder.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/zstd/decoder.go')
-rw-r--r--vendor/github.com/klauspost/compress/zstd/decoder.go53
1 files changed, 25 insertions, 28 deletions
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder.go b/vendor/github.com/klauspost/compress/zstd/decoder.go
index f593e464b..4d984c3b2 100644
--- a/vendor/github.com/klauspost/compress/zstd/decoder.go
+++ b/vendor/github.com/klauspost/compress/zstd/decoder.go
@@ -113,9 +113,6 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
// Returns the number of bytes written and any error that occurred.
// When the stream is done, io.EOF will be returned.
func (d *Decoder) Read(p []byte) (int, error) {
- if d.stream == nil {
- return 0, ErrDecoderNilInput
- }
var n int
for {
if len(d.current.b) > 0 {
@@ -138,7 +135,7 @@ func (d *Decoder) Read(p []byte) (int, error) {
}
}
if len(d.current.b) > 0 {
- if debug {
+ if debugDecoder {
println("returning", n, "still bytes left:", len(d.current.b))
}
// Only return error at end of block
@@ -147,7 +144,7 @@ func (d *Decoder) Read(p []byte) (int, error) {
if d.current.err != nil {
d.drainOutput()
}
- if debug {
+ if debugDecoder {
println("returning", n, d.current.err, len(d.decoders))
}
return n, d.current.err
@@ -167,20 +164,17 @@ func (d *Decoder) Reset(r io.Reader) error {
if r == nil {
d.current.err = ErrDecoderNilInput
+ if len(d.current.b) > 0 {
+ d.current.b = d.current.b[:0]
+ }
d.current.flushed = true
return nil
}
- if d.stream == nil {
- d.stream = make(chan decodeStream, 1)
- d.streamWg.Add(1)
- go d.startStreamDecoder(d.stream)
- }
-
- // If bytes buffer and < 1MB, do sync decoding anyway.
- if bb, ok := r.(byter); ok && bb.Len() < 1<<20 {
+ // If bytes buffer and < 5MB, do sync decoding anyway.
+ if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
bb2 := bb
- if debug {
+ if debugDecoder {
println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
}
b := bb2.Bytes()
@@ -196,12 +190,18 @@ func (d *Decoder) Reset(r io.Reader) error {
d.current.b = dst
d.current.err = err
d.current.flushed = true
- if debug {
+ if debugDecoder {
println("sync decode to", len(dst), "bytes, err:", err)
}
return nil
}
+ if d.stream == nil {
+ d.stream = make(chan decodeStream, 1)
+ d.streamWg.Add(1)
+ go d.startStreamDecoder(d.stream)
+ }
+
// Remove current block.
d.current.decodeOutput = decodeOutput{}
d.current.err = nil
@@ -225,7 +225,7 @@ func (d *Decoder) drainOutput() {
d.current.cancel = nil
}
if d.current.d != nil {
- if debug {
+ if debugDecoder {
printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
}
d.decoders <- d.current.d
@@ -238,7 +238,7 @@ func (d *Decoder) drainOutput() {
}
for v := range d.current.output {
if v.d != nil {
- if debug {
+ if debugDecoder {
printf("re-adding decoder %p", v.d)
}
d.decoders <- v.d
@@ -255,9 +255,6 @@ func (d *Decoder) drainOutput() {
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
- if d.stream == nil {
- return 0, ErrDecoderNilInput
- }
var n int64
for {
if len(d.current.b) > 0 {
@@ -297,7 +294,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
block := <-d.decoders
frame := block.localFrame
defer func() {
- if debug {
+ if debugDecoder {
printf("re-adding decoder: %p", block)
}
frame.rawInput = nil
@@ -310,7 +307,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
frame.history.reset()
err := frame.reset(&frame.bBuf)
if err == io.EOF {
- if debug {
+ if debugDecoder {
println("frame reset return EOF")
}
return dst, nil
@@ -355,7 +352,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
return dst, err
}
if len(frame.bBuf) == 0 {
- if debug {
+ if debugDecoder {
println("frame dbuf empty")
}
break
@@ -371,7 +368,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
// if no data was available without blocking.
func (d *Decoder) nextBlock(blocking bool) (ok bool) {
if d.current.d != nil {
- if debug {
+ if debugDecoder {
printf("re-adding current decoder %p", d.current.d)
}
d.decoders <- d.current.d
@@ -391,7 +388,7 @@ func (d *Decoder) nextBlock(blocking bool) (ok bool) {
return false
}
}
- if debug {
+ if debugDecoder {
println("got", len(d.current.b), "bytes, error:", d.current.err)
}
return true
@@ -485,7 +482,7 @@ func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
defer d.streamWg.Done()
frame := newFrameDec(d.o)
for stream := range inStream {
- if debug {
+ if debugDecoder {
println("got new stream")
}
br := readerWrapper{r: stream.r}
@@ -493,7 +490,7 @@ func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
for {
frame.history.reset()
err := frame.reset(&br)
- if debug && err != nil {
+ if debugDecoder && err != nil {
println("Frame decoder returned", err)
}
if err == nil && frame.DictionaryID != nil {
@@ -510,7 +507,7 @@ func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
}
break
}
- if debug {
+ if debugDecoder {
println("starting frame decoder")
}