aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/klauspost/compress/zstd
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/zstd')
-rw-r--r--vendor/github.com/klauspost/compress/zstd/README.md40
-rw-r--r--vendor/github.com/klauspost/compress/zstd/bitreader.go4
-rw-r--r--vendor/github.com/klauspost/compress/zstd/blockdec.go422
-rw-r--r--vendor/github.com/klauspost/compress/zstd/bytebuf.go3
-rw-r--r--vendor/github.com/klauspost/compress/zstd/decoder.go626
-rw-r--r--vendor/github.com/klauspost/compress/zstd/decoder_options.go24
-rw-r--r--vendor/github.com/klauspost/compress/zstd/encoder.go66
-rw-r--r--vendor/github.com/klauspost/compress/zstd/encoder_options.go1
-rw-r--r--vendor/github.com/klauspost/compress/zstd/framedec.go202
-rw-r--r--vendor/github.com/klauspost/compress/zstd/fuzz.go11
-rw-r--r--vendor/github.com/klauspost/compress/zstd/fuzz_none.go11
-rw-r--r--vendor/github.com/klauspost/compress/zstd/history.go46
-rw-r--r--vendor/github.com/klauspost/compress/zstd/seqdec.go335
-rw-r--r--vendor/github.com/klauspost/compress/zstd/zstd.go4
14 files changed, 1163 insertions, 632 deletions
diff --git a/vendor/github.com/klauspost/compress/zstd/README.md b/vendor/github.com/klauspost/compress/zstd/README.md
index c8f0f16fc..c876c591a 100644
--- a/vendor/github.com/klauspost/compress/zstd/README.md
+++ b/vendor/github.com/klauspost/compress/zstd/README.md
@@ -78,6 +78,9 @@ of a stream. This is independent of the `WithEncoderConcurrency(n)`, but that is
in the future. So if you want to limit concurrency for future updates, specify the concurrency
you would like.
+If you would like stream encoding to be done without spawning async goroutines, use `WithEncoderConcurrency(1)`
+which will compress input as each block is completed, blocking on writes until each has completed.
+
You can specify your desired compression level using `WithEncoderLevel()` option. Currently only pre-defined
compression settings can be specified.
@@ -104,7 +107,8 @@ and seems to ignore concatenated streams, even though [it is part of the spec](h
For compressing small blocks, the returned encoder has a function called `EncodeAll(src, dst []byte) []byte`.
`EncodeAll` will encode all input in src and append it to dst.
-This function can be called concurrently, but each call will only run on a single goroutine.
+This function can be called concurrently.
+Each call will only run on a same goroutine as the caller.
Encoded blocks can be concatenated and the result will be the combined input stream.
Data compressed with EncodeAll can be decoded with the Decoder, using either a stream or `DecodeAll`.
@@ -283,8 +287,13 @@ func Decompress(in io.Reader, out io.Writer) error {
}
```
-It is important to use the "Close" function when you no longer need the Reader to stop running goroutines.
-See "Allocation-less operation" below.
+It is important to use the "Close" function when you no longer need the Reader to stop running goroutines,
+when running with default settings.
+Goroutines will exit once an error has been returned, including `io.EOF` at the end of a stream.
+
+Streams are decoded concurrently in 4 asynchronous stages to give the best possible throughput.
+However, if you prefer synchronous decompression, use `WithDecoderConcurrency(1)` which will decompress data
+as it is being requested only.
For decoding buffers, it could look something like this:
@@ -293,7 +302,7 @@ import "github.com/klauspost/compress/zstd"
// Create a reader that caches decompressors.
// For this operation type we supply a nil Reader.
-var decoder, _ = zstd.NewReader(nil)
+var decoder, _ = zstd.NewReader(nil, WithDecoderConcurrency(0))
// Decompress a buffer. We don't supply a destination buffer,
// so it will be allocated by the decoder.
@@ -303,9 +312,12 @@ func Decompress(src []byte) ([]byte, error) {
```
Both of these cases should provide the functionality needed.
-The decoder can be used for *concurrent* decompression of multiple buffers.
+The decoder can be used for *concurrent* decompression of multiple buffers.
+By default 4 decompressors will be created.
+
It will only allow a certain number of concurrent operations to run.
-To tweak that yourself use the `WithDecoderConcurrency(n)` option when creating the decoder.
+To tweak that yourself use the `WithDecoderConcurrency(n)` option when creating the decoder.
+It is possible to use `WithDecoderConcurrency(0)` to create GOMAXPROCS decoders.
### Dictionaries
@@ -357,19 +369,21 @@ In this case no unneeded allocations should be made.
The buffer decoder does everything on the same goroutine and does nothing concurrently.
It can however decode several buffers concurrently. Use `WithDecoderConcurrency(n)` to limit that.
-The stream decoder operates on
+The stream decoder will create goroutines that:
-* One goroutine reads input and splits the input to several block decoders.
-* A number of decoders will decode blocks.
-* A goroutine coordinates these blocks and sends history from one to the next.
+1) Reads input and splits the input into blocks.
+2) Decompression of literals.
+3) Decompression of sequences.
+4) Reconstruction of output stream.
So effectively this also means the decoder will "read ahead" and prepare data to always be available for output.
+The concurrency level will, for streams, determine how many blocks ahead the compression will start.
+
Since "blocks" are quite dependent on the output of the previous block stream decoding will only have limited concurrency.
-In practice this means that concurrency is often limited to utilizing about 2 cores effectively.
-
-
+In practice this means that concurrency is often limited to utilizing about 3 cores effectively.
+
### Benchmarks
These are some examples of performance compared to [datadog cgo library](https://github.com/DataDog/zstd).
diff --git a/vendor/github.com/klauspost/compress/zstd/bitreader.go b/vendor/github.com/klauspost/compress/zstd/bitreader.go
index 753d17df6..d7cd15ba2 100644
--- a/vendor/github.com/klauspost/compress/zstd/bitreader.go
+++ b/vendor/github.com/klauspost/compress/zstd/bitreader.go
@@ -7,6 +7,7 @@ package zstd
import (
"encoding/binary"
"errors"
+ "fmt"
"io"
"math/bits"
)
@@ -132,6 +133,9 @@ func (b *bitReader) remain() uint {
func (b *bitReader) close() error {
// Release reference.
b.in = nil
+ if !b.finished() {
+ return fmt.Errorf("%d extra bits on block, should be 0", b.remain())
+ }
if b.bitsRead > 64 {
return io.ErrUnexpectedEOF
}
diff --git a/vendor/github.com/klauspost/compress/zstd/blockdec.go b/vendor/github.com/klauspost/compress/zstd/blockdec.go
index dc587b2c9..607b62ee3 100644
--- a/vendor/github.com/klauspost/compress/zstd/blockdec.go
+++ b/vendor/github.com/klauspost/compress/zstd/blockdec.go
@@ -76,16 +76,25 @@ type blockDec struct {
// Window size of the block.
WindowSize uint64
- history chan *history
- input chan struct{}
- result chan decodeOutput
- err error
- decWG sync.WaitGroup
+ err error
+
+ // Check against this crc
+ checkCRC []byte
// Frame to use for singlethreaded decoding.
// Should not be used by the decoder itself since parent may be another frame.
localFrame *frameDec
+ sequence []seqVals
+
+ async struct {
+ newHist *history
+ literals []byte
+ seqData []byte
+ seqSize int // Size of uncompressed sequences
+ fcs uint64
+ }
+
// Block is RLE, this is the size.
RLESize uint32
tmp [4]byte
@@ -108,13 +117,8 @@ func (b *blockDec) String() string {
func newBlockDec(lowMem bool) *blockDec {
b := blockDec{
- lowMem: lowMem,
- result: make(chan decodeOutput, 1),
- input: make(chan struct{}, 1),
- history: make(chan *history, 1),
+ lowMem: lowMem,
}
- b.decWG.Add(1)
- go b.startDecoder()
return &b
}
@@ -137,6 +141,12 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error {
case blockTypeReserved:
return ErrReservedBlockType
case blockTypeRLE:
+ if cSize > maxCompressedBlockSize || cSize > int(b.WindowSize) {
+ if debugDecoder {
+ printf("rle block too big: csize:%d block: %+v\n", uint64(cSize), b)
+ }
+ return ErrWindowSizeExceeded
+ }
b.RLESize = uint32(cSize)
if b.lowMem {
maxSize = cSize
@@ -158,6 +168,13 @@ func (b *blockDec) reset(br byteBuffer, windowSize uint64) error {
return ErrCompressedSizeTooBig
}
case blockTypeRaw:
+ if cSize > maxCompressedBlockSize || cSize > int(b.WindowSize) {
+ if debugDecoder {
+ printf("rle block too big: csize:%d block: %+v\n", uint64(cSize), b)
+ }
+ return ErrWindowSizeExceeded
+ }
+
b.RLESize = 0
// We do not need a destination for raw blocks.
maxSize = -1
@@ -192,85 +209,14 @@ func (b *blockDec) sendErr(err error) {
b.Last = true
b.Type = blockTypeReserved
b.err = err
- b.input <- struct{}{}
}
// Close will release resources.
// Closed blockDec cannot be reset.
func (b *blockDec) Close() {
- close(b.input)
- close(b.history)
- close(b.result)
- b.decWG.Wait()
-}
-
-// decodeAsync will prepare decoding the block when it receives input.
-// This will separate output and history.
-func (b *blockDec) startDecoder() {
- defer b.decWG.Done()
- for range b.input {
- //println("blockDec: Got block input")
- switch b.Type {
- case blockTypeRLE:
- if cap(b.dst) < int(b.RLESize) {
- if b.lowMem {
- b.dst = make([]byte, b.RLESize)
- } else {
- b.dst = make([]byte, maxBlockSize)
- }
- }
- o := decodeOutput{
- d: b,
- b: b.dst[:b.RLESize],
- err: nil,
- }
- v := b.data[0]
- for i := range o.b {
- o.b[i] = v
- }
- hist := <-b.history
- hist.append(o.b)
- b.result <- o
- case blockTypeRaw:
- o := decodeOutput{
- d: b,
- b: b.data,
- err: nil,
- }
- hist := <-b.history
- hist.append(o.b)
- b.result <- o
- case blockTypeCompressed:
- b.dst = b.dst[:0]
- err := b.decodeCompressed(nil)
- o := decodeOutput{
- d: b,
- b: b.dst,
- err: err,
- }
- if debugDecoder {
- println("Decompressed to", len(b.dst), "bytes, error:", err)
- }
- b.result <- o
- case blockTypeReserved:
- // Used for returning errors.
- <-b.history
- b.result <- decodeOutput{
- d: b,
- b: nil,
- err: b.err,
- }
- default:
- panic("Invalid block type")
- }
- if debugDecoder {
- println("blockDec: Finished block")
- }
- }
}
-// decodeAsync will prepare decoding the block when it receives the history.
-// If history is provided, it will not fetch it from the channel.
+// decodeBuf
func (b *blockDec) decodeBuf(hist *history) error {
switch b.Type {
case blockTypeRLE:
@@ -293,14 +239,23 @@ func (b *blockDec) decodeBuf(hist *history) error {
return nil
case blockTypeCompressed:
saved := b.dst
- b.dst = hist.b
- hist.b = nil
+ // Append directly to history
+ if hist.ignoreBuffer == 0 {
+ b.dst = hist.b
+ hist.b = nil
+ } else {
+ b.dst = b.dst[:0]
+ }
err := b.decodeCompressed(hist)
if debugDecoder {
println("Decompressed to total", len(b.dst), "bytes, hash:", xxhash.Sum64(b.dst), "error:", err)
}
- hist.b = b.dst
- b.dst = saved
+ if hist.ignoreBuffer == 0 {
+ hist.b = b.dst
+ b.dst = saved
+ } else {
+ hist.appendKeep(b.dst)
+ }
return err
case blockTypeReserved:
// Used for returning errors.
@@ -310,30 +265,18 @@ func (b *blockDec) decodeBuf(hist *history) error {
}
}
-// decodeCompressed will start decompressing a block.
-// If no history is supplied the decoder will decodeAsync as much as possible
-// before fetching from blockDec.history
-func (b *blockDec) decodeCompressed(hist *history) error {
- in := b.data
- delayedHistory := hist == nil
-
- if delayedHistory {
- // We must always grab history.
- defer func() {
- if hist == nil {
- <-b.history
- }
- }()
- }
+func (b *blockDec) decodeLiterals(in []byte, hist *history) (remain []byte, err error) {
// There must be at least one byte for Literals_Block_Type and one for Sequences_Section_Header
if len(in) < 2 {
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
+
litType := literalsBlockType(in[0] & 3)
var litRegenSize int
var litCompSize int
sizeFormat := (in[0] >> 2) & 3
var fourStreams bool
+ var literals []byte
switch litType {
case literalsBlockRaw, literalsBlockRLE:
switch sizeFormat {
@@ -349,7 +292,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
// Regenerated_Size uses 20 bits (0-1048575). Literals_Section_Header uses 3 bytes.
if len(in) < 3 {
println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in))
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
litRegenSize = int(in[0]>>4) + (int(in[1]) << 4) + (int(in[2]) << 12)
in = in[3:]
@@ -360,7 +303,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
// Both Regenerated_Size and Compressed_Size use 10 bits (0-1023).
if len(in) < 3 {
println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in))
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12)
litRegenSize = int(n & 1023)
@@ -371,7 +314,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
fourStreams = true
if len(in) < 4 {
println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in))
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12) + (uint64(in[3]) << 20)
litRegenSize = int(n & 16383)
@@ -381,7 +324,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
fourStreams = true
if len(in) < 5 {
println("too small: litType:", litType, " sizeFormat", sizeFormat, len(in))
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
n := uint64(in[0]>>4) + (uint64(in[1]) << 4) + (uint64(in[2]) << 12) + (uint64(in[3]) << 20) + (uint64(in[4]) << 28)
litRegenSize = int(n & 262143)
@@ -392,13 +335,15 @@ func (b *blockDec) decodeCompressed(hist *history) error {
if debugDecoder {
println("literals type:", litType, "litRegenSize:", litRegenSize, "litCompSize:", litCompSize, "sizeFormat:", sizeFormat, "4X:", fourStreams)
}
- var literals []byte
- var huff *huff0.Scratch
+ if litRegenSize > int(b.WindowSize) || litRegenSize > maxCompressedBlockSize {
+ return in, ErrWindowSizeExceeded
+ }
+
switch litType {
case literalsBlockRaw:
if len(in) < litRegenSize {
println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litRegenSize)
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
literals = in[:litRegenSize]
in = in[litRegenSize:]
@@ -406,7 +351,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
case literalsBlockRLE:
if len(in) < 1 {
println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", 1)
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
if cap(b.literalBuf) < litRegenSize {
if b.lowMem {
@@ -417,7 +362,6 @@ func (b *blockDec) decodeCompressed(hist *history) error {
b.literalBuf = make([]byte, litRegenSize)
} else {
b.literalBuf = make([]byte, litRegenSize, maxCompressedLiteralSize)
-
}
}
}
@@ -433,7 +377,7 @@ func (b *blockDec) decodeCompressed(hist *history) error {
case literalsBlockTreeless:
if len(in) < litCompSize {
println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litCompSize)
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
// Store compressed literals, so we defer decoding until we get history.
literals = in[:litCompSize]
@@ -441,31 +385,65 @@ func (b *blockDec) decodeCompressed(hist *history) error {
if debugDecoder {
printf("Found %d compressed literals\n", litCompSize)
}
+ huff := hist.huffTree
+ if huff == nil {
+ return in, errors.New("literal block was treeless, but no history was defined")
+ }
+ // Ensure we have space to store it.
+ if cap(b.literalBuf) < litRegenSize {
+ if b.lowMem {
+ b.literalBuf = make([]byte, 0, litRegenSize)
+ } else {
+ b.literalBuf = make([]byte, 0, maxCompressedLiteralSize)
+ }
+ }
+ var err error
+ // Use our out buffer.
+ huff.MaxDecodedSize = maxCompressedBlockSize
+ if fourStreams {
+ literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals)
+ } else {
+ literals, err = huff.Decoder().Decompress1X(b.literalBuf[:0:litRegenSize], literals)
+ }
+ // Make sure we don't leak our literals buffer
+ if err != nil {
+ println("decompressing literals:", err)
+ return in, err
+ }
+ if len(literals) != litRegenSize {
+ return in, fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals))
+ }
+
case literalsBlockCompressed:
if len(in) < litCompSize {
println("too small: litType:", litType, " sizeFormat", sizeFormat, "remain:", len(in), "want:", litCompSize)
- return ErrBlockTooSmall
+ return in, ErrBlockTooSmall
}
literals = in[:litCompSize]
in = in[litCompSize:]
- huff = huffDecoderPool.Get().(*huff0.Scratch)
- var err error
// Ensure we have space to store it.
if cap(b.literalBuf) < litRegenSize {
if b.lowMem {
b.literalBuf = make([]byte, 0, litRegenSize)
} else {
- b.literalBuf = make([]byte, 0, maxCompressedLiteralSize)
+ b.literalBuf = make([]byte, 0, maxCompressedBlockSize)
}
}
- if huff == nil {
- huff = &huff0.Scratch{}
+ huff := hist.huffTree
+ if huff == nil || (hist.dict != nil && huff == hist.dict.litEnc) {
+ huff = huffDecoderPool.Get().(*huff0.Scratch)
+ if huff == nil {
+ huff = &huff0.Scratch{}
+ }
}
+ var err error
huff, literals, err = huff0.ReadTable(literals, huff)
if err != nil {
println("reading huffman table:", err)
- return err
+ return in, err
}
+ hist.huffTree = huff
+ huff.MaxDecodedSize = maxCompressedBlockSize
// Use our out buffer.
if fourStreams {
literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals)
@@ -474,24 +452,52 @@ func (b *blockDec) decodeCompressed(hist *history) error {
}
if err != nil {
println("decoding compressed literals:", err)
- return err
+ return in, err
}
// Make sure we don't leak our literals buffer
if len(literals) != litRegenSize {
- return fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals))
+ return in, fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals))
}
if debugDecoder {
printf("Decompressed %d literals into %d bytes\n", litCompSize, litRegenSize)
}
}
+ hist.decoders.literals = literals
+ return in, nil
+}
+
+// decodeCompressed will start decompressing a block.
+func (b *blockDec) decodeCompressed(hist *history) error {
+ in := b.data
+ in, err := b.decodeLiterals(in, hist)
+ if err != nil {
+ return err
+ }
+ err = b.prepareSequences(in, hist)
+ if err != nil {
+ return err
+ }
+ if hist.decoders.nSeqs == 0 {
+ b.dst = append(b.dst, hist.decoders.literals...)
+ return nil
+ }
+ err = hist.decoders.decodeSync(hist)
+ if err != nil {
+ return err
+ }
+ b.dst = hist.decoders.out
+ hist.recentOffsets = hist.decoders.prevOffset
+ return nil
+}
+func (b *blockDec) prepareSequences(in []byte, hist *history) (err error) {
// Decode Sequences
// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#sequences-section
if len(in) < 1 {
return ErrBlockTooSmall
}
+ var nSeqs int
seqHeader := in[0]
- nSeqs := 0
switch {
case seqHeader == 0:
in = in[1:]
@@ -512,7 +518,8 @@ func (b *blockDec) decodeCompressed(hist *history) error {
in = in[3:]
}
- var seqs = &sequenceDecs{}
+ var seqs = &hist.decoders
+ seqs.nSeqs = nSeqs
if nSeqs > 0 {
if len(in) < 1 {
return ErrBlockTooSmall
@@ -541,6 +548,9 @@ func (b *blockDec) decodeCompressed(hist *history) error {
}
switch mode {
case compModePredefined:
+ if seq.fse != nil && !seq.fse.preDefined {
+ fseDecoderPool.Put(seq.fse)
+ }
seq.fse = &fsePredef[i]
case compModeRLE:
if br.remain() < 1 {
@@ -548,34 +558,36 @@ func (b *blockDec) decodeCompressed(hist *history) error {
}
v := br.Uint8()
br.advance(1)
- dec := fseDecoderPool.Get().(*fseDecoder)
+ if seq.fse == nil || seq.fse.preDefined {
+ seq.fse = fseDecoderPool.Get().(*fseDecoder)
+ }
symb, err := decSymbolValue(v, symbolTableX[i])
if err != nil {
printf("RLE Transform table (%v) error: %v", tableIndex(i), err)
return err
}
- dec.setRLE(symb)
- seq.fse = dec
+ seq.fse.setRLE(symb)
if debugDecoder {
printf("RLE set to %+v, code: %v", symb, v)
}
case compModeFSE:
println("Reading table for", tableIndex(i))
- dec := fseDecoderPool.Get().(*fseDecoder)
- err := dec.readNCount(&br, uint16(maxTableSymbol[i]))
+ if seq.fse == nil || seq.fse.preDefined {
+ seq.fse = fseDecoderPool.Get().(*fseDecoder)
+ }
+ err := seq.fse.readNCount(&br, uint16(maxTableSymbol[i]))
if err != nil {
println("Read table error:", err)
return err
}
- err = dec.transform(symbolTableX[i])
+ err = seq.fse.transform(symbolTableX[i])
if err != nil {
println("Transform table error:", err)
return err
}
if debugDecoder {
- println("Read table ok", "symbolLen:", dec.symbolLen)
+ println("Read table ok", "symbolLen:", seq.fse.symbolLen)
}
- seq.fse = dec
case compModeRepeat:
seq.repeat = true
}
@@ -585,140 +597,88 @@ func (b *blockDec) decodeCompressed(hist *history) error {
}
in = br.unread()
}
-
- // Wait for history.
- // All time spent after this is critical since it is strictly sequential.
- if hist == nil {
- hist = <-b.history
- if hist.error {
- return ErrDecoderClosed
- }
- }
-
- // Decode treeless literal block.
- if litType == literalsBlockTreeless {
- // TODO: We could send the history early WITHOUT the stream history.
- // This would allow decoding treeless literals before the byte history is available.
- // Silencia stats: Treeless 4393, with: 32775, total: 37168, 11% treeless.
- // So not much obvious gain here.
-
- if hist.huffTree == nil {
- return errors.New("literal block was treeless, but no history was defined")
- }
- // Ensure we have space to store it.
- if cap(b.literalBuf) < litRegenSize {
- if b.lowMem {
- b.literalBuf = make([]byte, 0, litRegenSize)
- } else {
- b.literalBuf = make([]byte, 0, maxCompressedLiteralSize)
- }
- }
- var err error
- // Use our out buffer.
- huff = hist.huffTree
- if fourStreams {
- literals, err = huff.Decoder().Decompress4X(b.literalBuf[:0:litRegenSize], literals)
- } else {
- literals, err = huff.Decoder().Decompress1X(b.literalBuf[:0:litRegenSize], literals)
- }
- // Make sure we don't leak our literals buffer
- if err != nil {
- println("decompressing literals:", err)
- return err
- }
- if len(literals) != litRegenSize {
- return fmt.Errorf("literal output size mismatch want %d, got %d", litRegenSize, len(literals))
- }
- } else {
- if hist.huffTree != nil && huff != nil {
- if hist.dict == nil || hist.dict.litEnc != hist.huffTree {
- huffDecoderPool.Put(hist.huffTree)
- }
- hist.huffTree = nil
- }
- }
- if huff != nil {
- hist.huffTree = huff
- }
if debugDecoder {
- println("Final literals:", len(literals), "hash:", xxhash.Sum64(literals), "and", nSeqs, "sequences.")
+ println("Literals:", len(seqs.literals), "hash:", xxhash.Sum64(seqs.literals), "and", seqs.nSeqs, "sequences.")
}
if nSeqs == 0 {
- // Decompressed content is defined entirely as Literals Section content.
- b.dst = append(b.dst, literals...)
- if delayedHistory {
- hist.append(literals)
+ if len(b.sequence) > 0 {
+ b.sequence = b.sequence[:0]
}
return nil
}
-
- seqs, err := seqs.mergeHistory(&hist.decoders)
- if err != nil {
- return err
+ br := seqs.br
+ if br == nil {
+ br = &bitReader{}
}
- if debugDecoder {
- println("History merged ok")
- }
- br := &bitReader{}
if err := br.init(in); err != nil {
return err
}
- // TODO: Investigate if sending history without decoders are faster.
- // This would allow the sequences to be decoded async and only have to construct stream history.
- // If only recent offsets were not transferred, this would be an obvious win.
- // Also, if first 3 sequences don't reference recent offsets, all sequences can be decoded.
+ if err := seqs.initialize(br, hist, b.dst); err != nil {
+ println("initializing sequences:", err)
+ return err
+ }
+ return nil
+}
+
+func (b *blockDec) decodeSequences(hist *history) error {
+ if cap(b.sequence) < hist.decoders.nSeqs {
+ if b.lowMem {
+ b.sequence = make([]seqVals, 0, hist.decoders.nSeqs)
+ } else {
+ b.sequence = make([]seqVals, 0, 0x7F00+0xffff)
+ }
+ }
+ b.sequence = b.sequence[:hist.decoders.nSeqs]
+ if hist.decoders.nSeqs == 0 {
+ hist.decoders.seqSize = len(hist.decoders.literals)
+ return nil
+ }
+ hist.decoders.prevOffset = hist.recentOffsets
+ err := hist.decoders.decode(b.sequence)
+ hist.recentOffsets = hist.decoders.prevOffset
+ return err
+}
+func (b *blockDec) executeSequences(hist *history) error {
hbytes := hist.b
if len(hbytes) > hist.windowSize {
hbytes = hbytes[len(hbytes)-hist.windowSize:]
- // We do not need history any more.
+ // We do not need history anymore.
if hist.dict != nil {
hist.dict.content = nil
}
}
-
- if err := seqs.initialize(br, hist, literals, b.dst); err != nil {
- println("initializing sequences:", err)
- return err
- }
-
- err = seqs.decode(nSeqs, br, hbytes)
+ hist.decoders.windowSize = hist.windowSize
+ hist.decoders.out = b.dst[:0]
+ err := hist.decoders.execute(b.sequence, hbytes)
if err != nil {
return err
}
- if !br.finished() {
- return fmt.Errorf("%d extra bits on block, should be 0", br.remain())
- }
+ return b.updateHistory(hist)
+}
- err = br.close()
- if err != nil {
- printf("Closing sequences: %v, %+v\n", err, *br)
- }
+func (b *blockDec) updateHistory(hist *history) error {
if len(b.data) > maxCompressedBlockSize {
return fmt.Errorf("compressed block size too large (%d)", len(b.data))
}
// Set output and release references.
- b.dst = seqs.out
- seqs.out, seqs.literals, seqs.hist = nil, nil, nil
+ b.dst = hist.decoders.out
+ hist.recentOffsets = hist.decoders.prevOffset
- if !delayedHistory {
- // If we don't have delayed history, no need to update.
- hist.recentOffsets = seqs.prevOffset
- return nil
- }
if b.Last {
// if last block we don't care about history.
println("Last block, no history returned")
hist.b = hist.b[:0]
return nil
+ } else {
+ hist.append(b.dst)
+ if debugDecoder {
+ println("Finished block with ", len(b.sequence), "sequences. Added", len(b.dst), "to history, now length", len(hist.b))
+ }
}
- hist.append(b.dst)
- hist.recentOffsets = seqs.prevOffset
- if debugDecoder {
- println("Finished block with literals:", len(literals), "and", nSeqs, "sequences.")
- }
+ hist.decoders.out, hist.decoders.literals = nil, nil
return nil
}
diff --git a/vendor/github.com/klauspost/compress/zstd/bytebuf.go b/vendor/github.com/klauspost/compress/zstd/bytebuf.go
index aab71c6cf..b80191e4b 100644
--- a/vendor/github.com/klauspost/compress/zstd/bytebuf.go
+++ b/vendor/github.com/klauspost/compress/zstd/bytebuf.go
@@ -113,6 +113,9 @@ func (r *readerWrapper) readBig(n int, dst []byte) ([]byte, error) {
func (r *readerWrapper) readByte() (byte, error) {
n2, err := r.r.Read(r.tmp[:1])
if err != nil {
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
return 0, err
}
if n2 != 1 {
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder.go b/vendor/github.com/klauspost/compress/zstd/decoder.go
index f430f58b5..a93dfaf10 100644
--- a/vendor/github.com/klauspost/compress/zstd/decoder.go
+++ b/vendor/github.com/klauspost/compress/zstd/decoder.go
@@ -5,9 +5,13 @@
package zstd
import (
- "errors"
+ "bytes"
+ "context"
+ "encoding/binary"
"io"
"sync"
+
+ "github.com/klauspost/compress/zstd/internal/xxhash"
)
// Decoder provides decoding of zstandard streams.
@@ -22,12 +26,19 @@ type Decoder struct {
// Unreferenced decoders, ready for use.
decoders chan *blockDec
- // Streams ready to be decoded.
- stream chan decodeStream
-
// Current read position used for Reader functionality.
current decoderState
+ // sync stream decoding
+ syncStream struct {
+ decodedFrame uint64
+ br readerWrapper
+ enabled bool
+ inFrame bool
+ }
+
+ frame *frameDec
+
// Custom dictionaries.
// Always uses copies.
dicts map[uint32]dict
@@ -46,7 +57,10 @@ type decoderState struct {
output chan decodeOutput
// cancel remaining output.
- cancel chan struct{}
+ cancel context.CancelFunc
+
+ // crc of current frame
+ crc *xxhash.Digest
flushed bool
}
@@ -81,7 +95,7 @@ func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
return nil, err
}
}
- d.current.output = make(chan decodeOutput, d.o.concurrent)
+ d.current.crc = xxhash.New()
d.current.flushed = true
if r == nil {
@@ -130,7 +144,7 @@ func (d *Decoder) Read(p []byte) (int, error) {
break
}
if !d.nextBlock(n == 0) {
- return n, nil
+ return n, d.current.err
}
}
}
@@ -162,6 +176,7 @@ func (d *Decoder) Reset(r io.Reader) error {
d.drainOutput()
+ d.syncStream.br.r = nil
if r == nil {
d.current.err = ErrDecoderNilInput
if len(d.current.b) > 0 {
@@ -195,33 +210,39 @@ func (d *Decoder) Reset(r io.Reader) error {
}
return nil
}
-
- if d.stream == nil {
- d.stream = make(chan decodeStream, 1)
- d.streamWg.Add(1)
- go d.startStreamDecoder(d.stream)
- }
-
// Remove current block.
+ d.stashDecoder()
d.current.decodeOutput = decodeOutput{}
d.current.err = nil
- d.current.cancel = make(chan struct{})
d.current.flushed = false
d.current.d = nil
- d.stream <- decodeStream{
- r: r,
- output: d.current.output,
- cancel: d.current.cancel,
+ // Ensure no-one else is still running...
+ d.streamWg.Wait()
+ if d.frame == nil {
+ d.frame = newFrameDec(d.o)
+ }
+
+ if d.o.concurrent == 1 {
+ return d.startSyncDecoder(r)
}
+
+ d.current.output = make(chan decodeOutput, d.o.concurrent)
+ ctx, cancel := context.WithCancel(context.Background())
+ d.current.cancel = cancel
+ d.streamWg.Add(1)
+ go d.startStreamDecoder(ctx, r, d.current.output)
+
return nil
}
// drainOutput will drain the output until errEndOfStream is sent.
func (d *Decoder) drainOutput() {
if d.current.cancel != nil {
- println("cancelling current")
- close(d.current.cancel)
+ if debugDecoder {
+ println("cancelling current")
+ }
+ d.current.cancel()
d.current.cancel = nil
}
if d.current.d != nil {
@@ -243,12 +264,9 @@ func (d *Decoder) drainOutput() {
}
d.decoders <- v.d
}
- if v.err == errEndOfStream {
- println("current flushed")
- d.current.flushed = true
- return
- }
}
+ d.current.output = nil
+ d.current.flushed = true
}
// WriteTo writes data to w until there's no more data to write or when an error occurs.
@@ -287,7 +305,7 @@ func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
// DecodeAll can be used concurrently.
// The Decoder concurrency limits will be respected.
func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
- if d.current.err == ErrDecoderClosed {
+ if d.decoders == nil {
return dst, ErrDecoderClosed
}
@@ -300,6 +318,9 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
}
frame.rawInput = nil
frame.bBuf = nil
+ if frame.history.decoders.br != nil {
+ frame.history.decoders.br.in = nil
+ }
d.decoders <- block
}()
frame.bBuf = input
@@ -307,27 +328,31 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
for {
frame.history.reset()
err := frame.reset(&frame.bBuf)
- if err == io.EOF {
- if debugDecoder {
- println("frame reset return EOF")
+ if err != nil {
+ if err == io.EOF {
+ if debugDecoder {
+ println("frame reset return EOF")
+ }
+ return dst, nil
}
- return dst, nil
+ return dst, err
}
if frame.DictionaryID != nil {
dict, ok := d.dicts[*frame.DictionaryID]
if !ok {
return nil, ErrUnknownDictionary
}
+ if debugDecoder {
+ println("setting dict", frame.DictionaryID)
+ }
frame.history.setDict(&dict)
}
- if err != nil {
- return dst, err
- }
+
if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
return dst, ErrDecoderSizeExceeded
}
if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
- // Never preallocate moe than 1 GB up front.
+ // Never preallocate more than 1 GB up front.
if cap(dst)-len(dst) < int(frame.FrameContentSize) {
dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
copy(dst2, dst)
@@ -368,33 +393,170 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
// If non-blocking mode is used the returned boolean will be false
// if no data was available without blocking.
func (d *Decoder) nextBlock(blocking bool) (ok bool) {
- if d.current.d != nil {
- if debugDecoder {
- printf("re-adding current decoder %p", d.current.d)
- }
- d.decoders <- d.current.d
- d.current.d = nil
- }
if d.current.err != nil {
// Keep error state.
- return blocking
+ return false
}
+ d.current.b = d.current.b[:0]
+ // SYNC:
+ if d.syncStream.enabled {
+ if !blocking {
+ return false
+ }
+ ok = d.nextBlockSync()
+ if !ok {
+ d.stashDecoder()
+ }
+ return ok
+ }
+
+ //ASYNC:
+ d.stashDecoder()
if blocking {
- d.current.decodeOutput = <-d.current.output
+ d.current.decodeOutput, ok = <-d.current.output
} else {
select {
- case d.current.decodeOutput = <-d.current.output:
+ case d.current.decodeOutput, ok = <-d.current.output:
default:
return false
}
}
+ if !ok {
+ // This should not happen, so signal error state...
+ d.current.err = io.ErrUnexpectedEOF
+ return false
+ }
+ next := d.current.decodeOutput
+ if next.d != nil && next.d.async.newHist != nil {
+ d.current.crc.Reset()
+ }
if debugDecoder {
- println("got", len(d.current.b), "bytes, error:", d.current.err)
+ var tmp [4]byte
+ binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
+ println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
+ }
+
+ if len(next.b) > 0 {
+ n, err := d.current.crc.Write(next.b)
+ if err == nil {
+ if n != len(next.b) {
+ d.current.err = io.ErrShortWrite
+ }
+ }
+ }
+ if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
+ got := d.current.crc.Sum64()
+ var tmp [4]byte
+ binary.LittleEndian.PutUint32(tmp[:], uint32(got))
+ if !bytes.Equal(tmp[:], next.d.checkCRC) && !ignoreCRC {
+ if debugDecoder {
+ println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
+ }
+ d.current.err = ErrCRCMismatch
+ } else {
+ if debugDecoder {
+ println("CRC ok", tmp[:])
+ }
+ }
+ }
+
+ return true
+}
+
+func (d *Decoder) nextBlockSync() (ok bool) {
+ if d.current.d == nil {
+ d.current.d = <-d.decoders
+ }
+ for len(d.current.b) == 0 {
+ if !d.syncStream.inFrame {
+ d.frame.history.reset()
+ d.current.err = d.frame.reset(&d.syncStream.br)
+ if d.current.err != nil {
+ return false
+ }
+ if d.frame.DictionaryID != nil {
+ dict, ok := d.dicts[*d.frame.DictionaryID]
+ if !ok {
+ d.current.err = ErrUnknownDictionary
+ return false
+ } else {
+ d.frame.history.setDict(&dict)
+ }
+ }
+ if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
+ d.current.err = ErrDecoderSizeExceeded
+ return false
+ }
+
+ d.syncStream.decodedFrame = 0
+ d.syncStream.inFrame = true
+ }
+ d.current.err = d.frame.next(d.current.d)
+ if d.current.err != nil {
+ return false
+ }
+ d.frame.history.ensureBlock()
+ if debugDecoder {
+ println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
+ }
+ histBefore := len(d.frame.history.b)
+ d.current.err = d.current.d.decodeBuf(&d.frame.history)
+
+ if d.current.err != nil {
+ println("error after:", d.current.err)
+ return false
+ }
+ d.current.b = d.frame.history.b[histBefore:]
+ if debugDecoder {
+ println("history after:", len(d.frame.history.b))
+ }
+
+ // Check frame size (before CRC)
+ d.syncStream.decodedFrame += uint64(len(d.current.b))
+ if d.frame.FrameContentSize > 0 && d.syncStream.decodedFrame > d.frame.FrameContentSize {
+ if debugDecoder {
+ printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
+ }
+ d.current.err = ErrFrameSizeExceeded
+ return false
+ }
+
+ // Check FCS
+ if d.current.d.Last && d.frame.FrameContentSize > 0 && d.syncStream.decodedFrame != d.frame.FrameContentSize {
+ if debugDecoder {
+ printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
+ }
+ d.current.err = ErrFrameSizeMismatch
+ return false
+ }
+
+ // Update/Check CRC
+ if d.frame.HasCheckSum {
+ d.frame.crc.Write(d.current.b)
+ if d.current.d.Last {
+ d.current.err = d.frame.checkCRC()
+ if d.current.err != nil {
+ println("CRC error:", d.current.err)
+ return false
+ }
+ }
+ }
+ d.syncStream.inFrame = !d.current.d.Last
}
return true
}
+func (d *Decoder) stashDecoder() {
+ if d.current.d != nil {
+ if debugDecoder {
+ printf("re-adding current decoder %p", d.current.d)
+ }
+ d.decoders <- d.current.d
+ d.current.d = nil
+ }
+}
+
// Close will release all resources.
// It is NOT possible to reuse the decoder after this.
func (d *Decoder) Close() {
@@ -402,10 +564,10 @@ func (d *Decoder) Close() {
return
}
d.drainOutput()
- if d.stream != nil {
- close(d.stream)
+ if d.current.cancel != nil {
+ d.current.cancel()
d.streamWg.Wait()
- d.stream = nil
+ d.current.cancel = nil
}
if d.decoders != nil {
close(d.decoders)
@@ -456,100 +618,306 @@ type decodeOutput struct {
err error
}
-type decodeStream struct {
- r io.Reader
-
- // Blocks ready to be written to output.
- output chan decodeOutput
-
- // cancel reading from the input
- cancel chan struct{}
+func (d *Decoder) startSyncDecoder(r io.Reader) error {
+ d.frame.history.reset()
+ d.syncStream.br = readerWrapper{r: r}
+ d.syncStream.inFrame = false
+ d.syncStream.enabled = true
+ d.syncStream.decodedFrame = 0
+ return nil
}
-// errEndOfStream indicates that everything from the stream was read.
-var errEndOfStream = errors.New("end-of-stream")
-
// Create Decoder:
-// Spawn n block decoders. These accept tasks to decode a block.
-// Create goroutine that handles stream processing, this will send history to decoders as they are available.
-// Decoders update the history as they decode.
-// When a block is returned:
-// a) history is sent to the next decoder,
-// b) content written to CRC.
-// c) return data to WRITER.
-// d) wait for next block to return data.
-// Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
-func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
+// ASYNC:
+// Spawn 4 go routines.
+// 0: Read frames and decode blocks.
+// 1: Decode block and literals. Receives hufftree and seqdecs, returns seqdecs and huff tree.
+// 2: Wait for recentOffsets if needed. Decode sequences, send recentOffsets.
+// 3: Wait for stream history, execute sequences, send stream history.
+func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
defer d.streamWg.Done()
- frame := newFrameDec(d.o)
- for stream := range inStream {
- if debugDecoder {
- println("got new stream")
+ br := readerWrapper{r: r}
+
+ var seqPrepare = make(chan *blockDec, d.o.concurrent)
+ var seqDecode = make(chan *blockDec, d.o.concurrent)
+ var seqExecute = make(chan *blockDec, d.o.concurrent)
+
+ // Async 1: Prepare blocks...
+ go func() {
+ var hist history
+ var hasErr bool
+ for block := range seqPrepare {
+ if hasErr {
+ if block != nil {
+ seqDecode <- block
+ }
+ continue
+ }
+ if block.async.newHist != nil {
+ if debugDecoder {
+ println("Async 1: new history")
+ }
+ hist.reset()
+ if block.async.newHist.dict != nil {
+ hist.setDict(block.async.newHist.dict)
+ }
+ }
+ if block.err != nil || block.Type != blockTypeCompressed {
+ hasErr = block.err != nil
+ seqDecode <- block
+ continue
+ }
+
+ remain, err := block.decodeLiterals(block.data, &hist)
+ block.err = err
+ hasErr = block.err != nil
+ if err == nil {
+ block.async.literals = hist.decoders.literals
+ block.async.seqData = remain
+ } else if debugDecoder {
+ println("decodeLiterals error:", err)
+ }
+ seqDecode <- block
}
- br := readerWrapper{r: stream.r}
- decodeStream:
- for {
- frame.history.reset()
- err := frame.reset(&br)
- if debugDecoder && err != nil {
- println("Frame decoder returned", err)
+ close(seqDecode)
+ }()
+
+ // Async 2: Decode sequences...
+ go func() {
+ var hist history
+ var hasErr bool
+
+ for block := range seqDecode {
+ if hasErr {
+ if block != nil {
+ seqExecute <- block
+ }
+ continue
}
- if err == nil && frame.DictionaryID != nil {
- dict, ok := d.dicts[*frame.DictionaryID]
- if !ok {
- err = ErrUnknownDictionary
- } else {
- frame.history.setDict(&dict)
+ if block.async.newHist != nil {
+ if debugDecoder {
+ println("Async 2: new history, recent:", block.async.newHist.recentOffsets)
+ }
+ hist.decoders = block.async.newHist.decoders
+ hist.recentOffsets = block.async.newHist.recentOffsets
+ if block.async.newHist.dict != nil {
+ hist.setDict(block.async.newHist.dict)
}
}
- if err != nil {
- stream.output <- decodeOutput{
- err: err,
+ if block.err != nil || block.Type != blockTypeCompressed {
+ hasErr = block.err != nil
+ seqExecute <- block
+ continue
+ }
+
+ hist.decoders.literals = block.async.literals
+ block.err = block.prepareSequences(block.async.seqData, &hist)
+ if debugDecoder && block.err != nil {
+ println("prepareSequences returned:", block.err)
+ }
+ hasErr = block.err != nil
+ if block.err == nil {
+ block.err = block.decodeSequences(&hist)
+ if debugDecoder && block.err != nil {
+ println("decodeSequences returned:", block.err)
}
- break
+ hasErr = block.err != nil
+ // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
+ block.async.seqSize = hist.decoders.seqSize
}
- if debugDecoder {
- println("starting frame decoder")
- }
-
- // This goroutine will forward history between frames.
- frame.frameDone.Add(1)
- frame.initAsync()
-
- go frame.startDecoder(stream.output)
- decodeFrame:
- // Go through all blocks of the frame.
- for {
- dec := <-d.decoders
- select {
- case <-stream.cancel:
- if !frame.sendErr(dec, io.EOF) {
- // To not let the decoder dangle, send it back.
- stream.output <- decodeOutput{d: dec}
+ seqExecute <- block
+ }
+ close(seqExecute)
+ }()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+
+ // Async 3: Execute sequences...
+ frameHistCache := d.frame.history.b
+ go func() {
+ var hist history
+ var decodedFrame uint64
+ var fcs uint64
+ var hasErr bool
+ for block := range seqExecute {
+ out := decodeOutput{err: block.err, d: block}
+ if block.err != nil || hasErr {
+ hasErr = true
+ output <- out
+ continue
+ }
+ if block.async.newHist != nil {
+ if debugDecoder {
+ println("Async 3: new history")
+ }
+ hist.windowSize = block.async.newHist.windowSize
+ hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
+ if block.async.newHist.dict != nil {
+ hist.setDict(block.async.newHist.dict)
+ }
+
+ if cap(hist.b) < hist.allocFrameBuffer {
+ if cap(frameHistCache) >= hist.allocFrameBuffer {
+ hist.b = frameHistCache
+ } else {
+ hist.b = make([]byte, 0, hist.allocFrameBuffer)
+ println("Alloc history sized", hist.allocFrameBuffer)
+ }
+ }
+ hist.b = hist.b[:0]
+ fcs = block.async.fcs
+ decodedFrame = 0
+ }
+ do := decodeOutput{err: block.err, d: block}
+ switch block.Type {
+ case blockTypeRLE:
+ if debugDecoder {
+ println("add rle block length:", block.RLESize)
+ }
+
+ if cap(block.dst) < int(block.RLESize) {
+ if block.lowMem {
+ block.dst = make([]byte, block.RLESize)
+ } else {
+ block.dst = make([]byte, maxBlockSize)
}
- break decodeStream
- default:
}
- err := frame.next(dec)
- switch err {
- case io.EOF:
- // End of current frame, no error
- println("EOF on next block")
- break decodeFrame
- case nil:
- continue
- default:
- println("block decoder returned", err)
- break decodeStream
+ block.dst = block.dst[:block.RLESize]
+ v := block.data[0]
+ for i := range block.dst {
+ block.dst[i] = v
+ }
+ hist.append(block.dst)
+ do.b = block.dst
+ case blockTypeRaw:
+ if debugDecoder {
+ println("add raw block length:", len(block.data))
+ }
+ hist.append(block.data)
+ do.b = block.data
+ case blockTypeCompressed:
+ if debugDecoder {
+ println("execute with history length:", len(hist.b), "window:", hist.windowSize)
+ }
+ hist.decoders.seqSize = block.async.seqSize
+ hist.decoders.literals = block.async.literals
+ do.err = block.executeSequences(&hist)
+ hasErr = do.err != nil
+ if debugDecoder && hasErr {
+ println("executeSequences returned:", do.err)
+ }
+ do.b = block.dst
+ }
+ if !hasErr {
+ decodedFrame += uint64(len(do.b))
+ if fcs > 0 && decodedFrame > fcs {
+ println("fcs exceeded", block.Last, fcs, decodedFrame)
+ do.err = ErrFrameSizeExceeded
+ hasErr = true
+ } else if block.Last && fcs > 0 && decodedFrame != fcs {
+ do.err = ErrFrameSizeMismatch
+ hasErr = true
+ } else {
+ if debugDecoder {
+ println("fcs ok", block.Last, fcs, decodedFrame)
+ }
}
}
- // All blocks have started decoding, check if there are more frames.
- println("waiting for done")
- frame.frameDone.Wait()
- println("done waiting...")
+ output <- do
+ }
+ close(output)
+ frameHistCache = hist.b
+ wg.Done()
+ if debugDecoder {
+ println("decoder goroutines finished")
+ }
+ }()
+
+decodeStream:
+ for {
+ frame := d.frame
+ if debugDecoder {
+ println("New frame...")
+ }
+ var historySent bool
+ frame.history.reset()
+ err := frame.reset(&br)
+ if debugDecoder && err != nil {
+ println("Frame decoder returned", err)
+ }
+ if err == nil && frame.DictionaryID != nil {
+ dict, ok := d.dicts[*frame.DictionaryID]
+ if !ok {
+ err = ErrUnknownDictionary
+ } else {
+ frame.history.setDict(&dict)
+ }
+ }
+ if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
+ err = ErrDecoderSizeExceeded
+ }
+ if err != nil {
+ select {
+ case <-ctx.Done():
+ case dec := <-d.decoders:
+ dec.sendErr(err)
+ seqPrepare <- dec
+ }
+ break decodeStream
+ }
+
+ // Go through all blocks of the frame.
+ for {
+ var dec *blockDec
+ select {
+ case <-ctx.Done():
+ break decodeStream
+ case dec = <-d.decoders:
+ // Once we have a decoder, we MUST return it.
+ }
+ err := frame.next(dec)
+ if !historySent {
+ h := frame.history
+ if debugDecoder {
+ println("Alloc History:", h.allocFrameBuffer)
+ }
+ dec.async.newHist = &h
+ dec.async.fcs = frame.FrameContentSize
+ historySent = true
+ } else {
+ dec.async.newHist = nil
+ }
+ if debugDecoder && err != nil {
+ println("next block returned error:", err)
+ }
+ dec.err = err
+ dec.checkCRC = nil
+ if dec.Last && frame.HasCheckSum && err == nil {
+ crc, err := frame.rawInput.readSmall(4)
+ if err != nil {
+ println("CRC missing?", err)
+ dec.err = err
+ }
+ var tmp [4]byte
+ copy(tmp[:], crc)
+ dec.checkCRC = tmp[:]
+ if debugDecoder {
+ println("found crc to check:", dec.checkCRC)
+ }
+ }
+ err = dec.err
+ last := dec.Last
+ seqPrepare <- dec
+ if err != nil {
+ break decodeStream
+ }
+ if last {
+ break
+ }
}
- frame.frameDone.Wait()
- println("Sending EOS")
- stream.output <- decodeOutput{err: errEndOfStream}
}
+ close(seqPrepare)
+ wg.Wait()
+ d.frame.history.b = frameHistCache
}
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder_options.go b/vendor/github.com/klauspost/compress/zstd/decoder_options.go
index 95cc9b8b8..fd05c9bb0 100644
--- a/vendor/github.com/klauspost/compress/zstd/decoder_options.go
+++ b/vendor/github.com/klauspost/compress/zstd/decoder_options.go
@@ -28,6 +28,9 @@ func (o *decoderOptions) setDefault() {
concurrent: runtime.GOMAXPROCS(0),
maxWindowSize: MaxWindowSize,
}
+ if o.concurrent > 4 {
+ o.concurrent = 4
+ }
o.maxDecodedSize = 1 << 63
}
@@ -37,16 +40,25 @@ func WithDecoderLowmem(b bool) DOption {
return func(o *decoderOptions) error { o.lowMem = b; return nil }
}
-// WithDecoderConcurrency will set the concurrency,
-// meaning the maximum number of decoders to run concurrently.
-// The value supplied must be at least 1.
-// By default this will be set to GOMAXPROCS.
+// WithDecoderConcurrency sets the number of created decoders.
+// When decoding block with DecodeAll, this will limit the number
+// of possible concurrently running decodes.
+// When decoding streams, this will limit the number of
+// inflight blocks.
+// When decoding streams and setting maximum to 1,
+// no async decoding will be done.
+// When a value of 0 is provided GOMAXPROCS will be used.
+// By default this will be set to 4 or GOMAXPROCS, whatever is lower.
func WithDecoderConcurrency(n int) DOption {
return func(o *decoderOptions) error {
- if n <= 0 {
+ if n < 0 {
return errors.New("concurrency must be at least 1")
}
- o.concurrent = n
+ if n == 0 {
+ o.concurrent = runtime.GOMAXPROCS(0)
+ } else {
+ o.concurrent = n
+ }
return nil
}
}
diff --git a/vendor/github.com/klauspost/compress/zstd/encoder.go b/vendor/github.com/klauspost/compress/zstd/encoder.go
index e6e315969..dcc987a7c 100644
--- a/vendor/github.com/klauspost/compress/zstd/encoder.go
+++ b/vendor/github.com/klauspost/compress/zstd/encoder.go
@@ -98,23 +98,25 @@ func (e *Encoder) Reset(w io.Writer) {
if cap(s.filling) == 0 {
s.filling = make([]byte, 0, e.o.blockSize)
}
- if cap(s.current) == 0 {
- s.current = make([]byte, 0, e.o.blockSize)
- }
- if cap(s.previous) == 0 {
- s.previous = make([]byte, 0, e.o.blockSize)
+ if e.o.concurrent > 1 {
+ if cap(s.current) == 0 {
+ s.current = make([]byte, 0, e.o.blockSize)
+ }
+ if cap(s.previous) == 0 {
+ s.previous = make([]byte, 0, e.o.blockSize)
+ }
+ s.current = s.current[:0]
+ s.previous = s.previous[:0]
+ if s.writing == nil {
+ s.writing = &blockEnc{lowMem: e.o.lowMem}
+ s.writing.init()
+ }
+ s.writing.initNewEncode()
}
if s.encoder == nil {
s.encoder = e.o.encoder()
}
- if s.writing == nil {
- s.writing = &blockEnc{lowMem: e.o.lowMem}
- s.writing.init()
- }
- s.writing.initNewEncode()
s.filling = s.filling[:0]
- s.current = s.current[:0]
- s.previous = s.previous[:0]
s.encoder.Reset(e.o.dict, false)
s.headerWritten = false
s.eofWritten = false
@@ -258,6 +260,46 @@ func (e *Encoder) nextBlock(final bool) error {
return s.err
}
+ // SYNC:
+ if e.o.concurrent == 1 {
+ src := s.filling
+ s.nInput += int64(len(s.filling))
+ if debugEncoder {
+ println("Adding sync block,", len(src), "bytes, final:", final)
+ }
+ enc := s.encoder
+ blk := enc.Block()
+ blk.reset(nil)
+ enc.Encode(blk, src)
+ blk.last = final
+ if final {
+ s.eofWritten = true
+ }
+
+ err := errIncompressible
+ // If we got the exact same number of literals as input,
+ // assume the literals cannot be compressed.
+ if len(src) != len(blk.literals) || len(src) != e.o.blockSize {
+ err = blk.encode(src, e.o.noEntropy, !e.o.allLitEntropy)
+ }
+ switch err {
+ case errIncompressible:
+ if debugEncoder {
+ println("Storing incompressible block as raw")
+ }
+ blk.encodeRaw(src)
+ // In fast mode, we do not transfer offsets, so we don't have to deal with changing the.
+ case nil:
+ default:
+ s.err = err
+ return err
+ }
+ _, s.err = s.w.Write(blk.output)
+ s.nWritten += int64(len(blk.output))
+ s.filling = s.filling[:0]
+ return s.err
+ }
+
// Move blocks forward.
s.filling, s.current, s.previous = s.previous[:0], s.filling, s.current
s.nInput += int64(len(s.current))
diff --git a/vendor/github.com/klauspost/compress/zstd/encoder_options.go b/vendor/github.com/klauspost/compress/zstd/encoder_options.go
index 5f2e1d020..44d8dbd19 100644
--- a/vendor/github.com/klauspost/compress/zstd/encoder_options.go
+++ b/vendor/github.com/klauspost/compress/zstd/encoder_options.go
@@ -76,6 +76,7 @@ func WithEncoderCRC(b bool) EOption {
// WithEncoderConcurrency will set the concurrency,
// meaning the maximum number of encoders to run concurrently.
// The value supplied must be at least 1.
+// For streams, setting a value of 1 will disable async compression.
// By default this will be set to GOMAXPROCS.
func WithEncoderConcurrency(n int) EOption {
return func(o *encoderOptions) error {
diff --git a/vendor/github.com/klauspost/compress/zstd/framedec.go b/vendor/github.com/klauspost/compress/zstd/framedec.go
index 989c79f8c..29c3176b0 100644
--- a/vendor/github.com/klauspost/compress/zstd/framedec.go
+++ b/vendor/github.com/klauspost/compress/zstd/framedec.go
@@ -8,23 +8,17 @@ import (
"bytes"
"encoding/hex"
"errors"
- "hash"
"io"
- "sync"
"github.com/klauspost/compress/zstd/internal/xxhash"
)
type frameDec struct {
- o decoderOptions
- crc hash.Hash64
- offset int64
+ o decoderOptions
+ crc *xxhash.Digest
WindowSize uint64
- // In order queue of blocks being decoded.
- decoding chan *blockDec
-
// Frame history passed between blocks
history history
@@ -34,15 +28,10 @@ type frameDec struct {
bBuf byteBuf
FrameContentSize uint64
- frameDone sync.WaitGroup
DictionaryID *uint32
HasCheckSum bool
SingleSegment bool
-
- // asyncRunning indicates whether the async routine processes input on 'decoding'.
- asyncRunningMu sync.Mutex
- asyncRunning bool
}
const (
@@ -229,9 +218,10 @@ func (d *frameDec) reset(br byteBuffer) error {
d.FrameContentSize = uint64(d1) | (uint64(d2) << 32)
}
if debugDecoder {
- println("field size bits:", v, "fcsSize:", fcsSize, "FrameContentSize:", d.FrameContentSize, hex.EncodeToString(b[:fcsSize]), "singleseg:", d.SingleSegment, "window:", d.WindowSize)
+ println("Read FCS:", d.FrameContentSize)
}
}
+
// Move this to shared.
d.HasCheckSum = fhd&(1<<2) != 0
if d.HasCheckSum {
@@ -264,10 +254,16 @@ func (d *frameDec) reset(br byteBuffer) error {
}
d.history.windowSize = int(d.WindowSize)
if d.o.lowMem && d.history.windowSize < maxBlockSize {
- d.history.maxSize = d.history.windowSize * 2
+ d.history.allocFrameBuffer = d.history.windowSize * 2
+ // TODO: Maybe use FrameContent size
} else {
- d.history.maxSize = d.history.windowSize + maxBlockSize
+ d.history.allocFrameBuffer = d.history.windowSize + maxBlockSize
}
+
+ if debugDecoder {
+ println("Frame: Dict:", d.DictionaryID, "FrameContentSize:", d.FrameContentSize, "singleseg:", d.SingleSegment, "window:", d.WindowSize, "crc:", d.HasCheckSum)
+ }
+
// history contains input - maybe we do something
d.rawInput = br
return nil
@@ -276,49 +272,18 @@ func (d *frameDec) reset(br byteBuffer) error {
// next will start decoding the next block from stream.
func (d *frameDec) next(block *blockDec) error {
if debugDecoder {
- printf("decoding new block %p:%p", block, block.data)
+ println("decoding new block")
}
err := block.reset(d.rawInput, d.WindowSize)
if err != nil {
println("block error:", err)
// Signal the frame decoder we have a problem.
- d.sendErr(block, err)
+ block.sendErr(err)
return err
}
- block.input <- struct{}{}
- if debugDecoder {
- println("next block:", block)
- }
- d.asyncRunningMu.Lock()
- defer d.asyncRunningMu.Unlock()
- if !d.asyncRunning {
- return nil
- }
- if block.Last {
- // We indicate the frame is done by sending io.EOF
- d.decoding <- block
- return io.EOF
- }
- d.decoding <- block
return nil
}
-// sendEOF will queue an error block on the frame.
-// This will cause the frame decoder to return when it encounters the block.
-// Returns true if the decoder was added.
-func (d *frameDec) sendErr(block *blockDec, err error) bool {
- d.asyncRunningMu.Lock()
- defer d.asyncRunningMu.Unlock()
- if !d.asyncRunning {
- return false
- }
-
- println("sending error", err.Error())
- block.sendErr(err)
- d.decoding <- block
- return true
-}
-
// checkCRC will check the checksum if the frame has one.
// Will return ErrCRCMismatch if crc check failed, otherwise nil.
func (d *frameDec) checkCRC() error {
@@ -340,7 +305,7 @@ func (d *frameDec) checkCRC() error {
return err
}
- if !bytes.Equal(tmp[:], want) {
+ if !bytes.Equal(tmp[:], want) && !ignoreCRC {
if debugDecoder {
println("CRC Check Failed:", tmp[:], "!=", want)
}
@@ -352,131 +317,13 @@ func (d *frameDec) checkCRC() error {
return nil
}
-func (d *frameDec) initAsync() {
- if !d.o.lowMem && !d.SingleSegment {
- // set max extra size history to 2MB.
- d.history.maxSize = d.history.windowSize + maxBlockSize
- }
- // re-alloc if more than one extra block size.
- if d.o.lowMem && cap(d.history.b) > d.history.maxSize+maxBlockSize {
- d.history.b = make([]byte, 0, d.history.maxSize)
- }
- if cap(d.history.b) < d.history.maxSize {
- d.history.b = make([]byte, 0, d.history.maxSize)
- }
- if cap(d.decoding) < d.o.concurrent {
- d.decoding = make(chan *blockDec, d.o.concurrent)
- }
- if debugDecoder {
- h := d.history
- printf("history init. len: %d, cap: %d", len(h.b), cap(h.b))
- }
- d.asyncRunningMu.Lock()
- d.asyncRunning = true
- d.asyncRunningMu.Unlock()
-}
-
-// startDecoder will start decoding blocks and write them to the writer.
-// The decoder will stop as soon as an error occurs or at end of frame.
-// When the frame has finished decoding the *bufio.Reader
-// containing the remaining input will be sent on frameDec.frameDone.
-func (d *frameDec) startDecoder(output chan decodeOutput) {
- written := int64(0)
-
- defer func() {
- d.asyncRunningMu.Lock()
- d.asyncRunning = false
- d.asyncRunningMu.Unlock()
-
- // Drain the currently decoding.
- d.history.error = true
- flushdone:
- for {
- select {
- case b := <-d.decoding:
- b.history <- &d.history
- output <- <-b.result
- default:
- break flushdone
- }
- }
- println("frame decoder done, signalling done")
- d.frameDone.Done()
- }()
- // Get decoder for first block.
- block := <-d.decoding
- block.history <- &d.history
- for {
- var next *blockDec
- // Get result
- r := <-block.result
- if r.err != nil {
- println("Result contained error", r.err)
- output <- r
- return
- }
- if debugDecoder {
- println("got result, from ", d.offset, "to", d.offset+int64(len(r.b)))
- d.offset += int64(len(r.b))
- }
- if !block.Last {
- // Send history to next block
- select {
- case next = <-d.decoding:
- if debugDecoder {
- println("Sending ", len(d.history.b), "bytes as history")
- }
- next.history <- &d.history
- default:
- // Wait until we have sent the block, so
- // other decoders can potentially get the decoder.
- next = nil
- }
- }
-
- // Add checksum, async to decoding.
- if d.HasCheckSum {
- n, err := d.crc.Write(r.b)
- if err != nil {
- r.err = err
- if n != len(r.b) {
- r.err = io.ErrShortWrite
- }
- output <- r
- return
- }
- }
- written += int64(len(r.b))
- if d.SingleSegment && uint64(written) > d.FrameContentSize {
- println("runDecoder: single segment and", uint64(written), ">", d.FrameContentSize)
- r.err = ErrFrameSizeExceeded
- output <- r
- return
- }
- if block.Last {
- r.err = d.checkCRC()
- output <- r
- return
- }
- output <- r
- if next == nil {
- // There was no decoder available, we wait for one now that we have sent to the writer.
- if debugDecoder {
- println("Sending ", len(d.history.b), " bytes as history")
- }
- next = <-d.decoding
- next.history <- &d.history
- }
- block = next
- }
-}
-
// runDecoder will create a sync decoder that will decode a block of data.
func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) {
saved := d.history.b
// We use the history for output to avoid copying it.
d.history.b = dst
+ d.history.ignoreBuffer = len(dst)
// Store input length, so we only check new data.
crcStart := len(dst)
var err error
@@ -489,7 +336,7 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) {
println("next block:", dec)
}
err = dec.decodeBuf(&d.history)
- if err != nil || dec.Last {
+ if err != nil {
break
}
if uint64(len(d.history.b)) > d.o.maxDecodedSize {
@@ -501,10 +348,23 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) {
err = ErrFrameSizeExceeded
break
}
+ if d.FrameContentSize > 0 && uint64(len(d.history.b)-crcStart) > d.FrameContentSize {
+ println("runDecoder: FrameContentSize exceeded", uint64(len(d.history.b)-crcStart), ">", d.FrameContentSize)
+ err = ErrFrameSizeExceeded
+ break
+ }
+ if dec.Last {
+ break
+ }
+ if debugDecoder && d.FrameContentSize > 0 {
+ println("runDecoder: FrameContentSize", uint64(len(d.history.b)-crcStart), "<=", d.FrameContentSize)
+ }
}
dst = d.history.b
if err == nil {
- if d.HasCheckSum {
+ if d.FrameContentSize > 0 && uint64(len(d.history.b)-crcStart) != d.FrameContentSize {
+ err = ErrFrameSizeMismatch
+ } else if d.HasCheckSum {
var n int
n, err = d.crc.Write(dst[crcStart:])
if err == nil {
diff --git a/vendor/github.com/klauspost/compress/zstd/fuzz.go b/vendor/github.com/klauspost/compress/zstd/fuzz.go
new file mode 100644
index 000000000..fda8a7422
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/zstd/fuzz.go
@@ -0,0 +1,11 @@
+//go:build gofuzz
+// +build gofuzz
+
+// Copyright 2019+ Klaus Post. All rights reserved.
+// License information can be found in the LICENSE file.
+// Based on work by Yann Collet, released under BSD License.
+
+package zstd
+
+// ignoreCRC can be used for fuzz testing to ignore CRC values...
+const ignoreCRC = true
diff --git a/vendor/github.com/klauspost/compress/zstd/fuzz_none.go b/vendor/github.com/klauspost/compress/zstd/fuzz_none.go
new file mode 100644
index 000000000..0515b201c
--- /dev/null
+++ b/vendor/github.com/klauspost/compress/zstd/fuzz_none.go
@@ -0,0 +1,11 @@
+//go:build !gofuzz
+// +build !gofuzz
+
+// Copyright 2019+ Klaus Post. All rights reserved.
+// License information can be found in the LICENSE file.
+// Based on work by Yann Collet, released under BSD License.
+
+package zstd
+
+// ignoreCRC can be used for fuzz testing to ignore CRC values...
+const ignoreCRC = false
diff --git a/vendor/github.com/klauspost/compress/zstd/history.go b/vendor/github.com/klauspost/compress/zstd/history.go
index f783e32d2..28b40153c 100644
--- a/vendor/github.com/klauspost/compress/zstd/history.go
+++ b/vendor/github.com/klauspost/compress/zstd/history.go
@@ -10,20 +10,31 @@ import (
// history contains the information transferred between blocks.
type history struct {
- b []byte
- huffTree *huff0.Scratch
- recentOffsets [3]int
+ // Literal decompression
+ huffTree *huff0.Scratch
+
+ // Sequence decompression
decoders sequenceDecs
- windowSize int
- maxSize int
- error bool
- dict *dict
+ recentOffsets [3]int
+
+ // History buffer...
+ b []byte
+
+ // ignoreBuffer is meant to ignore a number of bytes
+ // when checking for matches in history
+ ignoreBuffer int
+
+ windowSize int
+ allocFrameBuffer int // needed?
+ error bool
+ dict *dict
}
// reset will reset the history to initial state of a frame.
// The history must already have been initialized to the desired size.
func (h *history) reset() {
h.b = h.b[:0]
+ h.ignoreBuffer = 0
h.error = false
h.recentOffsets = [3]int{1, 4, 8}
if f := h.decoders.litLengths.fse; f != nil && !f.preDefined {
@@ -35,7 +46,7 @@ func (h *history) reset() {
if f := h.decoders.matchLengths.fse; f != nil && !f.preDefined {
fseDecoderPool.Put(f)
}
- h.decoders = sequenceDecs{}
+ h.decoders = sequenceDecs{br: h.decoders.br}
if h.huffTree != nil {
if h.dict == nil || h.dict.litEnc != h.huffTree {
huffDecoderPool.Put(h.huffTree)
@@ -54,6 +65,7 @@ func (h *history) setDict(dict *dict) {
h.decoders.litLengths = dict.llDec
h.decoders.offsets = dict.ofDec
h.decoders.matchLengths = dict.mlDec
+ h.decoders.dict = dict.content
h.recentOffsets = dict.offsets
h.huffTree = dict.litEnc
}
@@ -83,6 +95,24 @@ func (h *history) append(b []byte) {
copy(h.b[h.windowSize-len(b):], b)
}
+// ensureBlock will ensure there is space for at least one block...
+func (h *history) ensureBlock() {
+ if cap(h.b) < h.allocFrameBuffer {
+ h.b = make([]byte, 0, h.allocFrameBuffer)
+ return
+ }
+
+ avail := cap(h.b) - len(h.b)
+ if avail >= h.windowSize || avail > maxCompressedBlockSize {
+ return
+ }
+ // Move data down so we only have window size left.
+ // We know we have less than window size in b at this point.
+ discard := len(h.b) - h.windowSize
+ copy(h.b, h.b[discard:])
+ h.b = h.b[:h.windowSize]
+}
+
// append bytes to history without ever discarding anything.
func (h *history) appendKeep(b []byte) {
h.b = append(h.b, b...)
diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec.go b/vendor/github.com/klauspost/compress/zstd/seqdec.go
index bc731e4cb..213736ad7 100644
--- a/vendor/github.com/klauspost/compress/zstd/seqdec.go
+++ b/vendor/github.com/klauspost/compress/zstd/seqdec.go
@@ -20,6 +20,10 @@ type seq struct {
llCode, mlCode, ofCode uint8
}
+type seqVals struct {
+ ll, ml, mo int
+}
+
func (s seq) String() string {
if s.offset <= 3 {
if s.offset == 0 {
@@ -61,16 +65,18 @@ type sequenceDecs struct {
offsets sequenceDec
matchLengths sequenceDec
prevOffset [3]int
- hist []byte
dict []byte
literals []byte
out []byte
+ nSeqs int
+ br *bitReader
+ seqSize int
windowSize int
maxBits uint8
}
// initialize all 3 decoders from the stream input.
-func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out []byte) error {
+func (s *sequenceDecs) initialize(br *bitReader, hist *history, out []byte) error {
if err := s.litLengths.init(br); err != nil {
return errors.New("litLengths:" + err.Error())
}
@@ -80,8 +86,7 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out []
if err := s.matchLengths.init(br); err != nil {
return errors.New("matchLengths:" + err.Error())
}
- s.literals = literals
- s.hist = hist.b
+ s.br = br
s.prevOffset = hist.recentOffsets
s.maxBits = s.litLengths.fse.maxBits + s.offsets.fse.maxBits + s.matchLengths.fse.maxBits
s.windowSize = hist.windowSize
@@ -94,11 +99,254 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, literals, out []
}
// decode sequences from the stream with the provided history.
-func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
+func (s *sequenceDecs) decode(seqs []seqVals) error {
+ br := s.br
+
+ // Grab full sizes tables, to avoid bounds checks.
+ llTable, mlTable, ofTable := s.litLengths.fse.dt[:maxTablesize], s.matchLengths.fse.dt[:maxTablesize], s.offsets.fse.dt[:maxTablesize]
+ llState, mlState, ofState := s.litLengths.state.state, s.matchLengths.state.state, s.offsets.state.state
+ s.seqSize = 0
+ litRemain := len(s.literals)
+
+ for i := range seqs {
+ var ll, mo, ml int
+ if br.off > 4+((maxOffsetBits+16+16)>>3) {
+ // inlined function:
+ // ll, mo, ml = s.nextFast(br, llState, mlState, ofState)
+
+ // Final will not read from stream.
+ var llB, mlB, moB uint8
+ ll, llB = llState.final()
+ ml, mlB = mlState.final()
+ mo, moB = ofState.final()
+
+ // extra bits are stored in reverse order.
+ br.fillFast()
+ mo += br.getBits(moB)
+ if s.maxBits > 32 {
+ br.fillFast()
+ }
+ ml += br.getBits(mlB)
+ ll += br.getBits(llB)
+
+ if moB > 1 {
+ s.prevOffset[2] = s.prevOffset[1]
+ s.prevOffset[1] = s.prevOffset[0]
+ s.prevOffset[0] = mo
+ } else {
+ // mo = s.adjustOffset(mo, ll, moB)
+ // Inlined for rather big speedup
+ if ll == 0 {
+ // There is an exception though, when current sequence's literals_length = 0.
+ // In this case, repeated offsets are shifted by one, so an offset_value of 1 means Repeated_Offset2,
+ // an offset_value of 2 means Repeated_Offset3, and an offset_value of 3 means Repeated_Offset1 - 1_byte.
+ mo++
+ }
+
+ if mo == 0 {
+ mo = s.prevOffset[0]
+ } else {
+ var temp int
+ if mo == 3 {
+ temp = s.prevOffset[0] - 1
+ } else {
+ temp = s.prevOffset[mo]
+ }
+
+ if temp == 0 {
+ // 0 is not valid; input is corrupted; force offset to 1
+ println("WARNING: temp was 0")
+ temp = 1
+ }
+
+ if mo != 1 {
+ s.prevOffset[2] = s.prevOffset[1]
+ }
+ s.prevOffset[1] = s.prevOffset[0]
+ s.prevOffset[0] = temp
+ mo = temp
+ }
+ }
+ br.fillFast()
+ } else {
+ if br.overread() {
+ if debugDecoder {
+ printf("reading sequence %d, exceeded available data\n", i)
+ }
+ return io.ErrUnexpectedEOF
+ }
+ ll, mo, ml = s.next(br, llState, mlState, ofState)
+ br.fill()
+ }
+
+ if debugSequences {
+ println("Seq", i, "Litlen:", ll, "mo:", mo, "(abs) ml:", ml)
+ }
+ // Evaluate.
+ // We might be doing this async, so do it early.
+ if mo == 0 && ml > 0 {
+ return fmt.Errorf("zero matchoff and matchlen (%d) > 0", ml)
+ }
+ if ml > maxMatchLen {
+ return fmt.Errorf("match len (%d) bigger than max allowed length", ml)
+ }
+ s.seqSize += ll + ml
+ if s.seqSize > maxBlockSize {
+ return fmt.Errorf("output (%d) bigger than max block size", s.seqSize)
+ }
+ litRemain -= ll
+ if litRemain < 0 {
+ return fmt.Errorf("unexpected literal count, want %d bytes, but only %d is available", ll, litRemain+ll)
+ }
+ seqs[i] = seqVals{
+ ll: ll,
+ ml: ml,
+ mo: mo,
+ }
+ if i == len(seqs)-1 {
+ // This is the last sequence, so we shouldn't update state.
+ break
+ }
+
+ // Manually inlined, ~ 5-20% faster
+ // Update all 3 states at once. Approx 20% faster.
+ nBits := llState.nbBits() + mlState.nbBits() + ofState.nbBits()
+ if nBits == 0 {
+ llState = llTable[llState.newState()&maxTableMask]
+ mlState = mlTable[mlState.newState()&maxTableMask]
+ ofState = ofTable[ofState.newState()&maxTableMask]
+ } else {
+ bits := br.get32BitsFast(nBits)
+ lowBits := uint16(bits >> ((ofState.nbBits() + mlState.nbBits()) & 31))
+ llState = llTable[(llState.newState()+lowBits)&maxTableMask]
+
+ lowBits = uint16(bits >> (ofState.nbBits() & 31))
+ lowBits &= bitMask[mlState.nbBits()&15]
+ mlState = mlTable[(mlState.newState()+lowBits)&maxTableMask]
+
+ lowBits = uint16(bits) & bitMask[ofState.nbBits()&15]
+ ofState = ofTable[(ofState.newState()+lowBits)&maxTableMask]
+ }
+ }
+ s.seqSize += litRemain
+ if s.seqSize > maxBlockSize {
+ return fmt.Errorf("output (%d) bigger than max block size", s.seqSize)
+ }
+ err := br.close()
+ if err != nil {
+ printf("Closing sequences: %v, %+v\n", err, *br)
+ }
+ return err
+}
+
+// execute will execute the decoded sequence with the provided history.
+// The sequence must be evaluated before being sent.
+func (s *sequenceDecs) execute(seqs []seqVals, hist []byte) error {
+ // Ensure we have enough output size...
+ if len(s.out)+s.seqSize > cap(s.out) {
+ addBytes := s.seqSize + len(s.out)
+ s.out = append(s.out, make([]byte, addBytes)...)
+ s.out = s.out[:len(s.out)-addBytes]
+ }
+
+ if debugDecoder {
+ printf("Execute %d seqs with hist %d, dict %d, literals: %d into %d bytes\n", len(seqs), len(hist), len(s.dict), len(s.literals), s.seqSize)
+ }
+
+ var t = len(s.out)
+ out := s.out[:t+s.seqSize]
+
+ for _, seq := range seqs {
+ // Add literals
+ copy(out[t:], s.literals[:seq.ll])
+ t += seq.ll
+ s.literals = s.literals[seq.ll:]
+
+ // Copy from dictionary...
+ if seq.mo > t+len(hist) || seq.mo > s.windowSize {
+ if len(s.dict) == 0 {
+ return fmt.Errorf("match offset (%d) bigger than current history (%d)", seq.mo, t+len(hist))
+ }
+
+ // we may be in dictionary.
+ dictO := len(s.dict) - (seq.mo - (t + len(hist)))
+ if dictO < 0 || dictO >= len(s.dict) {
+ return fmt.Errorf("match offset (%d) bigger than current history+dict (%d)", seq.mo, t+len(hist)+len(s.dict))
+ }
+ end := dictO + seq.ml
+ if end > len(s.dict) {
+ n := len(s.dict) - dictO
+ copy(out[t:], s.dict[dictO:])
+ t += n
+ seq.ml -= n
+ } else {
+ copy(out[t:], s.dict[dictO:end])
+ t += end - dictO
+ continue
+ }
+ }
+
+ // Copy from history.
+ if v := seq.mo - t; v > 0 {
+ // v is the start position in history from end.
+ start := len(hist) - v
+ if seq.ml > v {
+ // Some goes into current block.
+ // Copy remainder of history
+ copy(out[t:], hist[start:])
+ t += v
+ seq.ml -= v
+ } else {
+ copy(out[t:], hist[start:start+seq.ml])
+ t += seq.ml
+ continue
+ }
+ }
+ // We must be in current buffer now
+ if seq.ml > 0 {
+ start := t - seq.mo
+ if seq.ml <= t-start {
+ // No overlap
+ copy(out[t:], out[start:start+seq.ml])
+ t += seq.ml
+ continue
+ } else {
+ // Overlapping copy
+ // Extend destination slice and copy one byte at the time.
+ src := out[start : start+seq.ml]
+ dst := out[t:]
+ dst = dst[:len(src)]
+ t += len(src)
+ // Destination is the space we just added.
+ for i := range src {
+ dst[i] = src[i]
+ }
+ }
+ }
+ }
+ // Add final literals
+ copy(out[t:], s.literals)
+ if debugDecoder {
+ t += len(s.literals)
+ if t != len(out) {
+ panic(fmt.Errorf("length mismatch, want %d, got %d, ss: %d", len(out), t, s.seqSize))
+ }
+ }
+ s.out = out
+
+ return nil
+}
+
+// decode sequences from the stream with the provided history.
+func (s *sequenceDecs) decodeSync(history *history) error {
+ br := s.br
+ seqs := s.nSeqs
startSize := len(s.out)
// Grab full sizes tables, to avoid bounds checks.
llTable, mlTable, ofTable := s.litLengths.fse.dt[:maxTablesize], s.matchLengths.fse.dt[:maxTablesize], s.offsets.fse.dt[:maxTablesize]
llState, mlState, ofState := s.litLengths.state.state, s.matchLengths.state.state, s.offsets.state.state
+ hist := history.b[history.ignoreBuffer:]
+ out := s.out
for i := seqs - 1; i >= 0; i-- {
if br.overread() {
@@ -151,7 +399,7 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
if temp == 0 {
// 0 is not valid; input is corrupted; force offset to 1
- println("temp was 0")
+ println("WARNING: temp was 0")
temp = 1
}
@@ -176,51 +424,49 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
if ll > len(s.literals) {
return fmt.Errorf("unexpected literal count, want %d bytes, but only %d is available", ll, len(s.literals))
}
- size := ll + ml + len(s.out)
+ size := ll + ml + len(out)
if size-startSize > maxBlockSize {
return fmt.Errorf("output (%d) bigger than max block size", size)
}
- if size > cap(s.out) {
+ if size > cap(out) {
// Not enough size, which can happen under high volume block streaming conditions
// but could be if destination slice is too small for sync operations.
// over-allocating here can create a large amount of GC pressure so we try to keep
// it as contained as possible
- used := len(s.out) - startSize
+ used := len(out) - startSize
addBytes := 256 + ll + ml + used>>2
// Clamp to max block size.
if used+addBytes > maxBlockSize {
addBytes = maxBlockSize - used
}
- s.out = append(s.out, make([]byte, addBytes)...)
- s.out = s.out[:len(s.out)-addBytes]
+ out = append(out, make([]byte, addBytes)...)
+ out = out[:len(out)-addBytes]
}
if ml > maxMatchLen {
return fmt.Errorf("match len (%d) bigger than max allowed length", ml)
}
// Add literals
- s.out = append(s.out, s.literals[:ll]...)
+ out = append(out, s.literals[:ll]...)
s.literals = s.literals[ll:]
- out := s.out
if mo == 0 && ml > 0 {
return fmt.Errorf("zero matchoff and matchlen (%d) > 0", ml)
}
- if mo > len(s.out)+len(hist) || mo > s.windowSize {
+ if mo > len(out)+len(hist) || mo > s.windowSize {
if len(s.dict) == 0 {
- return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(s.out)+len(hist))
+ return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(out)+len(hist))
}
// we may be in dictionary.
- dictO := len(s.dict) - (mo - (len(s.out) + len(hist)))
+ dictO := len(s.dict) - (mo - (len(out) + len(hist)))
if dictO < 0 || dictO >= len(s.dict) {
- return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(s.out)+len(hist))
+ return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(out)+len(hist))
}
end := dictO + ml
if end > len(s.dict) {
out = append(out, s.dict[dictO:]...)
- mo -= len(s.dict) - dictO
ml -= len(s.dict) - dictO
} else {
out = append(out, s.dict[dictO:end]...)
@@ -231,26 +477,25 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
// Copy from history.
// TODO: Blocks without history could be made to ignore this completely.
- if v := mo - len(s.out); v > 0 {
+ if v := mo - len(out); v > 0 {
// v is the start position in history from end.
- start := len(s.hist) - v
+ start := len(hist) - v
if ml > v {
// Some goes into current block.
// Copy remainder of history
- out = append(out, s.hist[start:]...)
- mo -= v
+ out = append(out, hist[start:]...)
ml -= v
} else {
- out = append(out, s.hist[start:start+ml]...)
+ out = append(out, hist[start:start+ml]...)
ml = 0
}
}
// We must be in current buffer now
if ml > 0 {
- start := len(s.out) - mo
- if ml <= len(s.out)-start {
+ start := len(out) - mo
+ if ml <= len(out)-start {
// No overlap
- out = append(out, s.out[start:start+ml]...)
+ out = append(out, out[start:start+ml]...)
} else {
// Overlapping copy
// Extend destination slice and copy one byte at the time.
@@ -264,7 +509,6 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
}
}
}
- s.out = out
if i == 0 {
// This is the last sequence, so we shouldn't update state.
break
@@ -292,8 +536,8 @@ func (s *sequenceDecs) decode(seqs int, br *bitReader, hist []byte) error {
}
// Add final literals
- s.out = append(s.out, s.literals...)
- return nil
+ s.out = append(out, s.literals...)
+ return br.close()
}
// update states, at least 27 bits must be available.
@@ -457,36 +701,3 @@ func (s *sequenceDecs) adjustOffset(offset, litLen int, offsetB uint8) int {
s.prevOffset[0] = temp
return temp
}
-
-// mergeHistory will merge history.
-func (s *sequenceDecs) mergeHistory(hist *sequenceDecs) (*sequenceDecs, error) {
- for i := uint(0); i < 3; i++ {
- var sNew, sHist *sequenceDec
- switch i {
- default:
- // same as "case 0":
- sNew = &s.litLengths
- sHist = &hist.litLengths
- case 1:
- sNew = &s.offsets
- sHist = &hist.offsets
- case 2:
- sNew = &s.matchLengths
- sHist = &hist.matchLengths
- }
- if sNew.repeat {
- if sHist.fse == nil {
- return nil, fmt.Errorf("sequence stream %d, repeat requested, but no history", i)
- }
- continue
- }
- if sNew.fse == nil {
- return nil, fmt.Errorf("sequence stream %d, no fse found", i)
- }
- if sHist.fse != nil && !sHist.fse.preDefined {
- fseDecoderPool.Put(sHist.fse)
- }
- sHist.fse = sNew.fse
- }
- return hist, nil
-}
diff --git a/vendor/github.com/klauspost/compress/zstd/zstd.go b/vendor/github.com/klauspost/compress/zstd/zstd.go
index ef1d49a00..0b0c2571d 100644
--- a/vendor/github.com/klauspost/compress/zstd/zstd.go
+++ b/vendor/github.com/klauspost/compress/zstd/zstd.go
@@ -75,6 +75,10 @@ var (
// This is only returned if SingleSegment is specified on the frame.
ErrFrameSizeExceeded = errors.New("frame size exceeded")
+ // ErrFrameSizeMismatch is returned if the stated frame size does not match the expected size.
+ // This is only returned if SingleSegment is specified on the frame.
+ ErrFrameSizeMismatch = errors.New("frame size does not match size on stream")
+
// ErrCRCMismatch is returned if CRC mismatches.
ErrCRCMismatch = errors.New("CRC check failed")