diff options
Diffstat (limited to 'vendor/github.com/klauspost/compress/zstd')
9 files changed, 79 insertions, 38 deletions
diff --git a/vendor/github.com/klauspost/compress/zstd/decoder.go b/vendor/github.com/klauspost/compress/zstd/decoder.go index 6104eb793..78c10755f 100644 --- a/vendor/github.com/klauspost/compress/zstd/decoder.go +++ b/vendor/github.com/klauspost/compress/zstd/decoder.go @@ -35,6 +35,7 @@ type Decoder struct { br readerWrapper enabled bool inFrame bool + dstBuf []byte } frame *frameDec @@ -187,21 +188,23 @@ func (d *Decoder) Reset(r io.Reader) error { } // If bytes buffer and < 5MB, do sync decoding anyway. - if bb, ok := r.(byter); ok && bb.Len() < 5<<20 { + if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap { bb2 := bb if debugDecoder { println("*bytes.Buffer detected, doing sync decode, len:", bb.Len()) } b := bb2.Bytes() var dst []byte - if cap(d.current.b) > 0 { - dst = d.current.b + if cap(d.syncStream.dstBuf) > 0 { + dst = d.syncStream.dstBuf[:0] } - dst, err := d.DecodeAll(b, dst[:0]) + dst, err := d.DecodeAll(b, dst) if err == nil { err = io.EOF } + // Save output buffer + d.syncStream.dstBuf = dst d.current.b = dst d.current.err = err d.current.flushed = true @@ -216,6 +219,7 @@ func (d *Decoder) Reset(r io.Reader) error { d.current.err = nil d.current.flushed = false d.current.d = nil + d.syncStream.dstBuf = nil // Ensure no-one else is still running... d.streamWg.Wait() @@ -680,6 +684,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("Async 1: new history, recent:", block.async.newHist.recentOffsets) } + hist.reset() hist.decoders = block.async.newHist.decoders hist.recentOffsets = block.async.newHist.recentOffsets hist.windowSize = block.async.newHist.windowSize @@ -711,6 +716,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch seqExecute <- block } close(seqExecute) + hist.reset() }() var wg sync.WaitGroup @@ -734,6 +740,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("Async 2: new history") } + hist.reset() hist.windowSize = block.async.newHist.windowSize hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer if block.async.newHist.dict != nil { @@ -815,13 +822,14 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch if debugDecoder { println("decoder goroutines finished") } + hist.reset() }() + var hist history decodeStream: for { - var hist history var hasErr bool - + hist.reset() decodeBlock := func(block *blockDec) { if hasErr { if block != nil { @@ -937,5 +945,6 @@ decodeStream: } close(seqDecode) wg.Wait() + hist.reset() d.frame.history.b = frameHistCache } diff --git a/vendor/github.com/klauspost/compress/zstd/decoder_options.go b/vendor/github.com/klauspost/compress/zstd/decoder_options.go index 666c2715f..f42448e69 100644 --- a/vendor/github.com/klauspost/compress/zstd/decoder_options.go +++ b/vendor/github.com/klauspost/compress/zstd/decoder_options.go @@ -14,21 +14,23 @@ type DOption func(*decoderOptions) error // options retains accumulated state of multiple options. type decoderOptions struct { - lowMem bool - concurrent int - maxDecodedSize uint64 - maxWindowSize uint64 - dicts []dict - ignoreChecksum bool - limitToCap bool + lowMem bool + concurrent int + maxDecodedSize uint64 + maxWindowSize uint64 + dicts []dict + ignoreChecksum bool + limitToCap bool + decodeBufsBelow int } func (o *decoderOptions) setDefault() { *o = decoderOptions{ // use less ram: true for now, but may change. - lowMem: true, - concurrent: runtime.GOMAXPROCS(0), - maxWindowSize: MaxWindowSize, + lowMem: true, + concurrent: runtime.GOMAXPROCS(0), + maxWindowSize: MaxWindowSize, + decodeBufsBelow: 128 << 10, } if o.concurrent > 4 { o.concurrent = 4 @@ -126,6 +128,18 @@ func WithDecodeAllCapLimit(b bool) DOption { } } +// WithDecodeBuffersBelow will fully decode readers that have a +// `Bytes() []byte` and `Len() int` interface similar to bytes.Buffer. +// This typically uses less allocations but will have the full decompressed object in memory. +// Note that DecodeAllCapLimit will disable this, as well as giving a size of 0 or less. +// Default is 128KiB. +func WithDecodeBuffersBelow(size int) DOption { + return func(o *decoderOptions) error { + o.decodeBufsBelow = size + return nil + } +} + // IgnoreChecksum allows to forcibly ignore checksum checking. func IgnoreChecksum(b bool) DOption { return func(o *decoderOptions) error { diff --git a/vendor/github.com/klauspost/compress/zstd/enc_best.go b/vendor/github.com/klauspost/compress/zstd/enc_best.go index 96028ecd8..dbbb88d92 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_best.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_best.go @@ -32,6 +32,7 @@ type match struct { length int32 rep int32 est int32 + _ [12]byte // Aligned size to cache line: 4+4+4+4+4 bytes + 12 bytes padding = 32 bytes } const highScore = 25000 diff --git a/vendor/github.com/klauspost/compress/zstd/framedec.go b/vendor/github.com/klauspost/compress/zstd/framedec.go index 1559a2038..b6c505417 100644 --- a/vendor/github.com/klauspost/compress/zstd/framedec.go +++ b/vendor/github.com/klauspost/compress/zstd/framedec.go @@ -343,7 +343,7 @@ func (d *frameDec) consumeCRC() error { return nil } -// runDecoder will create a sync decoder that will decode a block of data. +// runDecoder will run the decoder for the remainder of the frame. func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { saved := d.history.b @@ -369,7 +369,7 @@ func (d *frameDec) runDecoder(dst []byte, dec *blockDec) ([]byte, error) { if debugDecoder { println("maxSyncLen:", d.history.decoders.maxSyncLen) } - if !d.o.limitToCap && uint64(cap(dst)-len(dst)) < d.history.decoders.maxSyncLen { + if !d.o.limitToCap && uint64(cap(dst)) < d.history.decoders.maxSyncLen { // Alloc for output dst2 := make([]byte, len(dst), d.history.decoders.maxSyncLen+compressedBlockOverAlloc) copy(dst2, dst) diff --git a/vendor/github.com/klauspost/compress/zstd/fse_decoder_amd64.go b/vendor/github.com/klauspost/compress/zstd/fse_decoder_amd64.go index c881d28d8..d04a829b0 100644 --- a/vendor/github.com/klauspost/compress/zstd/fse_decoder_amd64.go +++ b/vendor/github.com/klauspost/compress/zstd/fse_decoder_amd64.go @@ -21,7 +21,8 @@ type buildDtableAsmContext struct { // buildDtable_asm is an x86 assembly implementation of fseDecoder.buildDtable. // Function returns non-zero exit code on error. -// go:noescape +// +//go:noescape func buildDtable_asm(s *fseDecoder, ctx *buildDtableAsmContext) int // please keep in sync with _generate/gen_fse.go diff --git a/vendor/github.com/klauspost/compress/zstd/history.go b/vendor/github.com/klauspost/compress/zstd/history.go index 28b40153c..09164856d 100644 --- a/vendor/github.com/klauspost/compress/zstd/history.go +++ b/vendor/github.com/klauspost/compress/zstd/history.go @@ -37,24 +37,21 @@ func (h *history) reset() { h.ignoreBuffer = 0 h.error = false h.recentOffsets = [3]int{1, 4, 8} - if f := h.decoders.litLengths.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } - if f := h.decoders.offsets.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } - if f := h.decoders.matchLengths.fse; f != nil && !f.preDefined { - fseDecoderPool.Put(f) - } + h.decoders.freeDecoders() h.decoders = sequenceDecs{br: h.decoders.br} + h.freeHuffDecoder() + h.huffTree = nil + h.dict = nil + //printf("history created: %+v (l: %d, c: %d)", *h, len(h.b), cap(h.b)) +} + +func (h *history) freeHuffDecoder() { if h.huffTree != nil { if h.dict == nil || h.dict.litEnc != h.huffTree { huffDecoderPool.Put(h.huffTree) + h.huffTree = nil } } - h.huffTree = nil - h.dict = nil - //printf("history created: %+v (l: %d, c: %d)", *h, len(h.b), cap(h.b)) } func (h *history) setDict(dict *dict) { diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec.go b/vendor/github.com/klauspost/compress/zstd/seqdec.go index df0447203..f833d1541 100644 --- a/vendor/github.com/klauspost/compress/zstd/seqdec.go +++ b/vendor/github.com/klauspost/compress/zstd/seqdec.go @@ -99,6 +99,21 @@ func (s *sequenceDecs) initialize(br *bitReader, hist *history, out []byte) erro return nil } +func (s *sequenceDecs) freeDecoders() { + if f := s.litLengths.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.litLengths.fse = nil + } + if f := s.offsets.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.offsets.fse = nil + } + if f := s.matchLengths.fse; f != nil && !f.preDefined { + fseDecoderPool.Put(f) + s.matchLengths.fse = nil + } +} + // execute will execute the decoded sequence with the provided history. // The sequence must be evaluated before being sent. func (s *sequenceDecs) execute(seqs []seqVals, hist []byte) error { @@ -299,7 +314,10 @@ func (s *sequenceDecs) decodeSync(hist []byte) error { } size := ll + ml + len(out) if size-startSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", size-startSize, maxBlockSize) + if size-startSize == 424242 { + panic("here") + } + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } if size > cap(out) { // Not enough size, which can happen under high volume block streaming conditions @@ -411,7 +429,7 @@ func (s *sequenceDecs) decodeSync(hist []byte) error { // Check if space for literals if size := len(s.literals) + len(s.out) - startSize; size > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", size, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } // Add final literals diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go index 1c704d30c..191384adf 100644 --- a/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go +++ b/vendor/github.com/klauspost/compress/zstd/seqdec_amd64.go @@ -139,7 +139,7 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) { if debugDecoder { println("msl:", s.maxSyncLen, "cap", cap(s.out), "bef:", startSize, "sz:", size-startSize, "mbs:", maxBlockSize, "outsz:", cap(s.out)-startSize) } - return true, fmt.Errorf("output (%d) bigger than max block size (%d)", size-startSize, maxBlockSize) + return true, fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) default: return true, fmt.Errorf("sequenceDecs_decode returned erronous code %d", errCode) @@ -147,7 +147,8 @@ func (s *sequenceDecs) decodeSyncSimple(hist []byte) (bool, error) { s.seqSize += ctx.litRemain if s.seqSize > maxBlockSize { - return true, fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return true, fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) + } err := br.close() if err != nil { @@ -289,7 +290,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { s.seqSize += ctx.litRemain if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } err := br.close() if err != nil { diff --git a/vendor/github.com/klauspost/compress/zstd/seqdec_generic.go b/vendor/github.com/klauspost/compress/zstd/seqdec_generic.go index c3452bc3a..ac2a80d29 100644 --- a/vendor/github.com/klauspost/compress/zstd/seqdec_generic.go +++ b/vendor/github.com/klauspost/compress/zstd/seqdec_generic.go @@ -111,7 +111,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { } s.seqSize += ll + ml if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } litRemain -= ll if litRemain < 0 { @@ -149,7 +149,7 @@ func (s *sequenceDecs) decode(seqs []seqVals) error { } s.seqSize += litRemain if s.seqSize > maxBlockSize { - return fmt.Errorf("output (%d) bigger than max block size (%d)", s.seqSize, maxBlockSize) + return fmt.Errorf("output bigger than max block size (%d)", maxBlockSize) } err := br.close() if err != nil { |