- 
                Notifications
    
You must be signed in to change notification settings  - Fork 2.3k
 
packets: implemented compression protocol #649
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e6c682c
              77f6792
              a0cf94b
              4cdff28
              d0ea1a4
              477c9f8
              996ed2d
              f74faed
              b3a093e
              5eaa5ff
              ee46028
              93aed73
              4f10ee5
              59b0f90
              3fbf53a
              f9c6a2c
              6046bf0
              385673a
              9031984
              6992fad
              386f84b
              f853432
              d1a8b86
              3167920
              fb33a2c
              f174605
              dbd1e2b
              3e12e32
              17a06f1
              60bdaec
              422ab6f
              26ea544
              f339392
              3e559a8
              6ceaef6
              97afd8d
              f617170
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,217 @@ | ||
| package mysql | ||
| 
     | 
||
| import ( | ||
| "bytes" | ||
| "compress/zlib" | ||
| "io" | ||
| ) | ||
| 
     | 
||
| const ( | ||
| minCompressLength = 50 | ||
| ) | ||
| 
     | 
||
| type packetReader interface { | ||
| readNext(need int) ([]byte, error) | ||
| } | ||
| 
     | 
||
| type compressedReader struct { | ||
| buf packetReader | ||
| bytesBuf []byte | ||
| mc *mysqlConn | ||
| } | ||
| 
     | 
||
| type compressedWriter struct { | ||
| connWriter io.Writer | ||
| mc *mysqlConn | ||
| } | ||
| 
     | 
||
| func NewCompressedReader(buf packetReader, mc *mysqlConn) *compressedReader { | ||
| return &compressedReader{ | ||
| buf: buf, | ||
| bytesBuf: make([]byte, 0), | ||
| mc: mc, | ||
| } | ||
| } | ||
| 
     | 
||
| func NewCompressedWriter(connWriter io.Writer, mc *mysqlConn) *compressedWriter { | ||
                
       | 
||
| return &compressedWriter{ | ||
| connWriter: connWriter, | ||
| mc: mc, | ||
| } | ||
| } | ||
| 
     | 
||
| func (cr *compressedReader) readNext(need int) ([]byte, error) { | ||
| for len(cr.bytesBuf) < need { | ||
| err := cr.uncompressPacket() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
| 
     | 
||
| data := make([]byte, need) | ||
| 
     | 
||
| copy(data, cr.bytesBuf[:len(data)]) | ||
| 
     | 
||
| cr.bytesBuf = cr.bytesBuf[len(data):] | ||
| 
     | 
||
| return data, nil | ||
                
      
                  methane marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| } | ||
| 
     | 
||
| func (cr *compressedReader) uncompressPacket() error { | ||
| header, err := cr.buf.readNext(7) // size of compressed header | ||
| 
     | 
||
| if err != nil { | ||
| return err | ||
| } | ||
| 
     | 
||
| // compressed header structure | ||
| comprLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) | ||
| uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16) | ||
| compressionSequence := uint8(header[3]) | ||
| 
     | 
||
| if compressionSequence != cr.mc.compressionSequence { | ||
| return ErrPktSync | ||
| } | ||
| 
     | 
||
| cr.mc.compressionSequence++ | ||
| 
     | 
||
| comprData, err := cr.buf.readNext(comprLength) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| 
     | 
||
| // if payload is uncompressed, its length will be specified as zero, and its | ||
| // true length is contained in comprLength | ||
| if uncompressedLength == 0 { | ||
| cr.bytesBuf = append(cr.bytesBuf, comprData...) | ||
| return nil | ||
| } | ||
| 
     | 
||
| // write comprData to a bytes.buffer, then read it using zlib into data | ||
| var b bytes.Buffer | ||
| b.Write(comprData) | ||
| r, err := zlib.NewReader(&b) | ||
                
      
                  methane marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| if r != nil { | ||
| defer r.Close() | ||
| } | ||
| 
     | 
||
| if err != nil { | ||
| return err | ||
| } | ||
| 
     | 
