-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async db support #497
base: main
Are you sure you want to change the base?
Async db support #497
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Reviewed everything up to 304eb21 in 37 seconds
More details
- Looked at
662
lines of code in6
files - Skipped
0
files when reviewing. - Skipped posting
2
drafted comments based on config settings.
1. burr/integrations/persisters/b_redis.py:315
- Draft comment:
Thecreate_key
method does not perform any asynchronous operations and can be converted to a regular method. - Reason this comment was not posted:
Confidence changes required:50%
Thecreate_key
method inAsyncRedisBasePersister
is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.
2. burr/integrations/persisters/postgresql.py:362
- Draft comment:
Thecreate_key
method does not perform any asynchronous operations and can be converted to a regular method. - Reason this comment was not posted:
Confidence changes required:50%
Thecreate_key
method inAsyncPostgreSQLPersister
is unnecessarily defined as an async function. It does not perform any asynchronous operations and can be converted to a regular method.
Workflow ID: wflow_m5wjdK3DlH4vtpsj
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet
mode, and more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments, will take a look again
@@ -2,6 +2,7 @@ | |||
|
|||
try: | |||
import redis # can't name module redis because this import wouldn't work. | |||
import redis.asyncio as aredis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just checking -- redis.asyncio is included in redis, right? It's not a separate library with separate dependencies?
Choice:
- Keep async + sync redis in the same place (makes sense cause of the library)
- Move async redis to b_aredis.py or something like that (makes sense cause it's a separate functionality and you could imagine them separating it out or using an alternative client at some point)
class AsyncRedisBasePersister(persistence.BaseStatePersister): | ||
"""Main class for Async Redis persister. | ||
|
||
Use this class if you want to directly control injecting the async Redis client. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this mean?
.. warning:: | ||
The synchronous persister closes the connection on deletion of the class using the ``__del__`` method. | ||
In an async context that is not reliable (the event loop may already be closed by the time ``__del__`` | ||
gets invoked). Therefore, you are responsible for closing the connection yourself (i.e. manual cleanup). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth thinking -- can this be done by a context manager? E.G. if you use from_values()
how do you close it? Might be worth either exposing the client or something.
"status": data[b"status"].decode(), | ||
} | ||
|
||
async def create_key(self, app_id, partition_key, sequence_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be async?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if this should subclass the synchronous redis persister to share code -- there's a lot of duplication...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively helper functions to help with common operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went through both classes and there is not much shared stuff.. The annoying thing is that most code is of the pattern: talk to db -- extract some useful data (e.g. sequence_id) -- talk to db again. So all the logic that appears duplicate is sandwich between db calls that are sync/async depending on the class.
@@ -1,6 +1,7 @@ | |||
from burr.integrations import base | |||
|
|||
try: | |||
import asyncpg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto with the above -- I think this should probably be a separate one so we don't force people who just need sync to also have async libraries
if self._initialized: | ||
return True | ||
|
||
async with self.connection.transaction(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be a transaction? It's a single query, and it's purely a read (so it doesn't/can't be rolled back)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, also feels like we should be able to set self._initialized to be true here if _initialized is true...
partition_key = self.PARTITION_KEY_DEFAULT | ||
logger.debug("Loading %s, %s, %s", partition_key, app_id, sequence_id) | ||
|
||
async with self.connection.transaction(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto with transaction -- not needed I think
status, | ||
) | ||
|
||
async with self.connection.transaction(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also should not need to be a transaction
Async implementation to persist the state to a postgresql db. In addition adds methods to postgresql sync persister to make it consistent. Migrates postgresql to psycopg2 module for consistent renaming.
Implements the async function from redis-py.asyncio for state persistance. Add methods to sync redis persister for consistency.
The methods added make it consistent with the implementations of other db persisters.
Migrates the MongoDBPersister to b_pymongo.py for consistent naming convention.
9bce9c2
to
63c306c
Compare
I have increased the scope to include all existing persister implementations to make some consistency additions. There are 3 points to highlight:
I opted to name the modules after the underlying dependency library. I am not a fan of that naming, but this gives us more flexibility down the line in case we implement the same persister with multiple different libraries since we can then keep the class name -- For example:
|
Adding async support for Postgres and Redis state persisters. This expands on #488 and addresses #484 when using Postgres or Redis.
Changes
asyncpg
.redis.asyncio
How I tested this
Notes
asyncpg
uses a coroutine to open/close the connection. In this case pickling / unpickling the state needs a workaround (not yet implemented) since__getstate__
and__setstate__
do not have async support.Checklist
Important
Add asynchronous support for Redis and PostgreSQL state persisters using
redis.asyncio
andasyncpg
.AsyncRedisBasePersister
usingredis.asyncio
for async Redis operations inb_redis.py
.AsyncPostgreSQLPersister
usingasyncpg
for async PostgreSQL operations inpostgresql.py
.test_b_redis.py
forAsyncRedisBasePersister
.test_postgresql.py
forAsyncPostgreSQLPersister
.persister.rst
to includeAsyncRedisBasePersister
andAsyncPostgreSQLPersister
.asyncpg
topyproject.toml
underpostgresql
dependencies.This description was created by for 304eb21. It will automatically update as commits are pushed.