GOA-522: Use last heartbeat if available when getting initial position #1823

Open
wants to merge 6 commits into master

Conversation


@mattvenz mattvenz commented Feb 4, 2022

This change enables safely truncating the positions table in the database in the event of a failure. This is unfortunately a reasonably common scenario, but it can cause a problem if the schema changes between the time Maxwell fails and the time it recovers.

Current behaviour:

When there is no initial position stored, Maxwell attempts to capture the initial position from the BinLog:

initial = Position.capture(c, config.gtidMode);

It creates a new Position and sets the lastHeartbeatRead to 0:

return new Position(BinlogPosition.capture(c, gtidMode), 0L);

The Position is later used to look up a schema, using the binlog position and lastHeartbeatRead to try to select the correct one:

String sql = "SELECT id from `schemas` "
+ "WHERE deleted = 0 "
+ "AND last_heartbeat_read <= ? AND ("
+ "(binlog_file < ?) OR "
+ "(binlog_file = ? and binlog_position < ? and base_schema_id is not null) OR "
+ "(binlog_file = ? and binlog_position <= ? and base_schema_id is null) "
+ ") AND server_id = ? "
+ "ORDER BY last_heartbeat_read DESC, binlog_file DESC, binlog_position DESC limit 1";

This works fine in normal operation, but if the position entry is removed from the database to reset the position and recover from a failure, a schema change can be missed, causing a mismatch between the schema and the data.

Proposed solution

When Maxwell is initialised, a heartbeat of 0 indicates that the position has been reset and should trigger a schema recapture.
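
Below is a minimal sketch of the proposed check. The class and method names are hypothetical, not the actual Maxwell API; it only illustrates the rule "heartbeat 0 means the position was reset, so recapture":

// Hypothetical sketch only; names are illustrative, not Maxwell's API.
final class InitialPositionCheck {
	// A lastHeartbeatRead of 0 means the position was freshly captured (or the
	// positions row was deleted), so the stored schema cannot be trusted and a
	// schema recapture should be triggered.
	static boolean shouldRecaptureSchema(long lastHeartbeatRead) {
		return lastHeartbeatRead == 0L;
	}
}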

References

JIRA: https://zendesk.atlassian.net/browse/GOA-726

@mattvenz mattvenz changed the title from "GOA-522: Use last heartbeat on capture" to "GOA-522: Use last heartbeat if available when getting initial position" on Feb 4, 2022
Contributor

@redcape redcape left a comment


Can you clarify the description more? It describes the current code path, but it doesn't clearly explain what happened or what is expected. I made some assumptions below and came to a different conclusion for the fix; please correct me where I'm wrong.

The position was removed from maxwell "to reset". After reading through the recovery attempts logic and the PR description, my understanding is that you:

Actions Taken:

  1. Deleted all the positions for maxwell for the given server_id

What happened:

  1. Maxwell used an old schema (maybe the initial one captured with last_heartbeat_read = 0 , is that right or a different one?)

Expectation:

  1. Maxwell will continue processing from the latest binlog position captured at startup using the latest schema already present in the schemas table.

Is that right? If so, I think the expectation has faults: what if there were schema changes between the latest binlog position and the last binlog position processed before the positions delete? Maxwell wouldn't have a proper picture of the schema at that point. I think Maxwell should either recapture the schema or continue reading from the last schema change (which may or may not still exist in the binlogs). The former seems more straightforward.

In that case, the solution to your issue would be to leave the initial position logic of setting the heartbeat to 0 as-is, and change the restore logic to special-case an initial heartbeat of 0 to mean that the schema doesn't exist and should be recaptured. What do you think, @mattvenz?

@mattvenz
Author

mattvenz commented Feb 6, 2022

@redcape thanks for your comments. Apologies if my comment lacked clarity. I'm new to this codebase and working with Tim to fix this issue.

What you're saying makes sense. I'll take another look at the code and re-evaluate. Thanks!

@mattvenz
Author

mattvenz commented Feb 7, 2022

I've updated the change to use the simpler and better solution proposed by @redcape.

@redcape
Contributor

redcape commented Feb 7, 2022

What you're saying makes sense, but would setting the heartbeat back to 0 cause other issues? We could also just set the heartbeat to the current time, which would also trigger a new schema capture.

The query you linked is what I'm focusing on:

String sql = "SELECT id from `schemas` " 
 		+ "WHERE deleted = 0 " 
 		+ "AND last_heartbeat_read <= ? AND (" 
 		+ "(binlog_file < ?) OR " 
 		+ "(binlog_file = ? and binlog_position < ? and base_schema_id is not null) OR " 
 		+ "(binlog_file = ? and binlog_position <= ? and base_schema_id is null) " 
 		+ ") AND server_id = ? " 
 		+ "ORDER BY last_heartbeat_read DESC, binlog_file DESC, binlog_position DESC limit 1"; 

I don't think setting heartbeat to the current time would help because that will make it use the latest schema and it wouldn't recapture. Setting the heartbeat to 0 like I suggested looks like it would produce issues as well since the same SQL query will end up potentially picking up a later schema change on reboot if no schemas changed during the run and our newly captured schema also has a heartbeat of 0.

This is a problem of a dirty delete; usually I drop the Maxwell DB when I want to reset for testing, not just a single position. How should Maxwell properly recover in this case, or be passed the info that the schema needs to reset? One answer might be to just delete the schemas (set deleted=1 so compaction can delete the child data later) for that server_id when the positions are missing, and the problem will fix itself in the normal way.

@mattvenz
Author

mattvenz commented Feb 7, 2022

What you're saying makes sense, but would setting the heartbeat back to 0 cause other issues? We could also just set the heartbeat to the current time, which would also trigger a new schema capture.

The query you linked is what I'm focusing on:

String sql = "SELECT id from `schemas` " 
 		+ "WHERE deleted = 0 " 
 		+ "AND last_heartbeat_read <= ? AND (" 
 		+ "(binlog_file < ?) OR " 
 		+ "(binlog_file = ? and binlog_position < ? and base_schema_id is not null) OR " 
 		+ "(binlog_file = ? and binlog_position <= ? and base_schema_id is null) " 
 		+ ") AND server_id = ? " 
 		+ "ORDER BY last_heartbeat_read DESC, binlog_file DESC, binlog_position DESC limit 1"; 

I don't think setting heartbeat to the current time would help because that will make it use the latest schema and it wouldn't recapture. Setting the heartbeat to 0 like I suggested looks like it would produce issues as well since the same SQL query will end up potentially picking up a later schema change on reboot if no schemas changed during the run and our newly captured schema also has a heartbeat of 0.

This is a problem of a dirty delete; usually I drop the Maxwell DB when I want to reset for testing, not just a single position. How should Maxwell properly recover in this case, or be passed the info that the schema needs to reset? One answer might be to just delete the schemas (set deleted=1 so compaction can delete the child data later) for that server_id when the positions are missing, and the problem will fix itself in the normal way.

As I understand it, it will get the correct position from the binlog on startup, and it will use that position to save the new schema, so it shouldn't query an older schema if it's saved a new one.

@redcape
Contributor

redcape commented Feb 7, 2022

As I understand it, it will get the correct position from the binlog on startup, and it will use that position to save the new schema, so it shouldn't query an older schema if it's saved a new one.

Consider the query and the original change you wanted to make. The initial position's heartbeat is 0, and this is what is used when storing the new schema. The first tiebreaker in the query is last_heartbeat_read.

Assuming there is at least one existing additional schema change stored past the initial one with a nonzero heartbeat, there would already be a schema with a later heartbeat that would be incorrectly chosen as the latest immediately after the first schema capture.

If we want Maxwell to support recovery from manually deleting a specific position like this, I think automatically marking all the related schemas with the same server_id as deleted makes the most sense to get back to a normal place.

@osheroff any thoughts on this?

@mattvenz
Author

mattvenz commented Feb 7, 2022

As I understand it, it will get the correct position from the binlog on startup, and it will use that position to save the new schema, so it shouldn't query an older schema if it's saved a new one.

Consider the query and the original change you wanted to make. The initial position's heartbeat is 0, and this is what is used when storing the new schema. The first tiebreaker in the query is last_heartbeat_read.

Assuming there is at least one existing additional schema change stored past the initial one with a nonzero heartbeat, there would already be a schema with a later heartbeat that would be incorrectly chosen as the latest immediately after the first schema capture.

If we want Maxwell to support recovery from manually deleting a specific position like this, I think automatically marking all the related schemas with the same server_id as deleted makes the most sense to get back to a normal place.

@osheroff any thoughts on this?

Shouldn't the schemas be sorted by position first, then heartbeat? If not, maybe we should use the latest heartbeat and reset the schema as well.

@timbertson-zd
Contributor

Shouldn't the schemas be sorted by position first, then heartbeat?

Sadly not; that's why last_heartbeat_read was introduced: #630

The thing we ultimately want is that when we start maxwell with no stored position, it resumes from the latest binlog position, with the best schema for that position. Typically, the schema hasn't changed since maxwell got stuck, so the last stored schema is often an OK choice. But it would be safest to also recapture the current schema.

There's a few changes we could make:

Change 1: synthesize a plausible last_heartbeat_read

In Position.capture(...), we would SELECT heartbeat from heartbeats (with appropriate conditions), and use that instead of 0L. This is the most reasonable value in all cases, since if you had actually followed the binlog, that would have been the last heartbeat you saw.

(if there's no such row then it's fine to fall back on 0L, since this is presumably the first run of maxwell)
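
As a rough sketch of (1), assuming a heartbeats table keyed by server_id and client_id with a heartbeat column (the exact conditions are an assumption, not copied from the Maxwell source), the lookup could look like:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class HeartbeatLookup {
	// Returns the most recent stored heartbeat for this server/client pair,
	// or 0L when no row exists (presumably the first run of maxwell).
	static long lastStoredHeartbeat(Connection c, long serverId, String clientId) throws SQLException {
		String sql = "SELECT heartbeat FROM `heartbeats` WHERE server_id = ? AND client_id = ?";
		try (PreparedStatement ps = c.prepareStatement(sql)) {
			ps.setLong(1, serverId);
			ps.setString(2, clientId);
			try (ResultSet rs = ps.executeQuery()) {
				return rs.next() ? rs.getLong("heartbeat") : 0L;
			}
		}
	}
}

Position.capture(...) would then use this value in place of the hard-coded 0L.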

Change 2: recapture schema when there's no initial position

This makes sense; I think we probably ought to implement it, since nothing good can come from using an incompatible schema. We can live without it though, since we already have a --recapture_schema flag.

But as you point out, the current code would capture a schema with last_heartbeat_read=0, making it unlikely to be chosen, so it's not much use unless we also implement (1).

Change 3: delete all other schemas when recapturing

This would work around the fact that a captured schema has last_heartbeat_read=0. But it's still a workaround, and one we wouldn't need if we just had a more correct last_heartbeat_read in both the position and the captured schema.


So my tl;dr is that I think we should implement (1) straight away, and (2) afterwards (although it's less important, as there are already workarounds). I don't think (3) is necessary.

@mattvenz
Author

mattvenz commented Feb 8, 2022

So my tl;dr is that I think we should implement (1) straight away, and (2) afterwards (although it's less important, as there are already workarounds). I don't think (3) is necessary.

This makes sense. However, in the case of change 2, we would also need to recapture the schema if there is a current heartbeat, as the schema may have changed before the positions table was truncated.

@redcape
Contributor

redcape commented Feb 8, 2022

For 3, I was thinking something like: when the position is missing, immediately run update schemas set deleted = 1 where server_id = ?. On first run it will update nothing; in cases like this, it will update the existing schemas.
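
A sketch of that few-line change, assuming it runs at the point where Maxwell discovers there is no stored position for the server (the query shape is an assumption, not Maxwell code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class SchemaReset {
	// Soft-delete all schemas for this server; on a true first run this updates
	// nothing, and after a manual positions delete it clears the now-untrustworthy
	// schemas so the normal recapture path kicks in.
	static void softDeleteSchemas(Connection c, long serverId) throws SQLException {
		try (PreparedStatement ps = c.prepareStatement(
				"UPDATE `schemas` SET deleted = 1 WHERE server_id = ?")) {
			ps.setLong(1, serverId);
			ps.executeUpdate();
		}
	}
}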

All of these are workarounds for the user manually modifying maxwell to be in an inconsistent state. What is currently a user-initiated bug / incorrect state change today is a feature for how to do resets tomorrow.

1. creates a position initialization/heartbeat code-path that is only exercised when this type of change happens and can cause more issues without 2 when you have frequent schema changes.
2. relies on 1 to work and will need to pass around information about this being initialization for the position.
3. tries to bring the state closer to the original initialization state.

I don't think it's appropriate to implement 1 without 2 if you're going that route. It can't be a feature to delete positions unless it will always correct itself. I prefer 3 because it seems like the least code and I don't have to reason very hard about its correctness since the schema query checks for deleted=0 and will auto-recapture when it finds nothing.

Super-rare corner cases for 2 would look something like this:

the last recorded heartbeat also had a schema change
positions are deleted
the binlog rolled over and is now lexicographically "earlier"
recapture stores the same heartbeat with an "earlier" binlog and the next restart picks up the old schema

3 avoids this and needs less thought :)

@redcape
Contributor

redcape commented Feb 8, 2022

So my tl;dr is that I think we should implement (1) straight away, and (2) afterwards (although it's less important, as there are already workarounds). I don't think (3) is necessary.

I disagree that 2 is less important. If the feature of "deleting positions is okay and works fine" exists, then it's very important to my use-cases, where DDL changes occur very frequently.

@mattvenz
Author

mattvenz commented Feb 8, 2022

So my tl;dr is that I think we should implement (1) straight away, and (2) afterwards (although it's less important, as there are already workarounds). I don't think (3) is necessary.

I disagree that 2 is less important. If the feature of "deleting positions is okay and works fine" exists, then it's very important to my use-cases, where DDL changes occur very frequently.

I don't think there's a need to delete schemas. We should be able to create a new schema at the correct position and heartbeat. If we implement (1) now and grab the latest heartbeat if available, it's not perfect but it's better than before. I don't see how it causes more problems.

@mattvenz
Author

mattvenz commented Feb 8, 2022

Super-rare corner cases for 2 would look something like this:

the last recorded heartbeat also had a schema change
positions are deleted
the binlog rolled over and is now lexicographically "earlier"
recapture stores the same heartbeat with an "earlier" binlog and the next restart picks up the old schema

Makes sense. If we're recapturing the schema we should probably set the heartbeat to current time rather than last recorded.

@timbertson-zd
Contributor

1. creates a position initialization/heartbeat code-path that is only exercised when this type of change happens and can cause more issues without 2 when you have frequent schema changes.

This code path also happens when you have an (unsuccessful) master failover. The resulting captured schema (with its 0 last_heartbeat_read) causes follow-on issues because it doesn't get selected when it should. Deleting other schemas in that case could work, but it could also cause further harm if we fail back to the original master (its old schemas are all gone).

This feels obtuse, and maybe it is - it definitely happened to us once, but it was a while back, since we don't have master failovers any more. So maybe deleting other schemas is acceptable in that case; the fail-back would just do more recapturing 🤷

set the heartbeat to current time rather than last recorded.

It feels weird that the heartbeat value could then go backwards (when we read the binlogs, we might see an earlier value based on clock skew / another concurrent process).

@redcape
Contributor

redcape commented Feb 8, 2022

@mattvenz Your suggested fixes involve more logic and more special-casing compared to a several-line change to delete when detecting a missing position. Is there something I'm not aware of about deleting the schemas that makes the added complexity of multiple heartbeat sources based on different conditions more desirable?

@mattvenz
Author

mattvenz commented Feb 8, 2022

@mattvenz Your suggested fixes involve more logic and more special-casing compared to a several-line change to delete when detecting a missing position. Is there something I'm not aware of about deleting the schemas that makes the added complexity of multiple heartbeat sources based on different conditions more desirable?

Yes, I'm coming around to your point of view, but I also feel like there are traps everywhere.

@osheroff
Collaborator

osheroff commented Feb 8, 2022

Hi guys.

@redcape thanks for taking a start at this code review! super appreciated to have more eyes here as these issues are thorny.

Can we take a bit of a step back and describe the failure case we're recovering from? Have the binlogs been deleted from the server or did maxwell just fall far enough behind that catching up would be arduous?

If the use case I'm thinking of is what you're trying to solve, I'm thinking that maybe instead of trying to jam this feature into heartbeats (which are, as pointed out, rather a tricky bit of code to get right), we might instead introduce something like a --max_lag_seconds=44444 feature, which would do the following (see the sketch after this list):

  • on startup, calculate a "seconds behind master" by sampling the current binlog position.
  • if the binlog position refers to a missing binlog, recapture the schema and set position to current
  • if the binlog exists but is older than max_lag_seconds, do a "fast forward" of the position -- play the binlog forward, suppressing all DML output but processing the DDL so we keep the schema preserved
  • otherwise, start up as normal.
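
A rough sketch of that startup decision (hypothetical names; --max_lag_seconds is a proposed flag, not existing Maxwell behaviour):

final class StartupPolicy {
	enum Action { RECAPTURE_AND_RESET, FAST_FORWARD, NORMAL }

	static Action decide(boolean binlogStillExists, long secondsBehindMaster, long maxLagSeconds) {
		if (!binlogStillExists)
			return Action.RECAPTURE_AND_RESET; // missing binlog: recapture schema, position = current
		if (secondsBehindMaster > maxLagSeconds)
			return Action.FAST_FORWARD;        // replay DDL only, suppress DML output
		return Action.NORMAL;
	}
}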

@timbertson-zd
Contributor

did maxwell just fall far enough behind that catching up would be arduous

The typical scenario (which we experienced a few times recently) was effectively a poison pill - maxwell in a crashloop, unable to make it past a certain portion of binlog. We fix the underlying problem eventually, but in the short term we do this to get unstuck.

For this specific use case we want to manually trigger it. A specific lag target feels potentially dangerous (sometimes there's just normal lag, rather than it being stuck). Generally we prefer lag over skipped events, except when maxwell is actually stuck.

play the binlog forward, suppressing all DML output but processing the DDL so we keep the schema preserved

Would this wind it forward until we reach an event less than 44444 seconds ago, and then resume publishing? It's kind of tempting, but doesn't sound very easy to implement. In the past, maxwell's schema handling has been the cause for most instances of maxwell getting stuck, so it also feels like a partial solution. i.e. we'd still want the ability to just throw away the schema and resume from the latest position.

@osheroff
Collaborator

osheroff commented Feb 9, 2022

ok so no to that solution.

@osheroff
Collaborator

osheroff commented Feb 9, 2022

what about something more like:

"the next time maxwell starts up, I want it to fast-forward the schema to the current position, processing the schema updates but throwing away the data until we're 'current'"

@osheroff
Collaborator

osheroff commented Feb 9, 2022

We fix the underlying problem eventually

why no PRs come across my desk?

@timbertson-zd
Contributor

why no PRs come across my desk?

Hah, this time it was our bad - bug in the Kafka client which we fixed by shipping the latest maxwell version.

"the next time maxwell starts up, I want it to fast-forward the schema to the current position, processing the schema updates but throwing away the data until we're 'current'"

Why bother processing schema updates in that case? Schema processing can fail, but capturing is pretty robust.

@osheroff
Collaborator

osheroff commented Feb 9, 2022

Why bother processing schema updates in that case? Schema processing can fail, but capturing is pretty robust.

ok.

then.

a command line tool to fast-forward to current while resetting the schema that doesn't involve dropping the database?

@osheroff
Collaborator

osheroff commented Feb 9, 2022

basically, I'm looking for a way to make this not just a weird backdoor into some arcana (that also modifies the system). It'd be nice if it played nicely with multiple clients somehow but if it just does DELETE from [all_tables] (and the corresponding code knows how to handle that case), that's fine too.

@osheroff
Collaborator

osheroff commented Feb 9, 2022

goddamnit cuthberson, i feel like there's something i'm not getting about this pr

@timbertson-zd
Contributor

Sure, that'd work. For us it's easier to drop a particular row, because setting then unsetting a flag requires 2 deploys, and we already wrote a job to "unstuck maxwell on a particular database". But a flag is definitely the better API for it.

Independently though, I've been bothered for a long time about the fact that we synthesize positions with last_heartbeat_read=0, because it's just plain incorrect. I just haven't been motivated enough to fix that before now, because it was so rare.

@osheroff
Collaborator

osheroff commented Feb 9, 2022

So the usage wouldn't be setting and then unsetting a flag -- you'd log in to the node running maxwell and run a bundled utility (like maxwell-bootstrap) that changed the database in some way that made it so the next run would result in a recapture and position reset -- presumably the utility would just truncate all the maxwell tables, but it could be something else too

@timbertson-zd
Contributor

timbertson-zd commented Feb 10, 2022

Oh, I see what you're saying now.

run a bundled utility (like maxwell-bootstrap) that changed the database in some way that made it so the next run would result in a recapture and position reset

That would be nice. The implementation could be one of:

  1. set a flag somewhere "reset_positions_on_next_run" which we look for on startup, triggering a different code path, to capture positions, capture schema, and delete this flag
  2. delete from positions where client_id = ? (what our job currently does)
  3. delete from positions and schemas (@redcape's suggestion, but in the tool rather than maxwell proper)

For any of these, we need to implement deletion / recapturing of schemas upon position recapture. To me it feels like it might as well be in maxwell (i.e. (1) or (2)), which brings us back to square 1, plus the addition of a new tool. I'm not champing at the bit to write such a tool; on the balance of effort required I'd lean towards just adding a CLI flag (despite it being awkward to use). But either way we still need something which recaptures the schema correctly.

@osheroff
Collaborator

(despite it being awkward to use)

It's like a clear and utter pain to use, but the argument for doing a tool is that it's kinda dangerous -- so you stop maxwell, add this flag to its startup --reset-maxwell, and then maxwell starts up and starts running. You forget to clear it, and the next time something bounces? Well, you're resetting again.

If you wanted to get clever I guess you could add like a timestamp or something to it, --reset-maxwell-before='2022-02-10' and only honor the flag if before that date. feels hacky tho

@redcape
Contributor

redcape commented Feb 10, 2022

A reset flag could work. I like the reset-before type flag, but personally I would prefer a reset-before-version: if the number is higher than what is stored in a reset-version column in the DB, reset following whatever the desired procedure is.

Reason for preference: timestamps can be harder to set - was the format YYYY-MM-DD-HH:MM:SS? YYYY-MM-DD HH:MM:SS? what timezone? unix timestamp in millis? in seconds?

I'm not a fan of another tool; it assumes that someone would want to log in to a machine with prod credentials to fix things, instead of deploying the change through checked-in code/configuration.

@osheroff
Collaborator

osheroff commented Feb 11, 2022

Ok. Best solution so far, as far as I'm concerned (a rough sketch follows the list):

  • add a boolean flag, reset_next_run tinyint(1) to maxwell.positions.
  • have maxwell see this when picking up a position, triggering schema recapture and position reset (as well as toggling this flag)
  • Probably don't truncate tables, to preserve multi-client behavior. Schema compaction should clean up orphaned schemas (I think? Should verify.)
  • tool is bonus but if no one wants it and prefers twiddling the database by hand, so be it.
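
A rough sketch of consuming such a flag at startup, assuming a positions.reset_next_run column as described above (query shape and method names are illustrative, not Maxwell code):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class ResetFlag {
	// Reads and clears positions.reset_next_run for this server/client; returns
	// true when the caller should recapture the schema and reset the position
	// to the current binlog head.
	static boolean consumeResetFlag(Connection c, long serverId, String clientId) throws SQLException {
		boolean reset = false;
		try (PreparedStatement ps = c.prepareStatement(
				"SELECT reset_next_run FROM `positions` WHERE server_id = ? AND client_id = ?")) {
			ps.setLong(1, serverId);
			ps.setString(2, clientId);
			try (ResultSet rs = ps.executeQuery()) {
				reset = rs.next() && rs.getBoolean("reset_next_run");
			}
		}
		if (reset) {
			try (PreparedStatement ps = c.prepareStatement(
					"UPDATE `positions` SET reset_next_run = 0 WHERE server_id = ? AND client_id = ?")) {
				ps.setLong(1, serverId);
				ps.setString(2, clientId);
				ps.executeUpdate();
			}
		}
		return reset;
	}
}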

@timbertson-zd
Contributor

have maxwell see this when picking up a position, triggering schema recapture and position reset (as well as toggling this flag)

Why is this preferable to just deleting the position in question? In either case you need to capture the current position (and schemas). Arguably it's more explicit, but assuming we're going to take the same action when either noPositionFound || resetFlagIsSet, what's the benefit of introducing this flag?

@osheroff
Collaborator

in a multi-client world, when you delete the position it's gonna try to optimize for sharing schemas and reset back to the last-known-good client position. but the use case here is to generally start from current position.

@timbertson-zd
Contributor

when you delete the position it's gonna try to optimize for sharing schemas and reset back to the last-known-good client position

Ah, I missed that logic (we don't run multi clients for the most part). So yep, I see the point in a new flag now.

But (broken record time), I still don't see a good reason to not implement "recapture the current schema" when there's no position found. And we'd do the same thing when this new flag is set.
