diff options
Diffstat (limited to 'vendor/github.com/klauspost/pgzip/gzip.go')
-rw-r--r-- | vendor/github.com/klauspost/pgzip/gzip.go | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/vendor/github.com/klauspost/pgzip/gzip.go b/vendor/github.com/klauspost/pgzip/gzip.go new file mode 100644 index 000000000..85d14e9cb --- /dev/null +++ b/vendor/github.com/klauspost/pgzip/gzip.go @@ -0,0 +1,501 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pgzip + +import ( + "bytes" + "errors" + "fmt" + "hash" + "hash/crc32" + "io" + "sync" + "time" + + "github.com/klauspost/compress/flate" +) + +const ( + defaultBlockSize = 256 << 10 + tailSize = 16384 + defaultBlocks = 16 +) + +// These constants are copied from the flate package, so that code that imports +// "compress/gzip" does not also have to import "compress/flate". +const ( + NoCompression = flate.NoCompression + BestSpeed = flate.BestSpeed + BestCompression = flate.BestCompression + DefaultCompression = flate.DefaultCompression + ConstantCompression = flate.ConstantCompression + HuffmanOnly = flate.HuffmanOnly +) + +// A Writer is an io.WriteCloser. +// Writes to a Writer are compressed and written to w. +type Writer struct { + Header + w io.Writer + level int + wroteHeader bool + blockSize int + blocks int + currentBuffer []byte + prevTail []byte + digest hash.Hash32 + size int + closed bool + buf [10]byte + errMu sync.RWMutex + err error + pushedErr chan struct{} + results chan result + dictFlatePool sync.Pool + dstPool sync.Pool + wg sync.WaitGroup +} + +type result struct { + result chan []byte + notifyWritten chan struct{} +} + +// Use SetConcurrency to finetune the concurrency level if needed. +// +// With this you can control the approximate size of your blocks, +// as well as how many you want to be processing in parallel. +// +// Default values for this is SetConcurrency(250000, 16), +// meaning blocks are split at 250000 bytes and up to 16 blocks +// can be processing at once before the writer blocks. +func (z *Writer) SetConcurrency(blockSize, blocks int) error { + if blockSize <= tailSize { + return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize) + } + if blocks <= 0 { + return errors.New("gzip: blocks cannot be zero or less") + } + if blockSize == z.blockSize && blocks == z.blocks { + return nil + } + z.blockSize = blockSize + z.results = make(chan result, blocks) + z.blocks = blocks + z.dstPool = sync.Pool{New: func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }} + return nil +} + +// NewWriter returns a new Writer. +// Writes to the returned writer are compressed and written to w. +// +// It is the caller's responsibility to call Close on the WriteCloser when done. +// Writes may be buffered and not flushed until Close. +// +// Callers that wish to set the fields in Writer.Header must do so before +// the first call to Write or Close. The Comment and Name header fields are +// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO +// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an +// error on Write. +func NewWriter(w io.Writer) *Writer { + z, _ := NewWriterLevel(w, DefaultCompression) + return z +} + +// NewWriterLevel is like NewWriter but specifies the compression level instead +// of assuming DefaultCompression. +// +// The compression level can be DefaultCompression, NoCompression, or any +// integer value between BestSpeed and BestCompression inclusive. The error +// returned will be nil if the level is valid. +func NewWriterLevel(w io.Writer, level int) (*Writer, error) { + if level < ConstantCompression || level > BestCompression { + return nil, fmt.Errorf("gzip: invalid compression level: %d", level) + } + z := new(Writer) + z.SetConcurrency(defaultBlockSize, defaultBlocks) + z.init(w, level) + return z, nil +} + +// This function must be used by goroutines to set an +// error condition, since z.err access is restricted +// to the callers goruotine. +func (z *Writer) pushError(err error) { + z.errMu.Lock() + if z.err != nil { + z.errMu.Unlock() + return + } + z.err = err + close(z.pushedErr) + z.errMu.Unlock() +} + +func (z *Writer) init(w io.Writer, level int) { + z.wg.Wait() + digest := z.digest + if digest != nil { + digest.Reset() + } else { + digest = crc32.NewIEEE() + } + z.Header = Header{OS: 255} + z.w = w + z.level = level + z.digest = digest + z.pushedErr = make(chan struct{}, 0) + z.results = make(chan result, z.blocks) + z.err = nil + z.closed = false + z.Comment = "" + z.Extra = nil + z.ModTime = time.Time{} + z.wroteHeader = false + z.currentBuffer = nil + z.buf = [10]byte{} + z.prevTail = nil + z.size = 0 + if z.dictFlatePool.New == nil { + z.dictFlatePool.New = func() interface{} { + f, _ := flate.NewWriterDict(w, level, nil) + return f + } + } +} + +// Reset discards the Writer z's state and makes it equivalent to the +// result of its original state from NewWriter or NewWriterLevel, but +// writing to w instead. This permits reusing a Writer rather than +// allocating a new one. +func (z *Writer) Reset(w io.Writer) { + if z.results != nil && !z.closed { + close(z.results) + } + z.SetConcurrency(defaultBlockSize, defaultBlocks) + z.init(w, z.level) +} + +// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950). +func put2(p []byte, v uint16) { + p[0] = uint8(v >> 0) + p[1] = uint8(v >> 8) +} + +func put4(p []byte, v uint32) { + p[0] = uint8(v >> 0) + p[1] = uint8(v >> 8) + p[2] = uint8(v >> 16) + p[3] = uint8(v >> 24) +} + +// writeBytes writes a length-prefixed byte slice to z.w. +func (z *Writer) writeBytes(b []byte) error { + if len(b) > 0xffff { + return errors.New("gzip.Write: Extra data is too large") + } + put2(z.buf[0:2], uint16(len(b))) + _, err := z.w.Write(z.buf[0:2]) + if err != nil { + return err + } + _, err = z.w.Write(b) + return err +} + +// writeString writes a UTF-8 string s in GZIP's format to z.w. +// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1). +func (z *Writer) writeString(s string) (err error) { + // GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII. + needconv := false + for _, v := range s { + if v == 0 || v > 0xff { + return errors.New("gzip.Write: non-Latin-1 header string") + } + if v > 0x7f { + needconv = true + } + } + if needconv { + b := make([]byte, 0, len(s)) + for _, v := range s { + b = append(b, byte(v)) + } + _, err = z.w.Write(b) + } else { + _, err = io.WriteString(z.w, s) + } + if err != nil { + return err + } + // GZIP strings are NUL-terminated. + z.buf[0] = 0 + _, err = z.w.Write(z.buf[0:1]) + return err +} + +// compressCurrent will compress the data currently buffered +// This should only be called from the main writer/flush/closer +func (z *Writer) compressCurrent(flush bool) { + r := result{} + r.result = make(chan []byte, 1) + r.notifyWritten = make(chan struct{}, 0) + select { + case z.results <- r: + case <-z.pushedErr: + return + } + + // If block given is more than twice the block size, split it. + c := z.currentBuffer + if len(c) > z.blockSize*2 { + c = c[:z.blockSize] + z.wg.Add(1) + go z.compressBlock(c, z.prevTail, r, false) + z.prevTail = c[len(c)-tailSize:] + z.currentBuffer = z.currentBuffer[z.blockSize:] + z.compressCurrent(flush) + // Last one flushes if needed + return + } + + z.wg.Add(1) + go z.compressBlock(c, z.prevTail, r, z.closed) + if len(c) > tailSize { + z.prevTail = c[len(c)-tailSize:] + } else { + z.prevTail = nil + } + z.currentBuffer = z.dstPool.Get().([]byte) + z.currentBuffer = z.currentBuffer[:0] + + // Wait if flushing + if flush { + <-r.notifyWritten + } +} + +// Returns an error if it has been set. +// Cannot be used by functions that are from internal goroutines. +func (z *Writer) checkError() error { + z.errMu.RLock() + err := z.err + z.errMu.RUnlock() + return err +} + +// Write writes a compressed form of p to the underlying io.Writer. The +// compressed bytes are not necessarily flushed to output until +// the Writer is closed or Flush() is called. +// +// The function will return quickly, if there are unused buffers. +// The sent slice (p) is copied, and the caller is free to re-use the buffer +// when the function returns. +// +// Errors that occur during compression will be reported later, and a nil error +// does not signify that the compression succeeded (since it is most likely still running) +// That means that the call that returns an error may not be the call that caused it. +// Only Flush and Close functions are guaranteed to return any errors up to that point. +func (z *Writer) Write(p []byte) (int, error) { + if err := z.checkError(); err != nil { + return 0, err + } + // Write the GZIP header lazily. + if !z.wroteHeader { + z.wroteHeader = true + z.buf[0] = gzipID1 + z.buf[1] = gzipID2 + z.buf[2] = gzipDeflate + z.buf[3] = 0 + if z.Extra != nil { + z.buf[3] |= 0x04 + } + if z.Name != "" { + z.buf[3] |= 0x08 + } + if z.Comment != "" { + z.buf[3] |= 0x10 + } + put4(z.buf[4:8], uint32(z.ModTime.Unix())) + if z.level == BestCompression { + z.buf[8] = 2 + } else if z.level == BestSpeed { + z.buf[8] = 4 + } else { + z.buf[8] = 0 + } + z.buf[9] = z.OS + var n int + var err error + n, err = z.w.Write(z.buf[0:10]) + if err != nil { + z.pushError(err) + return n, err + } + if z.Extra != nil { + err = z.writeBytes(z.Extra) + if err != nil { + z.pushError(err) + return n, err + } + } + if z.Name != "" { + err = z.writeString(z.Name) + if err != nil { + z.pushError(err) + return n, err + } + } + if z.Comment != "" { + err = z.writeString(z.Comment) + if err != nil { + z.pushError(err) + return n, err + } + } + // Start receiving data from compressors + go func() { + listen := z.results + for { + r, ok := <-listen + // If closed, we are finished. + if !ok { + return + } + buf := <-r.result + n, err := z.w.Write(buf) + if err != nil { + z.pushError(err) + close(r.notifyWritten) + return + } + if n != len(buf) { + z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf))) + close(r.notifyWritten) + return + } + z.dstPool.Put(buf) + close(r.notifyWritten) + } + }() + z.currentBuffer = make([]byte, 0, z.blockSize) + } + q := p + for len(q) > 0 { + length := len(q) + if length+len(z.currentBuffer) > z.blockSize { + length = z.blockSize - len(z.currentBuffer) + } + z.digest.Write(q[:length]) + z.currentBuffer = append(z.currentBuffer, q[:length]...) + if len(z.currentBuffer) >= z.blockSize { + z.compressCurrent(false) + if err := z.checkError(); err != nil { + return len(p) - len(q) - length, err + } + } + z.size += length + q = q[length:] + } + return len(p), z.checkError() +} + +// Step 1: compresses buffer to buffer +// Step 2: send writer to channel +// Step 3: Close result channel to indicate we are done +func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) { + defer func() { + close(r.result) + z.wg.Done() + }() + buf := z.dstPool.Get().([]byte) + dest := bytes.NewBuffer(buf[:0]) + + compressor := z.dictFlatePool.Get().(*flate.Writer) + compressor.ResetDict(dest, prevTail) + compressor.Write(p) + + err := compressor.Flush() + if err != nil { + z.pushError(err) + return + } + if closed { + err = compressor.Close() + if err != nil { + z.pushError(err) + return + } + } + z.dictFlatePool.Put(compressor) + // Read back buffer + buf = dest.Bytes() + r.result <- buf +} + +// Flush flushes any pending compressed data to the underlying writer. +// +// It is useful mainly in compressed network protocols, to ensure that +// a remote reader has enough data to reconstruct a packet. Flush does +// not return until the data has been written. If the underlying +// writer returns an error, Flush returns that error. +// +// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH. +func (z *Writer) Flush() error { + if err := z.checkError(); err != nil { + return err + } + if z.closed { + return nil + } + if !z.wroteHeader { + _, err := z.Write(nil) + if err != nil { + return err + } + } + // We send current block to compression + z.compressCurrent(true) + + return z.checkError() +} + +// UncompressedSize will return the number of bytes written. +// pgzip only, not a function in the official gzip package. +func (z *Writer) UncompressedSize() int { + return z.size +} + +// Close closes the Writer, flushing any unwritten data to the underlying +// io.Writer, but does not close the underlying io.Writer. +func (z *Writer) Close() error { + if err := z.checkError(); err != nil { + return err + } + if z.closed { + return nil + } + + z.closed = true + if !z.wroteHeader { + z.Write(nil) + if err := z.checkError(); err != nil { + return err + } + } + z.compressCurrent(true) + if err := z.checkError(); err != nil { + return err + } + close(z.results) + put4(z.buf[0:4], z.digest.Sum32()) + put4(z.buf[4:8], uint32(z.size)) + _, err := z.w.Write(z.buf[0:8]) + if err != nil { + z.pushError(err) + return err + } + return nil +} |