Skip to content

Commit 9c3779d

Browse files
authored
BREAKING CHANGES: Add support for transactions and support for pgmq-pg17 docker image (#67)
1 parent 0a6b880 commit 9c3779d

File tree

8 files changed

+613
-417
lines changed

8 files changed

+613
-417
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
.PHONY: mockgen
22
mockgen:
3-
mockgen --destination mocks/row.go --package=mocks --build_flags=--mod=mod github.com/jackc/pgx/v5 Row
4-
mockgen --source=pgmq.go --destination mocks/pgmq.go --package=mocks
3+
go run go.uber.org/mock/mockgen --destination mocks/row.go --package=mocks --build_flags=--mod=mod github.com/jackc/pgx/v5 Row
4+
go run go.uber.org/mock/mockgen --source=pgmq.go --destination mocks/pgmq.go --package=mocks
55

66
.PHONY: test
77
test: mockgen

README.md

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ A Go (Golang) client for
99
[Postgres Message Queue](https://github.com/tembo-io/pgmq) (PGMQ). Based loosely
1010
on the [Rust client](https://github.com/tembo-io/pgmq/tree/main/pgmq-rs).
1111

12+
`pgmq-go` works with [pgx](https://github.com/jackc/pgx). The second argument of most functions only needs to satisfy the [DB](https://pkg.go.dev/github.com/craigpastro/pgmq-go#DB) interface, which means it can take, among others, a `*pgx.Conn`, `*pgxpool.Pool`, or `pgx.Tx`.
13+
1214
## Usage
1315

1416
Start a Postgres instance with the PGMQ extension installed:
@@ -32,33 +34,53 @@ import (
3234
func main() {
3335
ctx := context.Background()
3436

35-
q, err := pgmq.New(ctx, "postgres://postgres:password@localhost:5432/postgres")
37+
pool, err := pgmq.NewPgxPool(ctx, "postgres://postgres:password@localhost:5432/postgres")
38+
if err != nil {
39+
panic(err)
40+
}
41+
42+
err = pgmq.CreatePGMQExtension(ctx, pool)
43+
if err != nil {
44+
panic(err)
45+
}
46+
47+
err = pgmq.CreateQueue(ctx, pool, "my_queue")
3648
if err != nil {
3749
panic(err)
3850
}
3951

40-
err = q.CreateQueue(ctx, "my_queue")
52+
// We can perform various queue operations using a transaction.
53+
tx, err := pool.Begin(ctx)
4154
if err != nil {
4255
panic(err)
4356
}
4457

45-
id, err := q.Send(ctx, "my_queue", json.RawMessage(`{"foo": "bar"}`))
58+
id, err := pgmq.Send(ctx, tx, "my_queue", json.RawMessage(`{"foo": "bar"}`))
4659
if err != nil {
4760
panic(err)
4861
}
4962

50-
msg, err := q.Read(ctx, "my_queue", 30)
63+
msg, err := pgmq.Read(ctx, tx, "my_queue", 30)
5164
if err != nil {
5265
panic(err)
5366
}
5467

5568
// Archive the message by moving it to the "pgmq.a_<queue_name>" table.
5669
// Alternatively, you can `Delete` the message, or read and delete in one
5770
// call by using `Pop`.
58-
_, err = q.Archive(ctx, "my_queue", id)
71+
_, err = pgmq.Archive(ctx, tx, "my_queue", id)
5972
if err != nil {
6073
panic(err)
6174
}
75+
76+
// Commit the transaction.
77+
err = tx.Commit(ctx)
78+
if err != nil {
79+
panic(err)
80+
}
81+
82+
// Close the connection pool.
83+
pool.Close()
6284
}
6385
```
6486

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,13 @@ require (
6363
go.opentelemetry.io/otel/sdk v1.21.0 // indirect
6464
go.opentelemetry.io/otel/trace v1.31.0 // indirect
6565
golang.org/x/crypto v0.31.0 // indirect
66+
golang.org/x/mod v0.18.0 // indirect
6667
golang.org/x/net v0.26.0 // indirect
6768
golang.org/x/sync v0.10.0 // indirect
6869
golang.org/x/sys v0.28.0 // indirect
6970
golang.org/x/text v0.21.0 // indirect
7071
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
72+
golang.org/x/tools v0.22.0 // indirect
7173
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
7274
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
7375
google.golang.org/protobuf v1.33.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
143143
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
144144
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
145145
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
146+
golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
147+
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
146148
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
147149
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
148150
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -176,6 +178,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
176178
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
177179
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
178180
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
181+
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
182+
golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c=
179183
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
180184
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
181185
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

mocks/pgmq.go

Lines changed: 0 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)