@@ -27,6 +27,7 @@ type Message struct {
27
27
}
28
28
29
29
type DB interface {
30
+ Ping (ctx context.Context ) error
30
31
Exec (ctx context.Context , sql string , args ... any ) (pgconn.CommandTag , error )
31
32
Query (ctx context.Context , sql string , args ... any ) (pgx.Rows , error )
32
33
QueryRow (ctx context.Context , sql string , args ... any ) pgx.Row
@@ -51,31 +52,27 @@ func New(ctx context.Context, connString string) (*PGMQ, error) {
51
52
return nil , fmt .Errorf ("error creating pool: %w" , err )
52
53
}
53
54
54
- err = pool .Ping (ctx )
55
- if err != nil {
55
+ return NewFromDB (ctx , pool )
56
+ }
57
+
58
+ // NewFromDB is a bring your own DB version of New. Given an implementation
59
+ // of DB, it will call Ping to ensure the connection has been established,
60
+ // then create the PGMQ extension if it does not already exist.
61
+ func NewFromDB (ctx context.Context , db DB ) (* PGMQ , error ) {
62
+ if err := db .Ping (ctx ); err != nil {
56
63
return nil , err
57
64
}
58
65
59
- _ , err = pool .Exec (ctx , "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE" )
66
+ _ , err := db .Exec (ctx , "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE" )
60
67
if err != nil {
61
68
return nil , fmt .Errorf ("error creating pgmq extension: %w" , err )
62
69
}
63
70
64
71
return & PGMQ {
65
- db : pool ,
72
+ db : db ,
66
73
}, nil
67
74
}
68
75
69
- // MustNew is similar to New, but panics if it encounters an error.
70
- func MustNew (ctx context.Context , connString string ) * PGMQ {
71
- q , err := New (ctx , connString )
72
- if err != nil {
73
- panic (err )
74
- }
75
-
76
- return q
77
- }
78
-
79
76
// Close closes the underlying connection pool.
80
77
func (p * PGMQ ) Close () {
81
78
p .db .Close ()
0 commit comments