feature: System database support for AWS S3 #8435

Closed
menzenski opened this issue Mar 6, 2024 · 6 comments

@menzenski
Contributor

Feature scope

Configuration (settings parsing, validation, etc.)

Description

It would be great to have Meltano be able to write runs data to AWS S3. S3 is a supported "state backend" (so Meltano can write state there), but it's not a supported "system database" (so Meltano cannot write runs data there).

We currently run Meltano using a Postgres system database and have become accustomed to having the runs table data available. However, we'd like to retire this Postgres database and have been standing up a Meltano project using the S3 state backend. I am now realizing that this means we will no longer have runs data available to us.

@edgarrmondragon
Collaborator

Hey Matt, thanks for filing!

Can you say more about how you're using the runs table? Is it something that you glance at occasionally, or are some of your workflows dependent on it?


I'm trying to think about what this would look like. State backends are essentially key-value stores, so it's easy to use object storage for that, but the runs table is more transactional.
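
For illustration only (this is not Meltano's actual state-backend code), the state-backend contract reduces to getting/setting a JSON blob per state ID, which maps directly onto object storage. The boto3 calls below are real, but the bucket name and key layout are made up:

```python
import json

import boto3

s3 = boto3.client("s3")
BUCKET = "my-meltano-state"  # made-up bucket name


def set_state(state_id: str, state: dict) -> None:
    # One JSON object per state ID: a plain key-value write.
    s3.put_object(Bucket=BUCKET, Key=f"state/{state_id}", Body=json.dumps(state))


def get_state(state_id: str) -> dict:
    # Symmetric read; no transactions or row-level updates involved.
    obj = s3.get_object(Bucket=BUCKET, Key=f"state/{state_id}")
    return json.loads(obj["Body"].read())
```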

See, for example, how the job/run model is used within a transaction to keep a heartbeat:

```python
@asynccontextmanager
async def run(self, session):
    """Run wrapped code in context of a job.

    Transitions state to RUNNING and SUCCESS/FAIL as appropriate and
    records heartbeat every second.

    Args:
        session: the session to use for writing to the db

    Raises:
        BaseException: re-raises an exception occurring in the job running
            in this context
    """  # noqa: DAR301
    try:
        self.start()
        self.save(session)

        with self._handling_sigterm(session):
            async with self._heartbeating(session):
                yield

        self.success()
        self.save(session)
    except BaseException as err:  # noqa: WPS424
        if not self.is_running():
            raise

        self.fail(error=self._error_message(err))
        self.save(session)
        raise
```

and to control concurrency:

```python
async def run_with_job(self) -> None:
    """Run the ELT task within the context of a job.

    Raises:
        RunnerError: if failures are encountered during execution or if the
            underlying pipeline/job is already running.
    """
    job = self.context.job
    fail_stale_jobs(self.context.session, job.job_name)

    if not self.context.force and (
        existing := JobFinder(job.job_name).latest_running(
            self.context.session,
        )
    ):
        raise RunnerError(
            f"Another '{job.job_name}' pipeline is already running "
            f"which started at {existing.started_at}. To ignore this "
            "check use the '--force' option.",
        )

    with closing(self.context.session) as session:
        async with job.run(session):
            await self.execute()
```

I'm happy to discuss spec and implementation proposals, and even review PRs, but this is something that we probably won't prioritize ourselves.


That said, one option that may be available today is to rely on the default SQLite system db and use something like Litestream [1] to sync the database with S3.
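
A hedged sketch of what that could look like as a wrapper around a pipeline invocation. The `litestream restore`/`litestream replicate` subcommands are from the Litestream docs; the bucket, replica URL, and plugin names are illustrative:

```python
import subprocess

DB_PATH = ".meltano/meltano.db"            # Meltano's default SQLite system db
REPLICA_URL = "s3://my-bucket/meltano.db"  # illustrative replica location

# Restore the latest replica from S3, if one exists.
subprocess.run(
    ["litestream", "restore", "-if-replica-exists", "-o", DB_PATH, REPLICA_URL],
    check=True,
)

# Stream local changes back to S3 while the pipeline runs.
replicator = subprocess.Popen(["litestream", "replicate", DB_PATH, REPLICA_URL])
try:
    subprocess.run(["meltano", "run", "tap-foo", "target-bar"], check=True)
finally:
    replicator.terminate()
    replicator.wait()
```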

Footnotes

  1. Litestream - Streaming SQLite Replication: https://litestream.io

@edgarrmondragon
Collaborator

Another idea that just came to mind is to search for (or implement) a SQLAlchemy dialect that's SQLite + S3, so it could be used like the example in #7143 (comment).

The individual components seem to be out there.
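
To make the idea concrete, here's what usage could look like if such a dialect existed. The `sqlite+s3://` scheme below is invented for illustration and does not exist today:

```python
from sqlalchemy import create_engine

# Hypothetical URL scheme: the dialect would resolve the bucket/key, route
# reads and writes through an S3-backed VFS, and expose a normal DBAPI
# connection to SQLAlchemy.
engine = create_engine("sqlite+s3://my-bucket/meltano/meltano.db")

# Meltano's database_uri setting could then point at the same URL, since it
# already accepts any SQLAlchemy URL.
```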

@menzenski
Contributor Author

> Can you say more about how you're using the runs table? Is it something that you glance at occasionally, or are some of your workflows dependent on it?

We don't have anything today that depends on it specifically. We query it manually, occasionally, for debugging purposes.

As we build out our "second-generation" Meltano platform, though, we would like to better implement "reporting and analytics on our ELT workflows": ideally, we'd be able to surface in a dashboard which ELT jobs have run recently, succeeded, failed, etc.

We run Meltano in Kubernetes via Argo Workflows and we have the Argo Workflows workflow archive set up, so all Argo Workflows executions are recorded in a database today. A Meltano run corresponds to exactly one Argo Workflows run, so we still have good information available on what Meltano jobs ran when, succeeded, failed, etc.

The part I'm thinking about specifically as a potential limitation is not having the "payload" field from the Meltano runs table available. It seems like it'd be useful to have that explicitly persisted, since it appears to provide the value of the replication key for each stream in the run, at the start of the run.
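
As a hedged example of the kind of reporting query this enables, something like the following against the system database; table and column names follow Meltano's runs table, while the connection URL is a placeholder:

```python
import json

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://meltano:secret@localhost/meltano")  # placeholder

with engine.connect() as conn:
    rows = conn.execute(text(
        "SELECT job_name, state, started_at, payload "
        "FROM runs ORDER BY started_at DESC LIMIT 10",
    ))
    for job_name, state, started_at, payload in rows:
        # payload holds the state captured for the run (replication key
        # values), so it can be surfaced in a dashboard alongside status.
        print(job_name, state, started_at, json.dumps(payload)[:120])
```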

@menzenski
Contributor Author

The other thought I had is that we're moving from Postgres into Snowflake for our warehouse. System database support for Snowflake would accomplish the same goal for us (continue to leverage a persistent state backend without running a Postgres database).

@edgarrmondragon
Collaborator

edgarrmondragon commented Mar 8, 2024

> We don't have anything today that depends on it specifically. We query it manually, occasionally, for debugging purposes.
>
> As we build out our "second-generation" Meltano platform, though, we would like to better implement "reporting and analytics on our ELT workflows": ideally, we'd be able to surface in a dashboard which ELT jobs have run recently, succeeded, failed, etc.
>
> We run Meltano in Kubernetes via Argo Workflows and we have the Argo Workflows workflow archive set up, so all Argo Workflows executions are recorded in a database today. A Meltano run corresponds to exactly one Argo Workflows run, so we still have good information available on what Meltano jobs ran when, succeeded, failed, etc.
>
> The part I'm thinking about specifically as a potential limitation is not having the "payload" field from the Meltano runs table available. It seems like it'd be useful to have that explicitly persisted, since it appears to provide the value of the replication key for each stream in the run, at the start of the run.

Thanks for adding context! That makes sense. The payload field can indeed give some insight into "state evolution" of a tap and its streams, which can be valuable.

> The other thought I had is that we're moving from Postgres into Snowflake for our warehouse. System database support for Snowflake would accomplish the same goal for us (continue to leverage a persistent state backend without running a Postgres database).

Yeah, that's been asked in Slack before. The database_uri is a SQLAlchemy URL, so in theory you could point it at a Snowflake instance by setting it to 'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>' [1].
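
For what it's worth, a quick way to sanity-check such a URL outside of Meltano (assumes the snowflake-sqlalchemy package is installed; every identifier below is a placeholder):

```python
from sqlalchemy import create_engine, text

engine = create_engine(
    "snowflake://my_user:my_password@my_account/meltano_db/meltano_schema"
    "?warehouse=my_wh&role=my_role"
)

with engine.connect() as conn:
    # If this round-trips, the URL and credentials are good; whether Meltano's
    # migrations then run cleanly is the separate issue discussed below.
    print(conn.execute(text("SELECT CURRENT_VERSION()")).scalar())
```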

Now, I recall that doesn't work because at least one of the migration scripts is not compatible with Snowflake's SQL, so changes would be required there (see #6529 and #6167). Do log an issue for it if Snowflake support for the systemdb would make this transition easier for you, and of course, PRs would be welcome 😄.

Footnotes

  1. Snowflake SQLAlchemy connection parameters: https://github.com/snowflakedb/snowflake-sqlalchemy/?tab=readme-ov-file#connection-parameters

@menzenski
Contributor Author

After some further consideration, we've decided that it's actually not feasible to retire our Postgres database. We plan to continue to use it for the Meltano system database (and for some other metadata capture, the Argo Workflows archive, etc.).

So, from my perspective this issue could be closed.

@edgarrmondragon closed this as not planned on May 8, 2024