Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgx: Support DisableTx #19

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions dbump_pgx/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.18

require (
github.com/cristalhq/dbump v0.14.0
github.com/jackc/pgx/v5 v5.4.3
github.com/jackc/pgx/v5 v5.5.5
)

require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/text v0.9.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/text v0.15.0 // indirect
)
18 changes: 10 additions & 8 deletions dbump_pgx/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY=
github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
46 changes: 31 additions & 15 deletions dbump_pgx/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ type Migrator struct {

// Config for the migrator.
type Config struct {
// Schema for the dbump version table. Default is empty which means "public" schema.
// Schema for the dbump version table.
// Default is empty which means "public" schema.
Schema string
// Table for the dbump version table. Default is empty which means "_dbump_log" table.
// Table for the dbump version table.
// Default is empty which means "_dbump_log" table.
Table string

// [schema.]table
tableName string
// to prevent multiple migrations running at the same time
lockNum int64

_ struct{} // enforce explicit field names.
}

// NewMigrator instantiates new Migrator.
Expand All @@ -49,14 +53,15 @@ func NewMigrator(conn *pgx.Conn, cfg Config) *Migrator {
}
}

// Init is a method from Migrator interface.
// Init is a method from [dbump.Migrator] interface.
func (pg *Migrator) Init(ctx context.Context) error {
var query string
if pg.cfg.Schema != "" {
query = fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s;`, pg.cfg.Schema)
}

query += fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
id BIGSERIAL PRIMARY KEY,
version BIGINT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL
);`, pg.cfg.tableName)
Expand All @@ -65,7 +70,7 @@ func (pg *Migrator) Init(ctx context.Context) error {
return err
}

// Drop is a method from Migrator interface.
// Drop is a method from [dbump.Migrator] interface.
func (pg *Migrator) Drop(ctx context.Context) error {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s;`, pg.cfg.tableName)

Expand All @@ -77,19 +82,19 @@ func (pg *Migrator) Drop(ctx context.Context) error {
return err
}

// LockDB is a method from Migrator interface.
// LockDB is a method from [dbump.Migrator] interface.
func (pg *Migrator) LockDB(ctx context.Context) error {
_, err := pg.conn.Exec(ctx, "SELECT pg_advisory_lock($1);", pg.cfg.lockNum)
return err
}

// UnlockDB is a method from Migrator interface.
// UnlockDB is a method from [dbump.Migrator] interface.
func (pg *Migrator) UnlockDB(ctx context.Context) error {
_, err := pg.conn.Exec(ctx, "SELECT pg_advisory_unlock($1);", pg.cfg.lockNum)
return err
}

// Version is a method from Migrator interface.
// Version is a method from [dbump.Migrator] interface.
func (pg *Migrator) Version(ctx context.Context) (version int, err error) {
query := fmt.Sprintf("SELECT version FROM %s ORDER BY created_at DESC LIMIT 1;", pg.cfg.tableName)
row := pg.conn.QueryRow(ctx, query)
Expand All @@ -100,7 +105,7 @@ func (pg *Migrator) Version(ctx context.Context) (version int, err error) {
return version, err
}

// Version is a method from Migrator interface.
// DoStep is a method from [dbump.Migrator] interface.
func (pg *Migrator) DoStep(ctx context.Context, step dbump.Step) error {
if step.DisableTx {
if _, err := pg.conn.Exec(ctx, step.Query); err != nil {
Expand All @@ -111,14 +116,25 @@ func (pg *Migrator) DoStep(ctx context.Context, step dbump.Step) error {
return err
}

return pgx.BeginFunc(ctx, pg.conn, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, step.Query); err != nil {
return err
}
query := fmt.Sprintf("INSERT INTO %s (version, created_at) VALUES ($1, NOW());", pg.cfg.tableName)
_, err := tx.Exec(ctx, query, step.Version)
tx, err := pg.conn.Begin(ctx)
if err != nil {
return err
}

if _, err := tx.Exec(ctx, step.Query); err != nil {
return err
})
}

query := fmt.Sprintf("INSERT INTO %s (version, created_at) VALUES ($1, NOW());", pg.cfg.tableName)
if _, err := tx.Exec(ctx, query, step.Version); err != nil {
return err
}

// TODO: Rollback
if err := tx.Commit(ctx); err != nil {
return err
}
return nil
}

func hashTableName(s string) int64 {
Expand Down
14 changes: 8 additions & 6 deletions dbump_pgx/pgx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
var conn *pgx.Conn

func init() {
host := envOrDef("DBUMP_PG_HOST", "localhost")
port := envOrDef("DBUMP_PG_PORT", "5432")
username := envOrDef("DBUMP_PG_USER", "postgres")
password := envOrDef("DBUMP_PG_PASS", "postgres")
db := envOrDef("DBUMP_PG_DB", "postgres")
sslmode := envOrDef("DBUMP_PG_SSL", "disable")
var (
host = envOrDef("DBUMP_PG_HOST", "localhost")
port = envOrDef("DBUMP_PG_PORT", "5432")
username = envOrDef("DBUMP_PG_USER", "postgres")
password = envOrDef("DBUMP_PG_PASS", "postgres")
db = envOrDef("DBUMP_PG_DB", "postgres")
sslmode = envOrDef("DBUMP_PG_SSL", "disable")
)

dsn := fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
host, port, username, password, db, sslmode)
Expand Down
Loading