Async db support #497

Open
wants to merge 6 commits into
base: main

Contributor

@jernejfrank jernejfrank commented Jan 13, 2025

Adding async support for Postgres and Redis state persisters. This expands on #488 and addresses #484 when using Postgres or Redis.

Changes

  • The Postgres state persister issues its commands via asyncpg.
  • The Redis state persister supports async via redis.asyncio.

How I tested this

  • unit test

Notes

  • For Postgres: asyncpg uses a coroutine to open/close the connection. In this case pickling / unpickling the state needs a workaround (not yet implemented) since __getstate__ and __setstate__ do not have async support.
  • Redis pickling/unpickling is straightforward (and implemented) since the connection to the db is sync.
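
A minimal sketch of one possible workaround for the Postgres case, not what this PR implements: drop the live connection in `__getstate__` and reopen it lazily from a coroutine after unpickling. `FakeAsyncConnection`, `fake_connect`, `LazyAsyncPersister`, and `get_connection` are hypothetical names used only for illustration; a real version would presumably await `asyncpg.connect(...)` in place of `fake_connect`.

```python
import asyncio
import pickle


class FakeAsyncConnection:
    """Hypothetical stand-in for a connection that only a coroutine can open."""

    def __init__(self, dsn):
        self.dsn = dsn


async def fake_connect(dsn):
    # Assumption: stands in for an awaitable connect call such as asyncpg.connect.
    await asyncio.sleep(0)
    return FakeAsyncConnection(dsn)


class LazyAsyncPersister:
    """Keeps only picklable connection parameters; connects on first use."""

    def __init__(self, dsn):
        self.dsn = dsn
        self._connection = None

    async def get_connection(self):
        # Opened lazily, so nothing async has to run inside __setstate__.
        if self._connection is None:
            self._connection = await fake_connect(self.dsn)
        return self._connection

    def __getstate__(self):
        # Copy the instance dict and drop the live connection before pickling.
        state = self.__dict__.copy()
        state["_connection"] = None
        return state

    def __setstate__(self, state):
        # Synchronous restore; the connection is reopened on the next await.
        self.__dict__.update(state)


async def demo():
    persister = LazyAsyncPersister("postgresql://localhost/demo")
    await persister.get_connection()
    clone = pickle.loads(pickle.dumps(persister))
    reopened = await clone.get_connection()
    return clone.dsn, reopened.dsn


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off in this sketch is that every method touching the database has to go through `get_connection()` instead of reading an attribute directly.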

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

Important

Add asynchronous support for Redis and PostgreSQL state persisters using redis.asyncio and asyncpg.

  • Async Support:
    • Added AsyncRedisBasePersister using redis.asyncio for async Redis operations in b_redis.py.
    • Added AsyncPostgreSQLPersister using asyncpg for async PostgreSQL operations in postgresql.py.
  • Testing:
    • Added async tests in test_b_redis.py for AsyncRedisBasePersister.
    • Added async tests in test_postgresql.py for AsyncPostgreSQLPersister.
  • Documentation:
    • Updated persister.rst to include AsyncRedisBasePersister and AsyncPostgreSQLPersister.
  • Dependencies:
    • Added asyncpg to pyproject.toml under postgresql dependencies.

This description was created by Ellipsis for 304eb21. It will automatically update as commits are pushed.

@ellipsis-dev ellipsis-dev bot left a comment

👍 Looks good to me! Reviewed everything up to 304eb21 in 37 seconds

More details
  • Looked at 662 lines of code in 6 files
  • Skipped 0 files when reviewing.
  • Skipped posting 2 drafted comments based on config settings.
1. burr/integrations/persisters/b_redis.py:315
  • Draft comment:
    The create_key method does not perform any asynchronous operations and can be converted to a regular method.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The create_key method in AsyncRedisBasePersister is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.
2. burr/integrations/persisters/postgresql.py:362
  • Draft comment:
    The create_key method does not perform any asynchronous operations and can be converted to a regular method.
  • Reason this comment was not posted:
    Confidence changes required: 50%
    The create_key method in AsyncPostgreSQLPersister is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.

Contributor

@elijahbenizzy elijahbenizzy left a comment

Some comments, will take a look again

@@ -2,6 +2,7 @@

try:
    import redis  # can't name module redis because this import wouldn't work.
    import redis.asyncio as aredis
Contributor

Just checking -- redis.asyncio is included in redis, right? It's not a separate library with separate dependencies?

Choice:

  • Keep async + sync redis in the same place (makes sense cause of the library)
  • Move async redis to b_aredis.py or something like that (makes sense cause it's a separate functionality and you could imagine them separating it out or using an alternative client at some point)

class AsyncRedisBasePersister(persistence.BaseStatePersister):
    """Main class for Async Redis persister.

    Use this class if you want to directly control injecting the async Redis client.
Contributor

What does this mean?

.. warning::
    The synchronous persister closes the connection on deletion of the class using the ``__del__`` method.
    In an async context that is not reliable (the event loop may already be closed by the time ``__del__``
    gets invoked). Therefore, you are responsible for closing the connection yourself (i.e. manual cleanup).
Contributor

Worth thinking -- can this be done by a context manager? E.G. if you use from_values() how do you close it? Might be worth either exposing the client or something.
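
A rough sketch of what the context-manager idea could look like, assuming the persister owns its client and exposes it. `FakeAsyncClient` and `AsyncPersisterSketch` are hypothetical stand-ins, and this `from_values()` takes no arguments only to keep the example short; none of this is the PR's actual code.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an async database client."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        await asyncio.sleep(0)
        self.closed = True


class AsyncPersisterSketch:
    def __init__(self, client):
        # Exposed so callers can also close the client by hand.
        self.client = client

    @classmethod
    def from_values(cls):
        # Assumption: the real constructor takes connection parameters.
        return cls(FakeAsyncClient())

    async def cleanup(self):
        # Explicit close, since __del__ cannot await anything.
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.cleanup()
        return False  # do not swallow exceptions raised inside the block


async def main():
    async with AsyncPersisterSketch.from_values() as persister:
        client = persister.client
        assert not client.closed
    # The client is closed once the block exits, even on error.
    return client.closed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this shape, a persister built through `from_values()` is closed when the `async with` block exits, and callers who skip the context manager can still await `cleanup()` themselves.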

"status": data[b"status"].decode(),
}

async def create_key(self, app_id, partition_key, sequence_id):
Contributor

Should this be async?

Contributor

Wonder if this should subclass the synchronous redis persister to share code -- there's a lot of duplication...

Contributor

Alternatively helper functions to help with common operations

Contributor Author

I went through both classes and there is not much shared code. The annoying thing is that most of the code follows the pattern: talk to db, extract some useful data (e.g. sequence_id), talk to db again. So all the logic that appears duplicated is sandwiched between db calls that are sync or async depending on the class.
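
To make the trade-off in this thread concrete, here is a small sketch of the "db call, pure logic, db call" pattern with the middle step factored into a helper that both variants share. Everything here (`next_sequence_id`, `SyncStore`, `AsyncStore`, `save_sync`, `save_async`) is hypothetical and uses an in-memory dict instead of a database; it only illustrates how little is left to share once the I/O is removed.

```python
import asyncio
from typing import Optional


def next_sequence_id(latest: Optional[int]) -> int:
    """Pure logic with no I/O, so sync and async callers can both use it."""
    return 0 if latest is None else latest + 1


class SyncStore:
    """In-memory stand-in for a synchronous db client."""

    def __init__(self):
        self.rows = {}

    def latest(self, app_id):
        return self.rows.get(app_id)

    def write(self, app_id, sequence_id):
        self.rows[app_id] = sequence_id


class AsyncStore:
    """In-memory stand-in for an asynchronous db client."""

    def __init__(self):
        self.rows = {}

    async def latest(self, app_id):
        await asyncio.sleep(0)
        return self.rows.get(app_id)

    async def write(self, app_id, sequence_id):
        await asyncio.sleep(0)
        self.rows[app_id] = sequence_id


def save_sync(store, app_id):
    latest = store.latest(app_id)            # talk to db
    sequence_id = next_sequence_id(latest)   # shared logic
    store.write(app_id, sequence_id)         # talk to db again
    return sequence_id


async def save_async(store, app_id):
    latest = await store.latest(app_id)        # talk to db
    sequence_id = next_sequence_id(latest)     # shared logic
    await store.write(app_id, sequence_id)     # talk to db again
    return sequence_id
```

The two `save_*` functions still mirror each other line for line; only the one-line helper is actually shared, which matches the observation that the duplication is mostly structural.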

@@ -1,6 +1,7 @@
from burr.integrations import base

try:
    import asyncpg
Contributor

Ditto with the above -- I think this should probably be a separate one so we don't force people who just need sync to also have async libraries
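
One way to keep sync-only users from needing the async libraries is to resolve the optional dependency only when the async persister module is actually used. The helper below is a hedged sketch: `require_extra` and its `extra` argument are hypothetical names, and the install hint in the error message is an assumption about how the extras are named.

```python
import importlib
import importlib.util


def require_extra(module_name: str, extra: str):
    """Import module_name, or raise an ImportError naming the extra to install."""
    if importlib.util.find_spec(module_name) is None:
        raise ImportError(
            f"'{module_name}' is required for this persister but is not installed. "
            f"Assumed install hint: pip install \"burr[{extra}]\""
        )
    return importlib.import_module(module_name)


if __name__ == "__main__":
    try:
        asyncpg = require_extra("asyncpg", "postgresql")
    except ImportError as error:
        print(error)
```

Placing a call like this at the top of a dedicated async module means importing the sync persister never touches `asyncpg` at all.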

if self._initialized:
    return True

async with self.connection.transaction():
Contributor

Does this need to be a transaction? It's a single query, and it's purely a read (so it doesn't/can't be rolled back)

Contributor

Hmm, also feels like we should be able to set self._initialized to be true here if _initialized is true...

partition_key = self.PARTITION_KEY_DEFAULT
logger.debug("Loading %s, %s, %s", partition_key, app_id, sequence_id)

async with self.connection.transaction():
Contributor

ditto with transaction -- not needed I think

status,
)

async with self.connection.transaction():
Contributor

Also should not need to be a transaction

Async implementation to persist the state to a postgresql db.

In addition, adds methods to the postgresql sync persister to make it
consistent. Migrates postgresql to the psycopg2 module for consistent
renaming.

Implements the async functions from redis.asyncio (redis-py) for state
persistence.

Add methods to sync redis persister for consistency.

The methods added make it consistent with the implementations of other
db persisters.

Migrates the MongoDBPersister to b_pymongo.py for consistent naming
convention.

@jernejfrank
Contributor Author

I have increased the scope to include all existing persister implementations to make some consistency additions. There are 3 points to highlight:

  1. Consistency for methods across persisters:
     • .cleanup() to be able to close the connection
     • __enter__ / __exit__ or __aenter__ / __aexit__ to be used as context managers
     • cls.from_config()
     • set_serde_kwargs
  2. Naming convention for persister modules:

     I opted to name the modules after the underlying dependency library. I am not a fan of that naming, but it gives us more flexibility down the line in case we implement the same persister with multiple different libraries, since we can then keep the class name. For example:

     • psycopg2.PostgreSQLPersister
     • PyGreSQL.PostgreSQLPersister
  3. New table in persister docs to highlight which implementations we have and which dependencies there are.

2 participants