
Add failed-import batch archiving to aid debugging #24

Open · wants to merge 1 commit into main

Conversation

jessemortenson (Contributor) commented:

My goal here is to make it easier to debug failed imports that occur under realtime processing. This code zips up the jurisdiction data directory that was being processed when an import fails and puts it in the archive/ path of the realtime processing S3 bucket. It includes the name of the archive zip in the log message, so that whoever is debugging can download and examine the data that was being imported at the time.

This is kind of yet another hack on top of hacks, but hopefully at least just a logging thing and not something that further complicates the data flow here.
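For reference, here is a minimal sketch of the archiving step as I think of it conceptually. The bucket name, function name, and paths are placeholders rather than the exact code in this PR; it assumes boto3 plus the standard library.

import logging
import os
import shutil

import boto3  # assumed dependency for S3 access

logger = logging.getLogger(__name__)

REALTIME_BUCKET = "openstates-realtime-bucket"  # placeholder name


def archive_failed_batch(datadir: str, jurisdiction_id: str) -> str:
    """Zip the jurisdiction data directory that was being imported and
    upload it to the archive/ path of the realtime processing bucket."""
    zip_path = shutil.make_archive(f"/tmp/{jurisdiction_id}-failed-import", "zip", datadir)
    key = f"archive/{os.path.basename(zip_path)}"
    boto3.client("s3").upload_file(zip_path, REALTIME_BUCKET, key)
    # Include the archive name in the error log so whoever is debugging can pull it down
    logger.error(
        "Import failed for %s; batch data archived to s3://%s/%s",
        jurisdiction_id, REALTIME_BUCKET, key,
    )
    return key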

Thinking through this, I think a more reasonable overall process might be something like the following. I didn't implement this because it would be more work, and work spanning into openstates-core. But consider this a little mini-EP tagged onto this PR for your feedback to inform future work.

  1. Scraper is yielding scraped entities
  2. The process that receives those entities and saves them to JSON keeps accumulating them up to a certain increment (15 minutes? 200 entities?)
  3. Once that increment is met, that process consolidates the data in the increment into a couple of parquet files (one per entity type), then uploads those to S3 along with an SQS message identifying them (see the sketch after this list)
  4. The realtime lambda receives the message and processes the full increment as one batch
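A rough sketch of what steps 2-3 might look like, assuming pandas/pyarrow for the parquet writes and boto3 for the S3/SQS hand-off; the bucket, queue URL, and function name are all hypothetical:

import json

import boto3
import pandas as pd  # assumes pandas + pyarrow are available for parquet output

BUCKET = "openstates-realtime-bucket"  # hypothetical
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/.../realtime-batches"  # hypothetical


def flush_increment(entities_by_type: dict, batch_id: str) -> None:
    """Consolidate one increment into parquet files (one per entity type),
    upload them to S3, and send a single SQS message identifying the batch."""
    keys = []
    for entity_type, records in entities_by_type.items():
        local_path = f"/tmp/{batch_id}-{entity_type}.parquet"
        pd.DataFrame.from_records(records).to_parquet(local_path)
        key = f"batches/{batch_id}/{entity_type}.parquet"
        boto3.client("s3").upload_file(local_path, BUCKET, key)
        keys.append(key)
    # One message per increment, instead of one S3 object + message per entity
    boto3.client("sqs").send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"batch_id": batch_id, "bucket": BUCKET, "keys": keys}),
    )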

I think that process would help a few things:

  1. Reduce S3 API costs, because we'd be uploading 1-3 parquet files per batch instead of thousands of JSON files all the time
  2. An error in processing in the lambda could result in the same parquet files being simply moved to an archive location
  3. Reduce the odds that multiple lambda executions are processing files from the same jurisdiction at the same time, and increase the odds that concurrent executions are instead each working on their own jurisdiction. I don't know for sure but I suspect this will reduce a few errors.

Now of course I think our original idea of the big SQL Mesh transformation engine is even better than the above, but the above is less work than that and probably still a significant improvement.

alexobaseki (Contributor) left a comment:


LGTM! The ideas for improvement also look solid. I have a question around the conditional logic, but it's just for clarification and testing to be sure it works as expected.

sqs_delete_fetched_messages = os.environ.get("SQS_DELETE_FETCHED_MESSAGES", True)
if (
    sqs_delete_fetched_messages is not False
    and sqs_delete_fetched_messages.lower() != "true"
    and sqs_delete_fetched_messages != "1"

I don't follow this logic. Doesn't sqs_delete_fetched_messages.lower() != "true" and sqs_delete_fetched_messages != "1" mean that sqs_delete_fetched_messages is probably "false" or "0"? Shouldn't that be sqs_delete_fetched_messages = False then?
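For what it's worth, one unambiguous way to handle the flag would be to normalize the env var to a lowercase string and compare it against a set of falsy values. This is just a sketch, not the PR's code; it assumes the default should be to delete fetched messages:

import os

# Default to deleting fetched messages; "false", "0", and "no" (any case) disable it.
raw = os.environ.get("SQS_DELETE_FETCHED_MESSAGES", "true")
sqs_delete_fetched_messages = str(raw).strip().lower() not in ("false", "0", "no")

if sqs_delete_fetched_messages:
    # delete the messages that were just fetched
    ...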
