diff options
Diffstat (limited to 'vendor/github.com/klauspost/pgzip/gzip.go')
-rw-r--r-- | vendor/github.com/klauspost/pgzip/gzip.go | 76 |
1 files changed, 47 insertions, 29 deletions
diff --git a/vendor/github.com/klauspost/pgzip/gzip.go b/vendor/github.com/klauspost/pgzip/gzip.go index 85d14e9cb..bb2e33941 100644 --- a/vendor/github.com/klauspost/pgzip/gzip.go +++ b/vendor/github.com/klauspost/pgzip/gzip.go @@ -11,6 +11,7 @@ import ( "hash" "hash/crc32" "io" + "runtime" "sync" "time" @@ -18,9 +19,9 @@ import ( ) const ( - defaultBlockSize = 256 << 10 + defaultBlockSize = 1 << 20 tailSize = 16384 - defaultBlocks = 16 + defaultBlocks = 4 ) // These constants are copied from the flate package, so that code that imports @@ -68,8 +69,8 @@ type result struct { // With this you can control the approximate size of your blocks, // as well as how many you want to be processing in parallel. // -// Default values for this is SetConcurrency(250000, 16), -// meaning blocks are split at 250000 bytes and up to 16 blocks +// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)), +// meaning blocks are split at 1 MB and up to the number of CPU threads // can be processing at once before the writer blocks. func (z *Writer) SetConcurrency(blockSize, blocks int) error { if blockSize <= tailSize { @@ -115,7 +116,7 @@ func NewWriterLevel(w io.Writer, level int) (*Writer, error) { return nil, fmt.Errorf("gzip: invalid compression level: %d", level) } z := new(Writer) - z.SetConcurrency(defaultBlockSize, defaultBlocks) + z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)) z.init(w, level) return z, nil } @@ -174,7 +175,7 @@ func (z *Writer) Reset(w io.Writer) { if z.results != nil && !z.closed { close(z.results) } - z.SetConcurrency(defaultBlockSize, defaultBlocks) + z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)) z.init(w, z.level) } @@ -239,36 +240,36 @@ func (z *Writer) writeString(s string) (err error) { // compressCurrent will compress the data currently buffered // This should only be called from the main writer/flush/closer func (z *Writer) compressCurrent(flush bool) { + c := z.currentBuffer + if len(c) > z.blockSize { + // This can never happen through the public interface. + panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)") + } + r := result{} r.result = make(chan []byte, 1) r.notifyWritten = make(chan struct{}, 0) + // Reserve a result slot select { case z.results <- r: case <-z.pushedErr: return } - // If block given is more than twice the block size, split it. - c := z.currentBuffer - if len(c) > z.blockSize*2 { - c = c[:z.blockSize] - z.wg.Add(1) - go z.compressBlock(c, z.prevTail, r, false) - z.prevTail = c[len(c)-tailSize:] - z.currentBuffer = z.currentBuffer[z.blockSize:] - z.compressCurrent(flush) - // Last one flushes if needed - return - } - z.wg.Add(1) - go z.compressBlock(c, z.prevTail, r, z.closed) + tail := z.prevTail if len(c) > tailSize { - z.prevTail = c[len(c)-tailSize:] + buf := z.dstPool.Get().([]byte) // Put in .compressBlock + // Copy tail from current buffer before handing the buffer over to the + // compressBlock goroutine. + buf = append(buf[:0], c[len(c)-tailSize:]...) + z.prevTail = buf } else { z.prevTail = nil } - z.currentBuffer = z.dstPool.Get().([]byte) + go z.compressBlock(c, tail, r, z.closed) + + z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock z.currentBuffer = z.currentBuffer[:0] // Wait if flushing @@ -358,29 +359,37 @@ func (z *Writer) Write(p []byte) (int, error) { // Start receiving data from compressors go func() { listen := z.results + var failed bool for { r, ok := <-listen // If closed, we are finished. if !ok { return } + if failed { + close(r.notifyWritten) + continue + } buf := <-r.result n, err := z.w.Write(buf) if err != nil { z.pushError(err) close(r.notifyWritten) - return + failed = true + continue } if n != len(buf) { z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf))) + failed = true close(r.notifyWritten) - return + continue } z.dstPool.Put(buf) close(r.notifyWritten) } }() - z.currentBuffer = make([]byte, 0, z.blockSize) + z.currentBuffer = z.dstPool.Get().([]byte) + z.currentBuffer = z.currentBuffer[:0] } q := p for len(q) > 0 { @@ -390,7 +399,10 @@ func (z *Writer) Write(p []byte) (int, error) { } z.digest.Write(q[:length]) z.currentBuffer = append(z.currentBuffer, q[:length]...) - if len(z.currentBuffer) >= z.blockSize { + if len(z.currentBuffer) > z.blockSize { + panic("z.currentBuffer too large (most likely due to concurrent Write race)") + } + if len(z.currentBuffer) == z.blockSize { z.compressCurrent(false) if err := z.checkError(); err != nil { return len(p) - len(q) - length, err @@ -410,12 +422,13 @@ func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) { close(r.result) z.wg.Done() }() - buf := z.dstPool.Get().([]byte) + buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer dest := bytes.NewBuffer(buf[:0]) - compressor := z.dictFlatePool.Get().(*flate.Writer) + compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below compressor.ResetDict(dest, prevTail) compressor.Write(p) + z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent err := compressor.Flush() if err != nil { @@ -429,7 +442,12 @@ func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) { return } } - z.dictFlatePool.Put(compressor) + z.dictFlatePool.Put(compressor) // Get above + + if prevTail != nil { + z.dstPool.Put(prevTail) // Get in .compressCurrent + } + // Read back buffer buf = dest.Bytes() r.result <- buf |