diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/zstd/decoder.go')
-rw-r--r-- | vendor/github.com/klauspost/compress/zstd/decoder.go | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder.go b/vendor/github.com/klauspost/compress/zstd/decoder.go index f06bff6f6..f4db3096a 100644 --- a/vendor/github.com/klauspost/compress/zstd/decoder.go +++ b/vendor/github.com/klauspost/compress/zstd/decoder.go @@ -127,6 +127,9 @@ func (d *Decoder) Read(p []byte) (int, error) { } } if len(d.current.b) > 0 { + if debug { + println("returning", n, "still bytes left:", len(d.current.b)) + } // Only return error at end of block return n, nil } @@ -159,6 +162,9 @@ func (d *Decoder) Reset(r io.Reader) error { // If bytes buffer and < 1MB, do sync decoding anyway. if bb, ok := r.(*bytes.Buffer); ok && bb.Len() < 1<<20 { + if debug { + println("*bytes.Buffer detected, doing sync decode, len:", bb.Len()) + } b := bb.Bytes() dst, err := d.DecodeAll(b, nil) if err == nil { @@ -167,6 +173,9 @@ func (d *Decoder) Reset(r io.Reader) error { d.current.b = dst d.current.err = err d.current.flushed = true + if debug { + println("sync decode to ", len(dst), "bytes, err:", err) + } return nil } @@ -193,7 +202,9 @@ func (d *Decoder) drainOutput() { d.current.cancel = nil } if d.current.d != nil { - println("re-adding current decoder", d.current.d, len(d.decoders)) + if debug { + printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders)) + } d.decoders <- d.current.d d.current.d = nil d.current.b = nil @@ -206,7 +217,9 @@ func (d *Decoder) drainOutput() { select { case v := <-d.current.output: if v.d != nil { - println("got decoder", v.d) + if debug { + printf("re-adding decoder %p", v.d) + } d.decoders <- v.d } if v.err == errEndOfStream { @@ -259,20 +272,22 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { if d.current.err == ErrDecoderClosed { return dst, ErrDecoderClosed } - //println(len(d.frames), len(d.decoders), d.current) + + // Grab a block decoder and frame decoder. block, frame := <-d.decoders, <-d.frames defer func() { + if debug { + printf("re-adding decoder: %p", block) + } d.decoders <- block frame.rawInput = nil + frame.bBuf = nil d.frames <- frame }() - if cap(dst) == 0 { - // Allocate 1MB by default. - dst = make([]byte, 0, 1<<20) - } - br := byteBuf(input) + frame.bBuf = input + for { - err := frame.reset(&br) + err := frame.reset(&frame.bBuf) if err == io.EOF { return dst, nil } @@ -290,11 +305,21 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { dst = dst2 } } + if cap(dst) == 0 { + // Allocate window size * 2 by default if nothing is provided and we didn't get frame content size. + size := frame.WindowSize * 2 + // Cap to 1 MB. + if size > 1<<20 { + size = 1 << 20 + } + dst = make([]byte, 0, frame.WindowSize) + } + dst, err = frame.runDecoder(dst, block) if err != nil { return dst, err } - if len(br) == 0 { + if len(frame.bBuf) == 0 { break } } @@ -305,6 +330,9 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) { // If an error occurs d.err will be set. func (d *Decoder) nextBlock() { if d.current.d != nil { + if debug { + printf("re-adding current decoder %p", d.current.d) + } d.decoders <- d.current.d d.current.d = nil } @@ -377,6 +405,9 @@ func (d *Decoder) startStreamDecoder(inStream chan decodeStream) { defer d.streamWg.Done() frame := newFrameDec(d.o) for stream := range inStream { + if debug { + println("got new stream") + } br := readerWrapper{r: stream.r} decodeStream: for { |