Vitess VStream Replicator Support #1943
base: master
Conversation
Things that work:

* The code successfully follows a given keyspace in a Vitess cluster (a specific shard or all shards) and pushes RowMap objects to a given producer.

Still need to implement/fix/figure out:

* Position store is stubbed out for now and position recovery is not implemented, so it always starts at the current stream position.
* I'm not sure I'm handling gRPC stream shutdown properly.
* Had to disable the OpenCensus dependency for now because it depends on an ancient gRPC version that conflicts with the gRPC dependency used by Vitess.
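The "follows a keyspace and pushes RowMap objects" flow above can be sketched as a simple dispatch loop. Everything here is an illustrative stand-in (`VStreamEvent`, `Producer`, the `dispatch` method); the real code works with gRPC-generated Vitess types and Maxwell's actual `RowMap`/producer classes:

```java
import java.util.Arrays;
import java.util.List;

public class VStreamLoopSketch {
    enum Type { FIELD, ROW, VGTID, DDL, OTHER }

    static class VStreamEvent {
        final Type type;
        final String payload;
        VStreamEvent(Type type, String payload) { this.type = type; this.payload = payload; }
    }

    interface Producer { void push(String rowMap); }

    static String lastPosition; // would be persisted for recovery once the position store exists

    static void dispatch(List<VStreamEvent> events, Producer producer) {
        for (VStreamEvent e : events) {
            switch (e.type) {
                case FIELD: /* update the in-memory schema for the table */ break;
                case ROW:   producer.push(e.payload); break; // converted to a RowMap first in the real code
                case VGTID: lastPosition = e.payload; break; // remember the stream position
                default:    break; // BEGIN/COMMIT/DDL/OTHER handling elided
            }
        }
    }

    public static void main(String[] args) {
        StringBuilder pushed = new StringBuilder();
        dispatch(Arrays.asList(
                new VStreamEvent(Type.ROW, "row1"),
                new VStreamEvent(Type.VGTID, "gtid:1-2061")),
                r -> pushed.append(r));
        System.out.println(pushed + " @ " + lastPosition);
    }
}
```

The sketch also shows why the "always starts at the current stream position" limitation exists: until `lastPosition` is persisted and restored somewhere, there is nothing to resume from.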
Definitely don't re-implement. Maxwell is Apache 2 as well, so copying it in is fine. The Debezium guys are friends, of sorts, and a NOTICE file will be fine.
Nice start! Let me know if there's anything you're unclear on. I presume that I won't get deep into source-level comments just now; on a very surface level you should know that maxwell uses tabs-not-spaces :)
Nice catch, fixed that! I'm back from vacation and planning to work on adding position tracking and recovery.
Added support for Vitess position tracking and recovery. A few notes on the implementation:
OK, polished it all a bit, cleaned up some changes we didn't need, and tomorrow will try to deploy it in staging within our infrastructure and run it on a relatively large Vitess cluster to see how it goes. 🤞🏻
That's fine, two things:
That's fine, we'll need some code strewn around the codebase to turn off the expectation of heartbeats.
do you have a docker-compose type setup I could use to step through the code?
can I get a 10k-foot view of how the in-memory schema works? Does vitess keep track of what tables a stream listener has seen already and then send any unseen tables + changes?
Honestly, the whole heartbeating thing even for mysql is of dubious value; it was written in a world where GTID wasn't hugely common, to support a wild and overly engineered desire not to lose data during primary server flips. I may start trudging down the ugly and unpleasant path of deprecation at some point.
Exactly! For each VStream connection, they keep track of what tables (and what versions of those tables) a client has seen, and before sending any data for a given table, they make sure the client first gets a `FIELD` event describing its current schema. Here is a demo of how it works in reality:
On connect, it will immediately send the current `VGTID`:

```json
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2060"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "OTHER",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
After a `CREATE TABLE`, a `DDL` event arrives:

```json
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2061"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "DDL",
      "timestamp": "1669215464",
      "statement": "create table bananas (\n\t`name` varchar(100)\n)",
      "currentTime": "1669215464575035000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
The first `INSERT` into the new table is preceded by a `FIELD` event describing its schema:

```json
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215490",
      "currentTime": "1669215490981788000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "FIELD",
      "timestamp": "1669215490",
      "fieldEvent": {
        "tableName": "commerce.bananas",
        "fields": [
          {
            "name": "name",
            "type": "VARCHAR",
            "table": "bananas",
            "orgTable": "bananas",
            "database": "vt_commerce",
            "orgName": "name",
            "columnLength": 400,
            "charset": 45,
            "columnType": "varchar(100)"
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215490990077000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215490",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDE="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215490990132000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2063"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215490",
      "currentTime": "1669215490990165000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
A second `INSERT` arrives without another `FIELD` event, since the client has already seen this version of the schema:

```json
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215502",
      "currentTime": "1669215502107840000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215502",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDI="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215502107932000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2064"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215502",
      "currentTime": "1669215502107961000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
After an `ALTER TABLE`, another `DDL` event:

```json
{
  "events": [
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2065"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "DDL",
      "timestamp": "1669215523",
      "statement": "alter table bananas change column `name` `name` varchar(50)",
      "currentTime": "1669215523734671000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
The next `INSERT` is again preceded by a fresh `FIELD` event carrying the updated schema:

```json
{
  "events": [
    {
      "type": "BEGIN",
      "timestamp": "1669215535",
      "currentTime": "1669215535606806000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "FIELD",
      "timestamp": "1669215535",
      "fieldEvent": {
        "tableName": "commerce.bananas",
        "fields": [
          {
            "name": "name",
            "type": "VARCHAR",
            "table": "bananas",
            "orgTable": "bananas",
            "database": "vt_commerce",
            "orgName": "name",
            "columnLength": 200,
            "charset": 45,
            "columnType": "varchar(50)"
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215535619492000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "ROW",
      "timestamp": "1669215535",
      "rowEvent": {
        "tableName": "commerce.bananas",
        "rowChanges": [
          {
            "after": {
              "lengths": [
                "5"
              ],
              "values": "dGVzdDM="
            }
          }
        ],
        "keyspace": "commerce",
        "shard": "0"
      },
      "currentTime": "1669215535619512000",
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "VGTID",
      "vgtid": {
        "shardGtids": [
          {
            "keyspace": "commerce",
            "shard": "0",
            "gtid": "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2066"
          }
        ]
      },
      "keyspace": "commerce",
      "shard": "0"
    },
    {
      "type": "COMMIT",
      "timestamp": "1669215535",
      "currentTime": "1669215535619533000",
      "keyspace": "commerce",
      "shard": "0"
    }
  ]
}
```
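The schema-tracking behavior shown in the demo can be sketched as a small cache keyed by table name. The `Field` class below mirrors the `fields` entries in the JSON; it is an illustrative stand-in, not one of Vitess's generated classes:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VStreamSchemaCache {
    static class Field {
        final String name;
        final String columnType;
        Field(String name, String columnType) { this.name = name; this.columnType = columnType; }
    }

    // Keyed by qualified table name, e.g. "commerce.bananas". Since vtgate
    // resends a FIELD event whenever a table's schema changes, simply
    // replacing the cached entry on arrival keeps the cache current.
    private final Map<String, List<Field>> tables = new HashMap<>();

    void onFieldEvent(String tableName, List<Field> fields) {
        tables.put(tableName, Collections.unmodifiableList(fields));
    }

    List<Field> schemaFor(String tableName) {
        return tables.get(tableName);
    }

    public static void main(String[] args) {
        VStreamSchemaCache cache = new VStreamSchemaCache();
        // First FIELD event: create table bananas (`name` varchar(100))
        cache.onFieldEvent("commerce.bananas", Arrays.asList(new Field("name", "varchar(100)")));
        // After the ALTER, the next FIELD event replaces the cached version.
        cache.onFieldEvent("commerce.bananas", Arrays.asList(new Field("name", "varchar(50)")));
        System.out.println(cache.schemaFor("commerce.bananas").get(0).columnType);
    }
}
```

Because the server guarantees a `FIELD` event precedes any `ROW` event for a table the client has not seen (or whose schema changed), a lookup in this cache is always sufficient to decode an incoming row.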
nice, very ergonomic. Just something to keep in the back of your head: Maxwell supports configurations with near-psychotic-sized schemas, hundreds of thousands of tables. To this end, it's important to keep in-memory schema representations as compact as possible. The default in-memory representation jumps through crazy hoops to achieve this.
Working today on setting that up (so far it has been based on a bunch of internal tooling we use at Shopify plus a bunch of duct tape around the official scripts shipped with Vitess for local dev environment setup).
Ok, we have our first instance of Maxwell running on Vitess deployed in Staging on top of a real cluster and it seems to work as expected. More testing, benchmarks, etc over the next couple of weeks. |
Updated the PR with the latest changes from the master branch, and now it works without disabling outdated dependencies.
Added and tested both TLS certificate and user/password auth on our staging instances - both work as expected. |
Very nice. I'm pretty much good to merge and release whenever you feel confident.
I'd like it to bake for a bit in our staging environments (until January?) and then we can be more confident about it. But thank you for your vote of confidence! 😊
SGTM
@osheroff Noticed an issue today: because VStream `ROW` events come in before the `VGTID` event for a given transaction and do not carry their own position values, I end up manually setting the position on the last ("commit") RowMap event at the very end.

I've looked at your existing producers and they seem to use one position field; interestingly, though, your example CustomProducer (and, as a result, the producer we use for our project here at Shopify) uses the other.

So, the question is: which is the right way to update progress from a producer? And, if both the position and the last-position field could potentially be used, should I just set them to the same value, so the system keeps working for people who may have followed the example producer code and have the same logic in place? Thank you for any insight you can provide into this.
ugh, I'm sure that's just a dumb bug with the example. The proper behavior is most assuredly the one the built-in producers use.
Great, I'll just use that in our custom producer.
Yes, absolutely, that works!
how'd it all go @kovyrin? stall a bit or you in prod?
Hey, Ben! We're slowly rolling it out in some of our environments and ironing out rough edges, etc. Currently exploring different options for coordination of multiple instances of Maxwell running with the same config (sometimes happens during rolling upgrades, k8s cluster failovers, etc). I'm considering adding Zookeeper support for leader election or something like that (will propose it in a separate PR if we decide to go that route). I'll try to polish the Vitess PR enough over the next couple of weeks (add tests, docs, etc) and then request a final review/merge if that's OK with you. Thank you for your support!
@kovyrin Did this ever go to production? It would be great to see this officially in maxwell mainline code :)
@dbmurphy Unfortunately, due to some changes at Shopify, we had to pivot and change our approach for the project, making Maxwell redundant in our infrastructure. We ran this code in production for about a couple of months before shutting it down, so I am fairly confident in it as an experimental feature, but may not be able to push it over the finish line at this point. :(
[Implements/fixes #1757]
As part of a project around data materialization efforts within Shopify, we would like to use Maxwell for running our custom producer code. The databases the project will run on use Vitess as the clustering/sharding technology. This means we need Maxwell to support replicating data from a Vitess keyspace. This PR attempts to address that need by adding support for a new Replicator class using the VStream API provided by the Vitess VTGate component.
What already works
* The code successfully connects to `vtgate` via the VStream API (including full TLS support, certificate verification, custom CA, etc). Both user/password and TLS auth work for gRPC connections to `vtgate`.
* Maxwell follows a given keyspace in a Vitess cluster (all shards by default, or a specific shard configured within the properties file) and receives all relevant events.
* Maxwell uses `FIELD` events from VStream to maintain an in-memory representation of the database schema (meaning we don't need to load it from MySQL or analyze DDL events). See a comment below for more details on how this works.
* Using the in-memory schema, Maxwell converts VStream `ROW` events into `RowMap` objects (including before/after value tracking) and pushes those to a given producer.
* `VGTID` events are used to track the current position (including the initial position when first started), and the data is persisted in a new `vitess_gtid` column within the `positions` table. The persisted position is used for recovery in case of a restart.

Still need to implement/fix/figure out

* When loading position info from the database, we need to make sure to only use keyspaces/shards that match our existing configuration; otherwise, a change in configuration without a corresponding change in the client_id value would make Maxwell use the old VGTID and never follow the newly configured keyspace/shards.
* The code needs tests.
* We need to update the documentation to add references to Vitess support and explain how it works.
* The code within the replicator class may contain some duplication when compared to the MysqlReplicator class, but that is by design: I tried to refrain from changing the current code too much until things stabilize a bit within the Vitess-related codebase, to make sure whatever generalization we do later would be done on a stable set of classes.
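The first open item, restoring only shard GTIDs that match the current configuration, could look roughly like the sketch below. `ShardGtid` is an illustrative stand-in for the `shardGtids` entries seen in the VGTID events, not a real Vitess class, and `matching` is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class VgtidFilterSketch {
    static class ShardGtid {
        final String keyspace, shard, gtid;
        ShardGtid(String keyspace, String shard, String gtid) {
            this.keyspace = keyspace; this.shard = shard; this.gtid = gtid;
        }
    }

    // Keep only stored positions that belong to the configured keyspace and
    // shard set. An empty shard set means "follow all shards of the keyspace".
    static List<ShardGtid> matching(List<ShardGtid> stored, String keyspace, Set<String> shards) {
        List<ShardGtid> result = new ArrayList<>();
        for (ShardGtid s : stored) {
            if (s.keyspace.equals(keyspace) && (shards.isEmpty() || shards.contains(s.shard))) {
                result.add(s); // matches current config: safe to resume from
            }
            // Non-matching entries are stale configuration and are dropped;
            // otherwise Maxwell would resume from a VGTID for the wrong
            // keyspace/shards and never follow the newly configured ones.
        }
        return result;
    }

    public static void main(String[] args) {
        List<ShardGtid> stored = Arrays.asList(
                new ShardGtid("commerce", "0", "MySQL56/ff450168-5ed6-11ed-9415-954189eafa28:1-2066"),
                new ShardGtid("customer", "0", "MySQL56/00000000-0000-0000-0000-000000000000:1-10"));
        System.out.println(matching(stored, "commerce", Collections.<String>emptySet()).size());
    }
}
```

In the real implementation the same check would run right after reading the `vitess_gtid` column from the `positions` table, before the VStream connection is opened.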
Licensing
You will notice that there is a set of files within the vitess directory that have Apache 2.0 headers. Those are derivatives of the code used by the Vitess connector in Debezium. We will need guidance on how to properly integrate that code into the codebase (whether there is a NOTICE file where it needs mentioning or something like that) or whether it needs to be re-implemented (it mostly revolves around schema caching and VStream field value parsing/conversion).