Implement the PostgreSQL wire protocol as defined here:
- https://www.postgresql.org/docs/17/protocol-flow.html#PROTOCOL-FLOW-START-UP
- https://www.postgresql.org/docs/17/protocol-message-types.html
- https://www.postgresql.org/docs/17/protocol-message-formats.html
- https://www.postgresql.org/docs/17/protocol-replication.html
- https://www.postgresql.org/docs/17/protocol-logical-replication.html
Note: the authentication is done:
- in C, here:
PQconnectPoll()(src/interfaces/libpq/fe-connect.c) - in Rust, here:
pg_cat
Implementation of PostgreSQL's wire protocol usually cover the frontend side of things and program that need to implement the backend side of things do it in the code. That's why the messages are reimplemented here.
It's an experiment to see, what we can do:
- buffering logical replication
- anonymize on the fly
- track the activity submitted to the instance
.. and to see how the protocol works.
It's still very alpha code, not stable or full featured: a POC.
The main crate is a library, there is a derive macro crate (libpq-serde-macros) and
a utility/test crate for encoding decoding (libpq-serde-types). There are examples
in $CRATE_ROOT/examples namely client and passthru.
Known limitation:
- the cli is rough (two examples:
clientandpassthru) - TLS connexion are not supported (use
sslmode=allow) - there is no async implementation (yet?)
The purpose is to try to executes queries or consume modification from a slot.
To submit queries:
cargo run --example client -- \
--host pgsrv \
--port 5432 \
--username md5user \
--database postgres \
--password md5pass \
query \
--use-sampleOr
cargo run --example client -- \
--host pgsrv \
--port 5432 \
--username md5user \
--database postgres \
--password md5pass \
query "SELECT 1" "SELECT relname, relpages FROM pg_class LIMIT 1"Only Bool, Char, Name, Int8, Int4, Text and Oid are supported for the moment.
For the logical replication (with the same limitations and more):
cargo run --example client -- \
--host pgsrv \
--port 5432 \
--username md5userrl \
--database postgres \
--password md5passrl \
replication pub slotThiw will read the data from a slot but not consume it (since StandbyStatusUpdate is not sent when PrimaryKeepAlive is received, which also means that if there is no activity, we get kicked after a while).
The purpose is to have a proxy that can forward the data from a client connexion to a PostgreSQL instance for queries or logical replication.
Prerequisite:
- a target instance (here
192.168.121.1:9092) - an interface/port to listen on (here
192.168.121.1:9092) - the user must exist in the database and have the md5 authentication method
configured in the
pg_hba.confand the password encryption. - the connexion must not use TLS (
sslmode=allow)
Example: client
cargo run --example passthru --\
--listen-host 192.168.121.1\
--listen-port 9092\
--host 192.168.121.89\
--port 5432\
--debugIn another session (PGPASSWORD is necessary, we fail otherwise):
PGPASSWORD=benoit psql "host=192.168.121.1 port=9092 sslmode=allow"psql (17.4 (Debian 17.4-1.pgdg120+2), server 0.0.0)
WARNING: psql major version 17, server major version 0.0.
Some psql features might not work.
Type "help" for help.
192.168.121.1:9092 benoit@benoit=>
Example: replication
We will need two instances for this.
cargo run --example passthru --\
--listen-host 192.168.121.1\
--listen-port 9092\
--host 192.168.121.89\
--port 5432\
--debugIn one session connected to the publication, create a publication:
-- cleanup the slot
-- /!\ it will drop all the slot (if possible) /!\
SELECT slot_name, pg_drop_replication_slot(slot_name) FROM pg_replication_slots;
-- create a publication
CREATE TABLE t(i int);
CREATE PUBLICATION pub FOR TABLE t;In one session connected to the subscription instance, create the subscription:
-- cleanup
ALTER SUBSCRIPTION subtest DISABLE;
ALTER SUBSCRIPTION subtest SET (slot_name = NONE);
DROP SUBSCRIPTION subtest ;
-- create a subscription
CREATE SUBSCRIPTION subtest
CONNECTION 'host=192.168.121.1 port=9092 user=md5userrl dbname=postgres sslmode=allow password=md5passrl'
PUBLICATION pub;INSERT, UPDATE, DELETE, TRUNCATE on the table will be transferred.
If you run the foillowing:
cargo run --example passthru --\
--listen-host 192.168.121.1\
--listen-port 9092\
--host 192.168.121.89\
--port 5432\
--anonymize\
--debug... and update the oid on this line of src/handler/client.rs data should be
anonymized on the fly.
anonymize_what.insert((16453, 0), Anonymize::i32(|c: i32| c * -10)); // t.iNote:
- the first tuple
(16453, 0)is(relation_oid, column position) Anonymize::i32(fn(i32) -> i32): identifies the type of the column as i32 (int) and defines the prototype of the anonymization function|c: i32| c * -10): is a lambda function that takes an i32 and returns an i32. It could be any valid rust code including other function calls. I suppose we could use a scripting language (lua) to make this dynamic.
The target is to have a more user friendly struct and build the Hashmap (anonymize_what)
as we receive the CopyData>XlogData>Relation messages so that it's more usable.