Implement zlib compression (#1487)
Implemented the MySQL compression protocol (zlib). This new feature is enabled by either of the following (see the sketch below):

* Adding `compress=true` to the DSN.
* Calling `cfg.Apply(Compress(true))`.
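A minimal usage sketch of both variants; credentials, host, and database name are placeholders, and `NewConfig`, `Apply`, `Compress`, and `NewConnector` are the driver APIs referenced above:

```go
package main

import (
	"database/sql"
	"log"

	"github.com/go-sql-driver/mysql"
)

func main() {
	// Variant 1: enable compression through the DSN parameter.
	db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/test?compress=true")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Variant 2: enable compression through the Config API.
	cfg := mysql.NewConfig()
	cfg.User = "user"
	cfg.Passwd = "password"
	cfg.Net = "tcp"
	cfg.Addr = "127.0.0.1:3306"
	cfg.DBName = "test"
	if err := cfg.Apply(mysql.Compress(true)); err != nil {
		log.Fatal(err)
	}
	connector, err := mysql.NewConnector(cfg)
	if err != nil {
		log.Fatal(err)
	}
	db2 := sql.OpenDB(connector)
	defer db2.Close()
}
```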

Co-authored-by: Brigitte Lamarche <[email protected]>
Co-authored-by: Julien Schmidt <[email protected]>
Co-authored-by: Jeffrey Charles <[email protected]>
Co-authored-by: Jeff Hodges <[email protected]>
Co-authored-by: Daniel Montoya <[email protected]>
Co-authored-by: Justin Li <[email protected]>
Co-authored-by: Dave Stubbs <[email protected]>
Co-authored-by: Linh Tran Tuan <[email protected]>
Co-authored-by: Robert R. Russell <[email protected]>
Co-authored-by: INADA Naoki <[email protected]>
Co-authored-by: Kieron Woodhouse <[email protected]>
Co-authored-by: Alexey Palazhchenko <[email protected]>
Co-authored-by: Reed Allman <[email protected]>
Co-authored-by: Joe Mann <[email protected]>
15 people authored Dec 19, 2024
1 parent c9f41c0 commit 3348e57
Showing 17 changed files with 581 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -83,7 +83,7 @@ jobs:
           my-cnf: |
             innodb_log_file_size=256MB
             innodb_buffer_pool_size=512MB
-            max_allowed_packet=16MB
+            max_allowed_packet=48MB
             ; TestConcurrent fails if max_connections is too large
             max_connections=50
             local_infile=1
2 changes: 2 additions & 0 deletions AUTHORS
@@ -21,6 +21,7 @@ Animesh Ray <mail.rayanimesh at gmail.com>
 Arne Hormann <arnehormann at gmail.com>
 Ariel Mashraki <ariel at mashraki.co.il>
 Asta Xie <xiemengjun at gmail.com>
+B Lamarche <blam413 at gmail.com>
 Brian Hendriks <brian at dolthub.com>
 Bulat Gaifullin <gaifullinbf at gmail.com>
 Caine Jette <jette at alum.mit.edu>
@@ -62,6 +63,7 @@ Jennifer Purevsuren <jennifer at dolthub.com>
 Jerome Meyer <jxmeyer at gmail.com>
 Jiajia Zhong <zhong2plus at gmail.com>
 Jian Zhen <zhenjl at gmail.com>
+Joe Mann <contact at joemann.co.uk>
 Joshua Prunier <joshua.prunier at gmail.com>
 Julien Lefevre <julien.lefevr at gmail.com>
 Julien Schmidt <go-sql-driver at julienschmidt.com>
11 changes: 11 additions & 0 deletions README.md
@@ -38,6 +38,7 @@ A MySQL-Driver for Go's [database/sql](https://golang.org/pkg/database/sql/) pac
 * Secure `LOAD DATA LOCAL INFILE` support with file allowlisting and `io.Reader` support
 * Optional `time.Time` parsing
 * Optional placeholder interpolation
+* Optional zlib compression
 
 ## Requirements
 
@@ -267,6 +268,16 @@ SELECT u.id FROM users as u
 
 will return `u.id` instead of just `id` if `columnsWithAlias=true`.
 
+##### `compress`
+
+```
+Type:           bool
+Valid Values:   true, false
+Default:        false
+```
+
+Toggles zlib compression.
+
 ##### `interpolateParams`
 
 ```
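For illustration, a full DSN using the new parameter might look like this (all values are placeholders):

```
user:password@tcp(localhost:3306)/dbname?compress=true
```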
28 changes: 20 additions & 8 deletions benchmark_test.go
@@ -46,9 +46,13 @@ func (tb *TB) checkStmt(stmt *sql.Stmt, err error) *sql.Stmt {
 	return stmt
 }
 
-func initDB(b *testing.B, queries ...string) *sql.DB {
+func initDB(b *testing.B, useCompression bool, queries ...string) *sql.DB {
 	tb := (*TB)(b)
-	db := tb.checkDB(sql.Open(driverNameTest, dsn))
+	comprStr := ""
+	if useCompression {
+		comprStr = "&compress=1"
+	}
+	db := tb.checkDB(sql.Open(driverNameTest, dsn+comprStr))
 	for _, query := range queries {
 		if _, err := db.Exec(query); err != nil {
 			b.Fatalf("error on %q: %v", query, err)
@@ -60,10 +64,18 @@ func initDB(b *testing.B, queries ...string) *sql.DB {
 const concurrencyLevel = 10
 
 func BenchmarkQuery(b *testing.B) {
+	benchmarkQueryHelper(b, false)
+}
+
+func BenchmarkQueryCompression(b *testing.B) {
+	benchmarkQueryHelper(b, true)
+}
+
+func benchmarkQueryHelper(b *testing.B, compr bool) {
 	tb := (*TB)(b)
 	b.StopTimer()
 	b.ReportAllocs()
-	db := initDB(b,
+	db := initDB(b, compr,
 		"DROP TABLE IF EXISTS foo",
 		"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
 		`INSERT INTO foo VALUES (1, "one")`,
@@ -222,7 +234,7 @@ func BenchmarkInterpolation(b *testing.B) {
 		},
 		maxAllowedPacket: maxPacketSize,
 		maxWriteSize:     maxPacketSize - 1,
-		buf:              newBuffer(nil),
+		buf:              newBuffer(),
 	}
 
 	args := []driver.Value{
@@ -269,7 +281,7 @@ func benchmarkQueryContext(b *testing.B, db *sql.DB, p int) {
 }
 
 func BenchmarkQueryContext(b *testing.B) {
-	db := initDB(b,
+	db := initDB(b, false,
 		"DROP TABLE IF EXISTS foo",
 		"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
 		`INSERT INTO foo VALUES (1, "one")`,
@@ -305,7 +317,7 @@ func benchmarkExecContext(b *testing.B, db *sql.DB, p int) {
 }
 
 func BenchmarkExecContext(b *testing.B) {
-	db := initDB(b,
+	db := initDB(b, false,
 		"DROP TABLE IF EXISTS foo",
 		"CREATE TABLE foo (id INT PRIMARY KEY, val CHAR(50))",
 		`INSERT INTO foo VALUES (1, "one")`,
@@ -323,7 +335,7 @@ func BenchmarkExecContext(b *testing.B) {
 // "size=" means size of each blobs.
 func BenchmarkQueryRawBytes(b *testing.B) {
 	var sizes []int = []int{100, 1000, 2000, 4000, 8000, 12000, 16000, 32000, 64000, 256000}
-	db := initDB(b,
+	db := initDB(b, false,
 		"DROP TABLE IF EXISTS bench_rawbytes",
 		"CREATE TABLE bench_rawbytes (id INT PRIMARY KEY, val LONGBLOB)",
 	)
@@ -376,7 +388,7 @@ func BenchmarkQueryRawBytes(b *testing.B) {
 // BenchmarkReceiveMassiveRows measures performance of receiving large number of rows.
 func BenchmarkReceiveMassiveRows(b *testing.B) {
 	// Setup -- prepare 10000 rows.
-	db := initDB(b,
+	db := initDB(b, false,
 		"DROP TABLE IF EXISTS foo",
 		"CREATE TABLE foo (id INT PRIMARY KEY, val TEXT)")
 	defer db.Close()
26 changes: 10 additions & 16 deletions buffer.go
@@ -10,13 +10,16 @@ package mysql
 
 import (
 	"io"
-	"net"
-	"time"
 )
 
 const defaultBufSize = 4096
 const maxCachedBufSize = 256 * 1024
 
+// readerFunc is a function whose signature is compatible with io.Reader's
+// Read method. We use this function type instead of io.Reader so that
+// mc.readWithTimeout can be passed in directly.
+type readerFunc func([]byte) (int, error)
+
 // A buffer which is used for both reading and writing.
 // This is possible since communication on each connection is synchronous.
 // In other words, we can't write and read simultaneously on the same connection.
@@ -25,15 +28,12 @@ const maxCachedBufSize = 256 * 1024
 type buffer struct {
 	buf       []byte // read buffer.
 	cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
-	nc        net.Conn
-	timeout   time.Duration
 }
 
 // newBuffer allocates and returns a new buffer.
-func newBuffer(nc net.Conn) buffer {
+func newBuffer() buffer {
 	return buffer{
 		cachedBuf: make([]byte, defaultBufSize),
-		nc:        nc,
 	}
 }

@@ -43,7 +43,7 @@ func (b *buffer) busy() bool {
 }
 
 // fill reads into the read buffer until at least _need_ bytes are in it.
-func (b *buffer) fill(need int) error {
+func (b *buffer) fill(need int, r readerFunc) error {
 	// we'll move the contents of the current buffer to dest before filling it.
 	dest := b.cachedBuf

@@ -64,13 +64,7 @@ func (b *buffer) fill(need int) error {
 	copy(dest[:n], b.buf)
 
 	for {
-		if b.timeout > 0 {
-			if err := b.nc.SetReadDeadline(time.Now().Add(b.timeout)); err != nil {
-				return err
-			}
-		}
-
-		nn, err := b.nc.Read(dest[n:])
+		nn, err := r(dest[n:])
 		n += nn
 
 		if err == nil && n < need {
@@ -92,10 +86,10 @@
 
 // returns next N bytes from buffer.
 // The returned slice is only guaranteed to be valid until the next read
-func (b *buffer) readNext(need int) ([]byte, error) {
+func (b *buffer) readNext(need int, r readerFunc) ([]byte, error) {
 	if len(b.buf) < need {
 		// refill
-		if err := b.fill(need); err != nil {
+		if err := b.fill(need, r); err != nil {
 			return nil, err
 		}
 	}
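For context on the new `readerFunc` plumbing: the deadline handling deleted from `fill` presumably moves to the connection side (the comment above names `mc.readWithTimeout`). A sketch reconstructing that behavior from the deleted lines, where `deadlineReader` is a hypothetical name used only for illustration:

```go
package mysql

import (
	"net"
	"time"
)

// deadlineReader returns a readerFunc that arms a read deadline before each
// read, the same logic that buffer.fill previously applied itself. The
// constructor name is hypothetical; in the driver, mc.readWithTimeout
// presumably fills this role.
func deadlineReader(nc net.Conn, timeout time.Duration) readerFunc {
	return func(p []byte) (int, error) {
		if timeout > 0 {
			if err := nc.SetReadDeadline(time.Now().Add(timeout)); err != nil {
				return 0, err
			}
		}
		return nc.Read(p)
	}
}
```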