||
| data := make([]byte, uncompressedLength) | ||
| lenRead := 0 | ||
                
      
                  methane marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| // http://grokbase.com/t/gg/golang-nuts/146y9ppn6b/go-nuts-stream-compression-with-compress-flate | ||
| for lenRead < uncompressedLength { | ||
| 
     | 
||
| tmp := data[lenRead:] | ||
| 
     | 
||
| n, err := r.Read(tmp) | ||
                
      
                  methane marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| lenRead += n | ||
| 
     | 
||
| if err == io.EOF { | ||
| if lenRead < uncompressedLength { | ||
| return io.ErrUnexpectedEOF | ||
| } | ||
| break | ||
| } | ||
| 
     | 
||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| 
     | 
||
| cr.bytesBuf = append(cr.bytesBuf, data...) | ||
| 
     | 
||
| return nil | ||
| } | ||
| 
     | 
||
| func (cw *compressedWriter) Write(data []byte) (int, error) { | ||
| // when asked to write an empty packet, do nothing | ||
| if len(data) == 0 { | ||
| return 0, nil | ||
| } | ||
| totalBytes := len(data) | ||
| 
     | 
||
| length := len(data) - 4 | ||
| 
     | 
||
| maxPayloadLength := maxPacketSize - 4 | ||
| 
     | 
||
| for length >= maxPayloadLength { | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why both of  maxPayload := maxPacketSize - 4
for len(data) > maxPayload {
...
    data = data[maxPayload:]
} | 
||
| // cut off a slice of size max payload length | ||
| dataSmall := data[:maxPayloadLength] | ||
| lenSmall := len(dataSmall) | ||
| 
     | 
||
| var b bytes.Buffer | ||
| writer := zlib.NewWriter(&b) | ||
                
       | 
||
| _, err := writer.Write(dataSmall) | ||
| writer.Close() | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| 
     | 
||
| err = cw.writeComprPacketToNetwork(b.Bytes(), lenSmall) | ||
                
      
                  methane marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| 
     | 
||
| length -= maxPayloadLength | ||
| data = data[maxPayloadLength:] | ||
| } | ||
| 
     | 
||
| lenSmall := len(data) | ||
| 
     | 
||
| // do not compress if packet is too small | ||
| if lenSmall < minCompressLength { | ||
| err := cw.writeComprPacketToNetwork(data, 0) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| 
     | 
||
| return totalBytes, nil | ||
| } | ||
| 
     | 
||
| var b bytes.Buffer | ||
| writer := zlib.NewWriter(&b) | ||
| 
     | 
||
| _, err := writer.Write(data) | ||
| writer.Close() | ||
| 
     | 
||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| 
     | 
||
| err = cw.writeComprPacketToNetwork(b.Bytes(), lenSmall) | ||
| 
     | 
||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| return totalBytes, nil | ||
| } | ||
| 
     | 
||
| func (cw *compressedWriter) writeComprPacketToNetwork(data []byte, uncomprLength int) error { | ||
                
       | 
||
| data = append([]byte{0, 0, 0, 0, 0, 0, 0}, data...) | ||
                
      
                  methane marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
                
       | 
||
| comprLength := len(data) - 7 | ||
| 
     | 
||
| // compression header | ||
| data[0] = byte(0xff & comprLength) | ||
| data[1] = byte(0xff & (comprLength >> 8)) | ||
| data[2] = byte(0xff & (comprLength >> 16)) | ||
| 
     | 
||
| data[3] = cw.mc.compressionSequence | ||
| 
     | 
||
| //this value is never greater than maxPayloadLength | ||
| data[4] = byte(0xff & uncomprLength) | ||
| data[5] = byte(0xff & (uncomprLength >> 8)) | ||
| data[6] = byte(0xff & (uncomprLength >> 16)) | ||
| 
     | 
||
| if _, err := cw.connWriter.Write(data); err != nil { | ||
| return err | ||
| } | ||
| 
     | 
||
| cw.mc.compressionSequence++ | ||
| return nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used also when no compression is used. It is probably better to move this interface to
connection.go